From 93a5c16cab3ca1157227f54c57b499a2a61076d9 Mon Sep 17 00:00:00 2001 From: Shantur Rathore Date: Wed, 26 Nov 2025 09:57:21 +0000 Subject: [PATCH] migrate session event/actions to v2 store --- packages/ui/src/stores/session-actions.ts | 54 ++-- packages/ui/src/stores/session-events.ts | 342 ++++++---------------- 2 files changed, 113 insertions(+), 283 deletions(-) diff --git a/packages/ui/src/stores/session-actions.ts b/packages/ui/src/stores/session-actions.ts index d7e8da24..5326108b 100644 --- a/packages/ui/src/stores/session-actions.ts +++ b/packages/ui/src/stores/session-actions.ts @@ -1,21 +1,11 @@ -import type { Message } from "../types/message" - import { resolvePastedPlaceholders } from "../lib/prompt-placeholders" import { instances } from "./instances" -import { - addRecentModelPreference, - preferences, - setAgentModelPreference, -} from "./preferences" +import { addRecentModelPreference, setAgentModelPreference } from "./preferences" import { sessions, withSession } from "./session-state" import { getDefaultModel, isModelValid } from "./session-models" -import { - computeDisplayParts, - getSessionIndex, - initializePartVersion, - updateSessionInfo, -} from "./session-messages" +import { updateSessionInfo } from "./session-messages" +import { messageStoreBus } from "./message-v2/bus" const ID_LENGTH = 26 const BASE62_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" @@ -93,26 +83,6 @@ async function sendMessage( }, ] - const optimisticMessage: Message = { - id: messageId, - sessionId, - type: "user", - parts: optimisticParts, - timestamp: Date.now(), - status: "sending", - version: 0, - } - - optimisticParts.forEach((part: any) => initializePartVersion(part)) - - optimisticMessage.displayParts = computeDisplayParts(optimisticMessage, preferences().showThinkingBlocks) - - withSession(instanceId, sessionId, (session) => { - session.messages.push(optimisticMessage) - const index = getSessionIndex(instanceId, sessionId) - index.messageIndex.set(optimisticMessage.id, session.messages.length - 1) - }) - const requestParts: any[] = [ { id: textPartId, @@ -167,6 +137,24 @@ async function sendMessage( } } + const store = messageStoreBus.getOrCreate(instanceId) + const createdAt = Date.now() + + store.upsertMessage({ + id: messageId, + sessionId, + role: "user", + status: "sending", + parts: optimisticParts, + createdAt, + updatedAt: createdAt, + isEphemeral: true, + }) + + withSession(instanceId, sessionId, () => { + /* trigger reactivity for legacy session data */ + }) + const requestBody = { messageID: messageId, parts: requestParts, diff --git a/packages/ui/src/stores/session-events.ts b/packages/ui/src/stores/session-events.ts index 0f1e6378..5bf7ec37 100644 --- a/packages/ui/src/stores/session-events.ts +++ b/packages/ui/src/stores/session-events.ts @@ -1,4 +1,5 @@ import type { + MessageInfo, MessagePartRemovedEvent, MessagePartUpdatedEvent, MessageRemovedEvent, @@ -14,7 +15,6 @@ import type { } from "@opencode-ai/sdk" import { showToastNotification, ToastVariant } from "../lib/notifications" -import { preferences } from "./preferences" import { instances, addPermissionToQueue, removePermissionFromQueue, refreshPermissionsForSession } from "./instances" import { showAlertDialog } from "./alerts" import { @@ -22,16 +22,7 @@ import { setSessions, withSession, } from "./session-state" -import { - bumpPartVersion, - computeDisplayParts, - getSessionIndex, - initializePartVersion, - normalizeMessagePart, - rebuildSessionIndex, - updateSessionInfo, - updateUsageFromMessageInfo, -} from "./session-messages" +import { normalizeMessagePart, updateSessionInfo } from "./session-messages" import { loadMessages } from "./session-api" import { setSessionCompactionState } from "./session-compaction" import { @@ -42,6 +33,8 @@ import { removePermissionV2, setSessionRevertV2, } from "./message-v2/bridge" +import { messageStoreBus } from "./message-v2/bus" +import type { InstanceMessageStore } from "./message-v2/instance-store" interface TuiToastEvent { type: "tui.toast.show" @@ -55,6 +48,30 @@ interface TuiToastEvent { const ALLOWED_TOAST_VARIANTS = new Set(["info", "success", "warning", "error"]) +type MessageRole = "user" | "assistant" + +function resolveMessageRole(info?: MessageInfo | null): MessageRole { + return info?.role === "user" ? "user" : "assistant" +} + +function findPendingMessageId( + store: InstanceMessageStore, + sessionId: string, + role: MessageRole, +): string | undefined { + const messageIds = store.getSessionMessageIds(sessionId) + for (let i = messageIds.length - 1; i >= 0; i -= 1) { + const record = store.getMessage(messageIds[i]) + if (!record) continue + if (record.sessionId !== sessionId) continue + if (record.role !== role) continue + if (record.status === "sending") { + return record.id + } + } + return undefined +} + function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | MessagePartUpdatedEvent): void { const instanceSessions = sessions().get(instanceId) if (!instanceSessions) return @@ -64,273 +81,98 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes if (!rawPart) return const part = normalizeMessagePart(rawPart) + const sessionId = typeof part.sessionID === "string" ? part.sessionID : undefined + const messageId = typeof part.messageID === "string" ? part.messageID : undefined + if (!sessionId || !messageId) return - const session = instanceSessions.get(part.sessionID) + const session = instanceSessions.get(sessionId) if (!session) return - const index = getSessionIndex(instanceId, part.sessionID) - let messageIndex = index.messageIndex.get(part.messageID) - let replacedTemp = false + const store = messageStoreBus.getOrCreate(instanceId) + const messageInfo = event.properties?.message as MessageInfo | undefined + const role: MessageRole = resolveMessageRole(messageInfo) + const createdAt = typeof messageInfo?.time?.created === "number" ? messageInfo.time.created : Date.now() - if (messageIndex === undefined) { - for (let i = 0; i < session.messages.length; i++) { - const msg = session.messages[i] - if (msg.sessionId === part.sessionID && msg.status === "sending") { - messageIndex = i - replacedTemp = true - break - } + let record = store.getMessage(messageId) + if (!record) { + const pendingId = findPendingMessageId(store, sessionId, role) + if (pendingId && pendingId !== messageId) { + replaceMessageIdV2(instanceId, pendingId, messageId) + record = store.getMessage(messageId) } } - if (messageIndex === undefined) { - const newMessage: any = { - id: part.messageID, - sessionId: part.sessionID, - type: "assistant" as const, - parts: [part], - timestamp: Date.now(), - status: "streaming" as const, - version: 0, - } - - initializePartVersion(part) - newMessage.displayParts = computeDisplayParts(newMessage, preferences().showThinkingBlocks) - - let insertIndex = session.messages.length - for (let i = session.messages.length - 1; i >= 0; i--) { - if (session.messages[i].id < newMessage.id) { - insertIndex = i + 1 - break - } - } - - session.messages.splice(insertIndex, 0, newMessage) - rebuildSessionIndex(instanceId, part.sessionID, session.messages) - } else { - const message = session.messages[messageIndex] - if (typeof message.version !== "number") { - message.version = 0 - } - - let filteredSynthetics = false - if (message.parts.some((partItem: any) => partItem.synthetic === true)) { - message.parts = message.parts.filter((partItem: any) => partItem.synthetic !== true) - filteredSynthetics = true - message.parts.forEach((partItem: any) => { - if (partItem.type === "text") { - partItem.renderCache = undefined - } - }) - } - - let baseParts: any[] - if (replacedTemp) { - baseParts = message.parts.filter((partItem: any) => partItem.type !== "text") - message.parts = baseParts - baseParts.forEach((partItem: any) => { - if (partItem.type === "text") { - partItem.renderCache = undefined - } - }) - } else { - baseParts = message.parts - } - - let partMap = index.partIndex.get(message.id) - if (!partMap) { - partMap = new Map() - index.partIndex.set(message.id, partMap) - } - - let shouldIncrementVersion = filteredSynthetics || replacedTemp - const partIndex = partMap.get(part.id) - - if (partIndex === undefined) { - initializePartVersion(part) - baseParts.push(part) - if (part.id && typeof part.id === "string") { - partMap.set(part.id, baseParts.length - 1) - } - shouldIncrementVersion = true - if (part.type === "text") { - part.renderCache = undefined - } - } else { - const previousPart = baseParts[partIndex] - const textUnchanged = - !filteredSynthetics && - !replacedTemp && - part.type === "text" && - previousPart?.type === "text" && - previousPart.text === part.text - - if (textUnchanged) { - return - } - - bumpPartVersion(previousPart, part) - baseParts[partIndex] = part - if (part.type !== "text" || !previousPart || previousPart.text !== part.text) { - shouldIncrementVersion = true - if (part.type === "text") { - part.renderCache = undefined - } - } - } - - const oldId = message.id - message.id = replacedTemp ? part.messageID : message.id - message.status = message.status === "sending" ? "streaming" : message.status - message.parts = baseParts - - if (shouldIncrementVersion) { - message.version += 1 - message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks) - } else if ( - !message.displayParts || - message.displayParts.showThinking !== preferences().showThinkingBlocks || - message.displayParts.version !== message.version - ) { - message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks) - } - - if (oldId !== message.id) { - index.messageIndex.delete(oldId) - index.messageIndex.set(message.id, messageIndex) - const existingPartMap = index.partIndex.get(oldId) - if (existingPartMap) { - index.partIndex.delete(oldId) - index.partIndex.set(message.id, existingPartMap) - } - replaceMessageIdV2(instanceId, oldId, message.id) - } - - if (filteredSynthetics || replacedTemp) { - const refreshed = new Map() - message.parts.forEach((partItem, idx) => { - if (partItem.id && typeof partItem.id === "string") { - refreshed.set(partItem.id, idx) - } - }) - index.partIndex.set(message.id, refreshed) - } + if (!record) { + store.upsertMessage({ + id: messageId, + sessionId, + role, + status: "streaming", + createdAt, + updatedAt: createdAt, + isEphemeral: true, + }) } - withSession(instanceId, part.sessionID, () => { - /* mutations already applied above */ - }) + if (messageInfo) { + upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" }) + } applyPartUpdateV2(instanceId, part) - updateSessionInfo(instanceId, part.sessionID) - refreshPermissionsForSession(instanceId, part.sessionID) + + withSession(instanceId, sessionId, () => { + /* trigger reactivity for legacy consumers */ + }) + + updateSessionInfo(instanceId, sessionId) + refreshPermissionsForSession(instanceId, sessionId) } else if (event.type === "message.updated") { const info = event.properties?.info if (!info) return - const session = instanceSessions.get(info.sessionID) + const sessionId = typeof info.sessionID === "string" ? info.sessionID : undefined + const messageId = typeof info.id === "string" ? info.id : undefined + if (!sessionId || !messageId) return + + const session = instanceSessions.get(sessionId) if (!session) return - const index = getSessionIndex(instanceId, info.sessionID) - let messageIndex = index.messageIndex.get(info.id) + const store = messageStoreBus.getOrCreate(instanceId) + const role: MessageRole = info.role === "user" ? "user" : "assistant" - if (messageIndex === undefined) { - let tempMessageIndex = -1 - for (let i = 0; i < session.messages.length; i++) { - const msg = session.messages[i] - if ( - msg.sessionId === info.sessionID && - msg.type === (info.role === "user" ? "user" : "assistant") && - msg.status === "sending" - ) { - tempMessageIndex = i - break - } + let record = store.getMessage(messageId) + if (!record) { + const pendingId = findPendingMessageId(store, sessionId, role) + if (pendingId && pendingId !== messageId) { + replaceMessageIdV2(instanceId, pendingId, messageId) + record = store.getMessage(messageId) } - - if (tempMessageIndex === -1) { - for (let i = 0; i < session.messages.length; i++) { - const msg = session.messages[i] - if (msg.sessionId === info.sessionID && msg.status === "sending") { - tempMessageIndex = i - break - } - } - } - - if (tempMessageIndex > -1) { - const message = session.messages[tempMessageIndex] - if (typeof message.version !== "number") { - message.version = 0 - } - - const oldId = message.id - message.id = info.id - message.type = (info.role === "user" ? "user" : "assistant") as "user" | "assistant" - message.timestamp = info.time?.created || Date.now() - message.status = "complete" as const - message.version += 1 - message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks) - - if (oldId !== message.id) { - index.messageIndex.delete(oldId) - index.messageIndex.set(message.id, tempMessageIndex) - const existingPartMap = index.partIndex.get(oldId) - if (existingPartMap) { - index.partIndex.delete(oldId) - index.partIndex.set(message.id, existingPartMap) - } - replaceMessageIdV2(instanceId, oldId, message.id) - } - } else { - const newMessage: any = { - id: info.id, - sessionId: info.sessionID, - type: (info.role === "user" ? "user" : "assistant") as "user" | "assistant", - parts: [], - timestamp: info.time?.created || Date.now(), - status: "complete" as const, - version: 0, - } - - newMessage.displayParts = computeDisplayParts(newMessage, preferences().showThinkingBlocks) - - let insertIndex = session.messages.length - for (let i = session.messages.length - 1; i >= 0; i--) { - if (session.messages[i].id < newMessage.id) { - insertIndex = i + 1 - break - } - } - - session.messages.splice(insertIndex, 0, newMessage) - rebuildSessionIndex(instanceId, info.sessionID, session.messages) - } - } else { - const message = session.messages[messageIndex] - if (typeof message.version !== "number") { - message.version = 0 - } - message.status = "complete" as const - message.version += 1 - message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks) } - upsertMessageInfoV2(instanceId, info, { status: "complete" }) + if (!record) { + const createdAt = info.time?.created ?? Date.now() + const completedAt = (info.time as { completed?: number } | undefined)?.completed + store.upsertMessage({ + id: messageId, + sessionId, + role, + status: "complete", + createdAt, + updatedAt: completedAt ?? createdAt, + }) + } - session.messagesInfo.set(info.id, info) - updateUsageFromMessageInfo(instanceId, info.sessionID, info) + upsertMessageInfoV2(instanceId, info, { status: "complete", bumpRevision: true }) - withSession(instanceId, info.sessionID, () => { - /* ensure reactivity */ + withSession(instanceId, sessionId, () => { + /* ensure reactivity for legacy session observers */ }) - updateSessionInfo(instanceId, info.sessionID) - refreshPermissionsForSession(instanceId, info.sessionID) + updateSessionInfo(instanceId, sessionId) + refreshPermissionsForSession(instanceId, sessionId) } } - function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void { const info = event.properties?.info if (!info) return