Track session status via SSE updates
This commit is contained in:
@@ -12,6 +12,7 @@ import type {
|
||||
EventSessionError,
|
||||
EventSessionIdle,
|
||||
EventSessionUpdated,
|
||||
EventSessionStatus,
|
||||
} from "@opencode-ai/sdk"
|
||||
import type { MessageStatus } from "./message-v2/types"
|
||||
|
||||
@@ -19,11 +20,11 @@ import { getLogger } from "../lib/logger"
|
||||
import { showToastNotification, ToastVariant } from "../lib/notifications"
|
||||
import { instances, addPermissionToQueue, removePermissionFromQueue } from "./instances"
|
||||
import { showAlertDialog } from "./alerts"
|
||||
import { createClientSession, Session, SessionStatus } from "../types/session"
|
||||
import { sessions, setSessions, withSession } from "./session-state"
|
||||
import { normalizeMessagePart } from "./message-v2/normalizers"
|
||||
import { updateSessionInfo } from "./message-v2/session-info"
|
||||
|
||||
const log = getLogger("sse")
|
||||
import { loadMessages } from "./session-api"
|
||||
import { setSessionCompactionState } from "./session-compaction"
|
||||
import {
|
||||
@@ -39,6 +40,9 @@ import {
|
||||
import { messageStoreBus } from "./message-v2/bus"
|
||||
import type { InstanceMessageStore } from "./message-v2/instance-store"
|
||||
|
||||
const log = getLogger("sse")
|
||||
const pendingSessionFetches = new Map<string, Promise<void>>()
|
||||
|
||||
interface TuiToastEvent {
|
||||
type: "tui.toast.show"
|
||||
properties: {
|
||||
@@ -51,8 +55,83 @@ interface TuiToastEvent {
|
||||
|
||||
const ALLOWED_TOAST_VARIANTS = new Set<ToastVariant>(["info", "success", "warning", "error"])
|
||||
|
||||
const mapSdkSessionStatus = (status: EventSessionStatus["properties"]["status"]): SessionStatus => {
|
||||
if (!status || status.type === "idle") {
|
||||
return "idle"
|
||||
}
|
||||
if (status.type === "retry") {
|
||||
return "working"
|
||||
}
|
||||
return "working"
|
||||
}
|
||||
|
||||
function applySessionStatus(instanceId: string, sessionId: string, status: SessionStatus, bumpUpdated = false) {
|
||||
withSession(instanceId, sessionId, (session) => {
|
||||
session.status = status
|
||||
if (bumpUpdated) {
|
||||
session.time = { ...(session.time ?? {}), updated: Date.now() }
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async function fetchSessionInfo(instanceId: string, sessionId: string): Promise<Session | null> {
|
||||
const instance = instances().get(instanceId)
|
||||
if (!instance?.client) return null
|
||||
|
||||
try {
|
||||
const response = await instance.client.session.get({ path: { id: sessionId } })
|
||||
if (!response.data) return null
|
||||
|
||||
const fetched = createClientSession(response.data, instanceId)
|
||||
|
||||
setSessions((prev) => {
|
||||
const next = new Map(prev)
|
||||
const instanceSessions = new Map(next.get(instanceId) ?? [])
|
||||
const existing = instanceSessions.get(sessionId)
|
||||
instanceSessions.set(sessionId, {
|
||||
...fetched,
|
||||
agent: existing?.agent ?? fetched.agent,
|
||||
model: existing?.model ?? fetched.model,
|
||||
status: existing?.status ?? fetched.status,
|
||||
pendingPermission: existing?.pendingPermission ?? fetched.pendingPermission,
|
||||
})
|
||||
next.set(instanceId, instanceSessions)
|
||||
return next
|
||||
})
|
||||
|
||||
return fetched
|
||||
} catch (error) {
|
||||
log.error("Failed to fetch session info", error)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
function ensureSessionStatus(instanceId: string, sessionId: string, status: SessionStatus, bumpUpdated = false) {
|
||||
const instanceSessions = sessions().get(instanceId)
|
||||
const existing = instanceSessions?.get(sessionId)
|
||||
if (existing) {
|
||||
applySessionStatus(instanceId, sessionId, status, bumpUpdated)
|
||||
return
|
||||
}
|
||||
|
||||
const key = `${instanceId}:${sessionId}`
|
||||
if (pendingSessionFetches.has(key)) {
|
||||
return
|
||||
}
|
||||
|
||||
const pending = (async () => {
|
||||
const fetched = await fetchSessionInfo(instanceId, sessionId)
|
||||
if (!fetched) return
|
||||
applySessionStatus(instanceId, sessionId, status, bumpUpdated)
|
||||
})()
|
||||
|
||||
pendingSessionFetches.set(key, pending)
|
||||
void pending.finally(() => pendingSessionFetches.delete(key))
|
||||
}
|
||||
|
||||
type MessageRole = "user" | "assistant"
|
||||
|
||||
|
||||
function resolveMessageRole(info?: MessageInfo | null): MessageRole {
|
||||
return info?.role === "user" ? "user" : "assistant"
|
||||
}
|
||||
@@ -74,7 +153,6 @@ function findPendingMessageId(
|
||||
|
||||
function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | MessagePartUpdatedEvent): void {
|
||||
const instanceSessions = sessions().get(instanceId)
|
||||
if (!instanceSessions) return
|
||||
|
||||
if (event.type === "message.part.updated") {
|
||||
const rawPart = event.properties?.part
|
||||
@@ -90,9 +168,14 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
|
||||
const messageId = typeof part.messageID === "string" ? part.messageID : fallbackMessageId
|
||||
if (!sessionId || !messageId) return
|
||||
|
||||
const session = instanceSessions.get(sessionId)
|
||||
if (!session) return
|
||||
|
||||
const session = instanceSessions?.get(sessionId)
|
||||
if (!session) {
|
||||
ensureSessionStatus(instanceId, sessionId, "working", true)
|
||||
return
|
||||
}
|
||||
|
||||
applySessionStatus(instanceId, sessionId, "working", true)
|
||||
|
||||
const store = messageStoreBus.getOrCreate(instanceId)
|
||||
const role: MessageRole = resolveMessageRole(messageInfo)
|
||||
const createdAt = typeof messageInfo?.time?.created === "number" ? messageInfo.time.created : Date.now()
|
||||
@@ -135,10 +218,16 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
|
||||
const messageId = typeof info.id === "string" ? info.id : undefined
|
||||
if (!sessionId || !messageId) return
|
||||
|
||||
const session = instanceSessions.get(sessionId)
|
||||
if (!session) return
|
||||
const session = instanceSessions?.get(sessionId)
|
||||
if (!session) {
|
||||
ensureSessionStatus(instanceId, sessionId, "working", true)
|
||||
return
|
||||
}
|
||||
|
||||
applySessionStatus(instanceId, sessionId, "working", true)
|
||||
|
||||
const store = messageStoreBus.getOrCreate(instanceId)
|
||||
|
||||
const role: MessageRole = info.role === "user" ? "user" : "assistant"
|
||||
const hasError = Boolean((info as any).error)
|
||||
const status: MessageStatus = hasError ? "error" : "complete"
|
||||
@@ -180,8 +269,7 @@ function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): vo
|
||||
const isCompacting = typeof compactingFlag === "number" ? compactingFlag > 0 : Boolean(compactingFlag)
|
||||
setSessionCompactionState(instanceId, info.id, isCompacting)
|
||||
|
||||
const instanceSessions = sessions().get(instanceId)
|
||||
if (!instanceSessions) return
|
||||
const instanceSessions = sessions().get(instanceId) ?? new Map<string, Session>()
|
||||
|
||||
const existingSession = instanceSessions.get(info.id)
|
||||
|
||||
@@ -196,6 +284,7 @@ function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): vo
|
||||
providerId: "",
|
||||
modelId: "",
|
||||
},
|
||||
status: isCompacting ? "compacting" : "idle",
|
||||
version: info.version || "0",
|
||||
time: info.time
|
||||
? { ...info.time }
|
||||
@@ -203,7 +292,7 @@ function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): vo
|
||||
created: Date.now(),
|
||||
updated: Date.now(),
|
||||
},
|
||||
} as any
|
||||
} as Session
|
||||
|
||||
setSessions((prev) => {
|
||||
const next = new Map(prev)
|
||||
@@ -227,6 +316,7 @@ function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): vo
|
||||
const updatedSession = {
|
||||
...existingSession,
|
||||
title: info.title || existingSession.title,
|
||||
status: isCompacting ? "compacting" : (existingSession.status ?? "idle"),
|
||||
time: mergedTime,
|
||||
revert: info.revert
|
||||
? {
|
||||
@@ -249,13 +339,23 @@ function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): vo
|
||||
}
|
||||
}
|
||||
|
||||
function handleSessionIdle(_instanceId: string, event: EventSessionIdle): void {
|
||||
function handleSessionIdle(instanceId: string, event: EventSessionIdle): void {
|
||||
const sessionId = event.properties?.sessionID
|
||||
if (!sessionId) return
|
||||
|
||||
ensureSessionStatus(instanceId, sessionId, "idle")
|
||||
log.info(`[SSE] Session idle: ${sessionId}`)
|
||||
}
|
||||
|
||||
function handleSessionStatus(instanceId: string, event: EventSessionStatus): void {
|
||||
const sessionId = event.properties?.sessionID
|
||||
if (!sessionId) return
|
||||
|
||||
const status = mapSdkSessionStatus(event.properties.status)
|
||||
ensureSessionStatus(instanceId, sessionId, status, status === "working")
|
||||
log.info(`[SSE] Session status updated: ${sessionId}`, { status })
|
||||
}
|
||||
|
||||
function handleSessionCompacted(instanceId: string, event: EventSessionCompacted): void {
|
||||
const sessionID = event.properties?.sessionID
|
||||
if (!sessionID) return
|
||||
@@ -263,6 +363,7 @@ function handleSessionCompacted(instanceId: string, event: EventSessionCompacted
|
||||
log.info(`[SSE] Session compacted: ${sessionID}`)
|
||||
|
||||
setSessionCompactionState(instanceId, sessionID, false)
|
||||
ensureSessionStatus(instanceId, sessionID, "idle")
|
||||
|
||||
withSession(instanceId, sessionID, (session) => {
|
||||
const time = { ...(session.time ?? {}) }
|
||||
@@ -368,6 +469,7 @@ export {
|
||||
handleSessionCompacted,
|
||||
handleSessionError,
|
||||
handleSessionIdle,
|
||||
handleSessionStatus,
|
||||
handleSessionUpdate,
|
||||
handleTuiToast,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user