import type { MessageInfo, MessagePartRemovedEvent, MessagePartDeltaEvent, MessagePartUpdatedEvent, MessageRemovedEvent, MessageUpdateEvent, } from "../types/message" import type { EventSessionCompacted, EventSessionDiff, EventSessionError, EventSessionIdle, EventSessionUpdated, EventSessionStatus, } from "@opencode-ai/sdk" import type { MessageStatus } from "./message-v2/types" import { getLogger } from "../lib/logger" import { requestData } from "../lib/opencode-api" import { getPermissionId, getPermissionKind, getPermissionSessionId, getRequestIdFromPermissionReply, } from "../types/permission" import type { PermissionReplyEventPropertiesLike, PermissionRequestLike } from "../types/permission" import { getQuestionId, getQuestionSessionId, getRequestIdFromQuestionReply } from "../types/question" import type { QuestionRequest } from "../types/question" import type { EventQuestionReplied, EventQuestionRejected } from "@opencode-ai/sdk/v2" import { showToastNotification, ToastVariant } from "../lib/notifications" import { sendOsNotification } from "../lib/os-notifications" import { preferences } from "./preferences" import { instances, addPermissionToQueue, removePermissionFromQueue, addQuestionToQueue, removeQuestionFromQueue, } from "./instances" import { showAlertDialog } from "./alerts" import { createClientSession, mapSdkSessionStatus, type Session, type SessionStatus } from "../types/session" import { ensureSessionParentExpanded, sessions, setSessions, syncInstanceSessionIndicator, withSession } from "./session-state" import { normalizeMessagePart } from "./message-v2/normalizers" import { updateSessionInfo } from "./message-v2/session-info" import { tGlobal } from "../lib/i18n" import { loadMessages } from "./session-api" import { getOrCreateWorktreeClient, getRootClient, getWorktreeSlugForDirectory, getWorktreeSlugForSession } from "./worktrees" import { applyPartUpdateV2, applyPartDeltaV2, replaceMessageIdV2, reconcilePendingQuestionsV2, upsertMessageInfoV2, upsertPermissionV2, upsertQuestionV2, removeMessagePartV2, removeMessageV2, removePermissionV2, removeQuestionV2, setSessionRevertV2, } from "./message-v2/bridge" import { messageStoreBus } from "./message-v2/bus" import type { InstanceMessageStore } from "./message-v2/instance-store" const log = getLogger("sse") const pendingSessionFetches = new Map>() function shouldSendOsNotification(kind: "needsInput" | "idle"): boolean { if (typeof document === "undefined") return false const pref = preferences() if (!pref.osNotificationsEnabled) return false if (!pref.osNotificationsAllowWhenVisible && document.visibilityState === "visible") return false if (kind === "needsInput") return Boolean(pref.notifyOnNeedsInput) if (kind === "idle") return Boolean(pref.notifyOnIdle) return false } function getInstanceDisplayName(instanceId: string): string { const instanceFolder = instances().get(instanceId)?.folder ?? instanceId return instanceFolder.split(/[\\/]/).filter(Boolean).pop() ?? instanceFolder } function getSessionTitle(instanceId: string, sessionId: string | undefined | null): string { if (!sessionId) return "" const session = sessions().get(instanceId)?.get(sessionId) const title = session?.title?.trim() return title && title.length > 0 ? title : sessionId } function fireOsNotification(payload: { title: string; body: string }) { void sendOsNotification(payload).catch((error) => { log.warn("Failed to send OS notification", error) }) } interface TuiToastEvent { type: "tui.toast.show" properties: { title?: string message: string variant: "info" | "success" | "warning" | "error" duration?: number } } const ALLOWED_TOAST_VARIANTS = new Set(["info", "success", "warning", "error"]) function applySessionStatus(instanceId: string, sessionId: string, status: SessionStatus) { let parentToExpand: string | null = null withSession(instanceId, sessionId, (session) => { const current = session.status ?? "idle" if (current === status) return false if (current === "compacting" && status !== "compacting") { return false } session.status = status // Auto-expand the parent thread when a child session starts working. // Users can still collapse it; we only expand on the transition. if (session.parentId && status === "working" && current !== "working") { parentToExpand = session.parentId } }) if (parentToExpand) { ensureSessionParentExpanded(instanceId, parentToExpand) } } async function fetchSessionInfo(instanceId: string, sessionId: string, directory?: string): Promise { const instance = instances().get(instanceId) if (!instance?.client) return null const slugFromDirectory = getWorktreeSlugForDirectory(instanceId, directory) const slug = slugFromDirectory ?? getWorktreeSlugForSession(instanceId, sessionId) const client = getOrCreateWorktreeClient(instanceId, slug) const rootClient = getRootClient(instanceId) try { const info = await requestData( client.session.get({ sessionID: sessionId }), "session.get", ) let fetchedStatus: SessionStatus = "idle" try { let statuses: Record = {} try { statuses = await requestData>(rootClient.session.status(), "session.status") } catch { statuses = await requestData>(client.session.status(), "session.status") } // Session status is global-ish; prefer the root context when available. // (OpenCode may scope status by directory in older builds.) // If root fails, fall back to the worktree-scoped client. // // Note: requestData throws on error, so we catch below. const rawStatus = (info as any)?.status ?? statuses?.[sessionId] const hasType = rawStatus && typeof rawStatus === "object" && typeof rawStatus.type === "string" fetchedStatus = hasType ? mapSdkSessionStatus(rawStatus) : "idle" } catch (error) { log.error("Failed to fetch session status", error) } const fetched = createClientSession(info, instanceId, "", { providerId: "", modelId: "" }, fetchedStatus) let updatedInstanceSessions: Map | undefined let shouldExpandParent: string | null = null setSessions((prev) => { const next = new Map(prev) const instanceSessions = next.get(instanceId) ?? new Map() const existing = instanceSessions.get(sessionId) const merged: Session = { ...fetched, agent: existing?.agent ?? fetched.agent, model: existing?.model ?? fetched.model, status: existing?.status === "compacting" ? "compacting" : fetched.status, pendingPermission: existing?.pendingPermission ?? fetched.pendingPermission, pendingQuestion: existing?.pendingQuestion ?? false, } instanceSessions.set(sessionId, merged) next.set(instanceId, instanceSessions) updatedInstanceSessions = instanceSessions if (merged.parentId && merged.status === "working" && (existing?.status ?? "idle") !== "working") { shouldExpandParent = merged.parentId } return next }) syncInstanceSessionIndicator(instanceId, updatedInstanceSessions) if (shouldExpandParent) { ensureSessionParentExpanded(instanceId, shouldExpandParent) } return fetched } catch (error) { log.error("Failed to fetch session info", error) return null } } function ensureSessionStatus(instanceId: string, sessionId: string, status: SessionStatus, directory?: string) { const instanceSessions = sessions().get(instanceId) const existing = instanceSessions?.get(sessionId) if (existing) { if ((existing.status ?? "idle") === status) { return } applySessionStatus(instanceId, sessionId, status) return } const key = `${instanceId}:${sessionId}` if (pendingSessionFetches.has(key)) { return } const pending = (async () => { const fetched = await fetchSessionInfo(instanceId, sessionId, directory) if (!fetched) return applySessionStatus(instanceId, sessionId, status) })() pendingSessionFetches.set(key, pending) void pending.finally(() => pendingSessionFetches.delete(key)) } type MessageRole = "user" | "assistant" function resolveMessageRole(info?: MessageInfo | null): MessageRole { return info?.role === "user" ? "user" : "assistant" } function findPendingMessageId( store: InstanceMessageStore, sessionId: string, role: MessageRole, ): string | undefined { const messageIds = store.getSessionMessageIds(sessionId) const lastId = messageIds[messageIds.length - 1] if (!lastId) return undefined const record = store.getMessage(lastId) if (!record) return undefined if (record.sessionId !== sessionId) return undefined if (record.role !== role) return undefined return record.status === "sending" ? record.id : undefined } function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | MessagePartUpdatedEvent): void { const instanceSessions = sessions().get(instanceId) if (event.type === "message.part.updated") { const rawPart = event.properties?.part if (!rawPart) return const part = normalizeMessagePart(rawPart) const messageInfo = (event as any)?.properties?.message as MessageInfo | undefined const fallbackSessionId = typeof messageInfo?.sessionID === "string" ? messageInfo.sessionID : undefined const fallbackMessageId = typeof messageInfo?.id === "string" ? messageInfo.id : undefined const sessionId = typeof part.sessionID === "string" ? part.sessionID : fallbackSessionId const messageId = typeof part.messageID === "string" ? part.messageID : fallbackMessageId if (!sessionId || !messageId) return if (part.type === "compaction") { ensureSessionStatus(instanceId, sessionId, "compacting", (event as any)?.directory) } const store = messageStoreBus.getOrCreate(instanceId) const role: MessageRole = resolveMessageRole(messageInfo) const createdAt = typeof messageInfo?.time?.created === "number" ? messageInfo.time.created : Date.now() let record = store.getMessage(messageId) if (!record) { const pendingId = findPendingMessageId(store, sessionId, role) if (pendingId && pendingId !== messageId) { replaceMessageIdV2(instanceId, pendingId, messageId) record = store.getMessage(messageId) } } if (!record) { store.upsertMessage({ id: messageId, sessionId, role, status: "streaming", createdAt, updatedAt: createdAt, isEphemeral: true, }) } if (messageInfo) { upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" }) } applyPartUpdateV2(instanceId, { ...part, sessionID: sessionId, messageID: messageId }) if (part.type === "tool" && part.tool === "question") { // Questions can arrive before their tool part exists; re-link now. reconcilePendingQuestionsV2(instanceId, sessionId) } updateSessionInfo(instanceId, sessionId) } else if (event.type === "message.updated") { const info = event.properties?.info if (!info) return const sessionId = typeof info.sessionID === "string" ? info.sessionID : undefined const messageId = typeof info.id === "string" ? info.id : undefined if (!sessionId || !messageId) return const timeInfo = (info.time ?? {}) as { created?: number; updated?: number; end?: number } const nextUpdated = typeof timeInfo.end === "number" && timeInfo.end > 0 ? timeInfo.end : typeof timeInfo.updated === "number" && timeInfo.updated > 0 ? timeInfo.updated : typeof timeInfo.created === "number" && timeInfo.created > 0 ? timeInfo.created : Date.now() withSession(instanceId, sessionId, (session) => { const currentUpdated = session.time?.updated ?? 0 if (nextUpdated <= currentUpdated) return false session.time = { ...(session.time ?? {}), updated: nextUpdated } }) const store = messageStoreBus.getOrCreate(instanceId) const role: MessageRole = info.role === "user" ? "user" : "assistant" const hasError = Boolean((info as any).error) const status: MessageStatus = hasError ? "error" : "complete" let record = store.getMessage(messageId) if (!record) { const pendingId = findPendingMessageId(store, sessionId, role) if (pendingId && pendingId !== messageId) { replaceMessageIdV2(instanceId, pendingId, messageId) record = store.getMessage(messageId) } } if (!record) { const createdAt = info.time?.created ?? Date.now() const endAt = (info.time as { end?: number } | undefined)?.end store.upsertMessage({ id: messageId, sessionId, role, status, createdAt, updatedAt: endAt ?? createdAt, }) } upsertMessageInfoV2(instanceId, info, { status, bumpRevision: true }) updateSessionInfo(instanceId, sessionId) } } function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void { const props = event.properties if (!props) return const { messageID, partID, field, delta } = props if (!messageID || !partID || !field || typeof delta !== "string") return applyPartDeltaV2(instanceId, { messageId: messageID, partId: partID, field, delta }) } function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void { const info = event.properties?.info if (!info) return const instanceSessions = sessions().get(instanceId) ?? new Map() const existingSession = instanceSessions.get(info.id) if (!existingSession) { const newSession = { id: info.id, instanceId, title: info.title || tGlobal("sessionList.session.untitled"), parentId: info.parentID || null, agent: "", model: { providerId: "", modelId: "", }, status: "idle", version: info.version || "0", time: info.time ? { ...info.time } : { created: Date.now(), updated: Date.now(), }, } as Session let updatedInstanceSessions: Map | undefined setSessions((prev) => { const next = new Map(prev) const instanceSessions = next.get(instanceId) ?? new Map() instanceSessions.set(newSession.id, newSession) next.set(instanceId, instanceSessions) updatedInstanceSessions = instanceSessions return next }) syncInstanceSessionIndicator(instanceId, updatedInstanceSessions) setSessionRevertV2(instanceId, info.id, info.revert ?? null) log.info(`[SSE] New session created: ${info.id}`, newSession) } else { const mergedTime = { ...existingSession.time, ...(info.time ?? {}), } const updatedSession = { ...existingSession, title: info.title || existingSession.title, status: existingSession.status ?? "idle", time: mergedTime, revert: info.revert ? { messageID: info.revert.messageID, partID: info.revert.partID, snapshot: info.revert.snapshot, diff: info.revert.diff, } : existingSession.revert, } let updatedInstanceSessions: Map | undefined setSessions((prev) => { const next = new Map(prev) const instanceSessions = next.get(instanceId) ?? new Map() instanceSessions.set(existingSession.id, updatedSession) next.set(instanceId, instanceSessions) updatedInstanceSessions = instanceSessions return next }) syncInstanceSessionIndicator(instanceId, updatedInstanceSessions) setSessionRevertV2(instanceId, info.id, info.revert ?? null) } } function handleSessionDiff(instanceId: string, event: EventSessionDiff): void { const sessionId = event.properties?.sessionID if (!sessionId) return const diffs = event.properties?.diff if (!Array.isArray(diffs)) return const existing = sessions().get(instanceId)?.get(sessionId) if (existing) { withSession(instanceId, sessionId, (session) => { session.diff = diffs }) return } // A diff event can arrive before we have hydrated the session list. // Best-effort: fetch the session record so the diff has somewhere to live. void (async () => { await fetchSessionInfo(instanceId, sessionId, (event as any)?.directory) withSession(instanceId, sessionId, (session) => { session.diff = diffs }) })().catch((error) => log.warn("Failed to hydrate session for diff event", { instanceId, sessionId, error })) } function handleSessionIdle(instanceId: string, event: EventSessionIdle): void { const sessionId = event.properties?.sessionID if (!sessionId) return if (shouldSendOsNotification("idle")) { const title = getInstanceDisplayName(instanceId) const label = getSessionTitle(instanceId, sessionId) const body = label ? `Session "${label}" is idle` : "Session is idle" fireOsNotification({ title, body }) } ensureSessionStatus(instanceId, sessionId, "idle", (event as any)?.directory) log.info(`[SSE] Session idle: ${sessionId}`) } function handleSessionStatus(instanceId: string, event: EventSessionStatus): void { const sessionId = event.properties?.sessionID if (!sessionId) return const status = mapSdkSessionStatus(event.properties.status) ensureSessionStatus(instanceId, sessionId, status, (event as any)?.directory) log.info(`[SSE] Session status updated: ${sessionId}`, { status }) } function handleSessionCompacted(instanceId: string, event: EventSessionCompacted): void { const sessionID = event.properties?.sessionID if (!sessionID) return log.info(`[SSE] Session compacted: ${sessionID}`) const existing = sessions().get(instanceId)?.get(sessionID) if (existing) { withSession(instanceId, sessionID, (session) => { session.status = "working" }) } else { ensureSessionStatus(instanceId, sessionID, "working", (event as any)?.directory) } loadMessages(instanceId, sessionID, true).catch((error) => log.error("Failed to reload session after compaction", error)) const instanceSessions = sessions().get(instanceId) const session = instanceSessions?.get(sessionID) const label = session?.title?.trim() ? session.title : sessionID const instanceFolder = instances().get(instanceId)?.folder ?? instanceId const instanceName = instanceFolder.split(/[\\/]/).filter(Boolean).pop() ?? instanceFolder const displayLabel = label ? `"${label}"` : sessionID showToastNotification({ title: instanceName, message: tGlobal("sessionEvents.sessionCompactedToast", { label: displayLabel }), variant: "info", duration: 10000, }) } function handleSessionError(_instanceId: string, event: EventSessionError): void { const error = event.properties?.error log.error(`[SSE] Session error:`, error) let message = tGlobal("sessionEvents.sessionError.unknown") if (error) { if ("data" in error && error.data && typeof error.data === "object" && "message" in error.data) { message = error.data.message as string } else if ("message" in error && typeof error.message === "string") { message = error.message } } showAlertDialog(tGlobal("sessionEvents.sessionError.message", { message }), { title: tGlobal("sessionEvents.sessionError.title"), variant: "error", }) } function handleMessageRemoved(instanceId: string, event: MessageRemovedEvent): void { const { sessionID, messageID } = event.properties if (!sessionID || !messageID) return log.info(`[SSE] Message removed from session ${sessionID}`, { messageID }) removeMessageV2(instanceId, messageID) updateSessionInfo(instanceId, sessionID) } function handleMessagePartRemoved(instanceId: string, event: MessagePartRemovedEvent): void { const { sessionID, messageID, partID } = event.properties if (!sessionID || !messageID || !partID) return log.info(`[SSE] Message part removed from session ${sessionID}`, { messageID, partID }) removeMessagePartV2(instanceId, messageID, partID) updateSessionInfo(instanceId, sessionID) } function handleTuiToast(_instanceId: string, event: TuiToastEvent): void { const payload = event?.properties if (!payload || typeof payload.message !== "string" || typeof payload.variant !== "string") return if (!payload.message.trim()) return const variant: ToastVariant = ALLOWED_TOAST_VARIANTS.has(payload.variant as ToastVariant) ? (payload.variant as ToastVariant) : "info" showToastNotification({ title: typeof payload.title === "string" ? payload.title : undefined, message: payload.message, variant, duration: typeof payload.duration === "number" ? payload.duration : undefined, }) } function handlePermissionUpdated(instanceId: string, event: { type: string; properties?: PermissionRequestLike } | any): void { const permission = event?.properties as PermissionRequestLike | undefined if (!permission) return log.info(`[SSE] Permission request: ${getPermissionId(permission)} (${getPermissionKind(permission)})`) addPermissionToQueue(instanceId, permission) upsertPermissionV2(instanceId, permission) if (shouldSendOsNotification("needsInput")) { const title = getInstanceDisplayName(instanceId) const sessionId = getPermissionSessionId(permission) const label = getSessionTitle(instanceId, sessionId) const body = label ? `Session "${label}" needs permission` : "Session needs permission" fireOsNotification({ title, body }) } } function handlePermissionReplied(instanceId: string, event: { type: string; properties?: PermissionReplyEventPropertiesLike } | any): void { const properties = event?.properties as PermissionReplyEventPropertiesLike | undefined const requestId = getRequestIdFromPermissionReply(properties) if (!requestId) return log.info(`[SSE] Permission replied: ${requestId}`) removePermissionFromQueue(instanceId, requestId) removePermissionV2(instanceId, requestId) } function handleQuestionAsked(instanceId: string, event: { type: string; properties?: QuestionRequest } | any): void { const request = event?.properties as QuestionRequest | undefined if (!request) return log.info(`[SSE] Question asked: ${getQuestionId(request)}`) addQuestionToQueue(instanceId, request) upsertQuestionV2(instanceId, request) if (shouldSendOsNotification("needsInput")) { const title = getInstanceDisplayName(instanceId) const sessionId = getQuestionSessionId(request) const label = getSessionTitle(instanceId, sessionId) const body = label ? `Session "${label}" needs input` : "Session needs input" fireOsNotification({ title, body }) } } function handleQuestionAnswered( instanceId: string, event: { type: string; properties?: EventQuestionReplied["properties"] | EventQuestionRejected["properties"] } | any, ): void { const properties = event?.properties as EventQuestionReplied["properties"] | EventQuestionRejected["properties"] | undefined const requestId = getRequestIdFromQuestionReply(properties) if (!requestId) return log.info(`[SSE] Question answered: ${requestId}`) removeQuestionFromQueue(instanceId, requestId) removeQuestionV2(instanceId, requestId) } export { handleMessagePartRemoved, handleMessageRemoved, handleMessagePartDelta, handleMessageUpdate, handlePermissionReplied, handlePermissionUpdated, handleQuestionAsked, handleQuestionAnswered, handleSessionCompacted, handleSessionDiff, handleSessionError, handleSessionIdle, handleSessionStatus, handleSessionUpdate, handleTuiToast, }