migrate session event/actions to v2 store

This commit is contained in:
Shantur Rathore
2025-11-26 09:57:21 +00:00
parent 16b76385e2
commit 93a5c16cab
2 changed files with 113 additions and 283 deletions

View File

@@ -1,21 +1,11 @@
import type { Message } from "../types/message"
import { resolvePastedPlaceholders } from "../lib/prompt-placeholders"
import { instances } from "./instances"
import {
addRecentModelPreference,
preferences,
setAgentModelPreference,
} from "./preferences"
import { addRecentModelPreference, setAgentModelPreference } from "./preferences"
import { sessions, withSession } from "./session-state"
import { getDefaultModel, isModelValid } from "./session-models"
import {
computeDisplayParts,
getSessionIndex,
initializePartVersion,
updateSessionInfo,
} from "./session-messages"
import { updateSessionInfo } from "./session-messages"
import { messageStoreBus } from "./message-v2/bus"
const ID_LENGTH = 26
const BASE62_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
@@ -93,26 +83,6 @@ async function sendMessage(
},
]
const optimisticMessage: Message = {
id: messageId,
sessionId,
type: "user",
parts: optimisticParts,
timestamp: Date.now(),
status: "sending",
version: 0,
}
optimisticParts.forEach((part: any) => initializePartVersion(part))
optimisticMessage.displayParts = computeDisplayParts(optimisticMessage, preferences().showThinkingBlocks)
withSession(instanceId, sessionId, (session) => {
session.messages.push(optimisticMessage)
const index = getSessionIndex(instanceId, sessionId)
index.messageIndex.set(optimisticMessage.id, session.messages.length - 1)
})
const requestParts: any[] = [
{
id: textPartId,
@@ -167,6 +137,24 @@ async function sendMessage(
}
}
const store = messageStoreBus.getOrCreate(instanceId)
const createdAt = Date.now()
store.upsertMessage({
id: messageId,
sessionId,
role: "user",
status: "sending",
parts: optimisticParts,
createdAt,
updatedAt: createdAt,
isEphemeral: true,
})
withSession(instanceId, sessionId, () => {
/* trigger reactivity for legacy session data */
})
const requestBody = {
messageID: messageId,
parts: requestParts,

View File

@@ -1,4 +1,5 @@
import type {
MessageInfo,
MessagePartRemovedEvent,
MessagePartUpdatedEvent,
MessageRemovedEvent,
@@ -14,7 +15,6 @@ import type {
} from "@opencode-ai/sdk"
import { showToastNotification, ToastVariant } from "../lib/notifications"
import { preferences } from "./preferences"
import { instances, addPermissionToQueue, removePermissionFromQueue, refreshPermissionsForSession } from "./instances"
import { showAlertDialog } from "./alerts"
import {
@@ -22,16 +22,7 @@ import {
setSessions,
withSession,
} from "./session-state"
import {
bumpPartVersion,
computeDisplayParts,
getSessionIndex,
initializePartVersion,
normalizeMessagePart,
rebuildSessionIndex,
updateSessionInfo,
updateUsageFromMessageInfo,
} from "./session-messages"
import { normalizeMessagePart, updateSessionInfo } from "./session-messages"
import { loadMessages } from "./session-api"
import { setSessionCompactionState } from "./session-compaction"
import {
@@ -42,6 +33,8 @@ import {
removePermissionV2,
setSessionRevertV2,
} from "./message-v2/bridge"
import { messageStoreBus } from "./message-v2/bus"
import type { InstanceMessageStore } from "./message-v2/instance-store"
interface TuiToastEvent {
type: "tui.toast.show"
@@ -55,6 +48,30 @@ interface TuiToastEvent {
const ALLOWED_TOAST_VARIANTS = new Set<ToastVariant>(["info", "success", "warning", "error"])
type MessageRole = "user" | "assistant"
function resolveMessageRole(info?: MessageInfo | null): MessageRole {
return info?.role === "user" ? "user" : "assistant"
}
function findPendingMessageId(
store: InstanceMessageStore,
sessionId: string,
role: MessageRole,
): string | undefined {
const messageIds = store.getSessionMessageIds(sessionId)
for (let i = messageIds.length - 1; i >= 0; i -= 1) {
const record = store.getMessage(messageIds[i])
if (!record) continue
if (record.sessionId !== sessionId) continue
if (record.role !== role) continue
if (record.status === "sending") {
return record.id
}
}
return undefined
}
function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | MessagePartUpdatedEvent): void {
const instanceSessions = sessions().get(instanceId)
if (!instanceSessions) return
@@ -64,273 +81,98 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
if (!rawPart) return
const part = normalizeMessagePart(rawPart)
const sessionId = typeof part.sessionID === "string" ? part.sessionID : undefined
const messageId = typeof part.messageID === "string" ? part.messageID : undefined
if (!sessionId || !messageId) return
const session = instanceSessions.get(part.sessionID)
const session = instanceSessions.get(sessionId)
if (!session) return
const index = getSessionIndex(instanceId, part.sessionID)
let messageIndex = index.messageIndex.get(part.messageID)
let replacedTemp = false
const store = messageStoreBus.getOrCreate(instanceId)
const messageInfo = event.properties?.message as MessageInfo | undefined
const role: MessageRole = resolveMessageRole(messageInfo)
const createdAt = typeof messageInfo?.time?.created === "number" ? messageInfo.time.created : Date.now()
if (messageIndex === undefined) {
for (let i = 0; i < session.messages.length; i++) {
const msg = session.messages[i]
if (msg.sessionId === part.sessionID && msg.status === "sending") {
messageIndex = i
replacedTemp = true
break
}
let record = store.getMessage(messageId)
if (!record) {
const pendingId = findPendingMessageId(store, sessionId, role)
if (pendingId && pendingId !== messageId) {
replaceMessageIdV2(instanceId, pendingId, messageId)
record = store.getMessage(messageId)
}
}
if (messageIndex === undefined) {
const newMessage: any = {
id: part.messageID,
sessionId: part.sessionID,
type: "assistant" as const,
parts: [part],
timestamp: Date.now(),
status: "streaming" as const,
version: 0,
}
initializePartVersion(part)
newMessage.displayParts = computeDisplayParts(newMessage, preferences().showThinkingBlocks)
let insertIndex = session.messages.length
for (let i = session.messages.length - 1; i >= 0; i--) {
if (session.messages[i].id < newMessage.id) {
insertIndex = i + 1
break
}
}
session.messages.splice(insertIndex, 0, newMessage)
rebuildSessionIndex(instanceId, part.sessionID, session.messages)
} else {
const message = session.messages[messageIndex]
if (typeof message.version !== "number") {
message.version = 0
}
let filteredSynthetics = false
if (message.parts.some((partItem: any) => partItem.synthetic === true)) {
message.parts = message.parts.filter((partItem: any) => partItem.synthetic !== true)
filteredSynthetics = true
message.parts.forEach((partItem: any) => {
if (partItem.type === "text") {
partItem.renderCache = undefined
}
})
}
let baseParts: any[]
if (replacedTemp) {
baseParts = message.parts.filter((partItem: any) => partItem.type !== "text")
message.parts = baseParts
baseParts.forEach((partItem: any) => {
if (partItem.type === "text") {
partItem.renderCache = undefined
}
})
} else {
baseParts = message.parts
}
let partMap = index.partIndex.get(message.id)
if (!partMap) {
partMap = new Map()
index.partIndex.set(message.id, partMap)
}
let shouldIncrementVersion = filteredSynthetics || replacedTemp
const partIndex = partMap.get(part.id)
if (partIndex === undefined) {
initializePartVersion(part)
baseParts.push(part)
if (part.id && typeof part.id === "string") {
partMap.set(part.id, baseParts.length - 1)
}
shouldIncrementVersion = true
if (part.type === "text") {
part.renderCache = undefined
}
} else {
const previousPart = baseParts[partIndex]
const textUnchanged =
!filteredSynthetics &&
!replacedTemp &&
part.type === "text" &&
previousPart?.type === "text" &&
previousPart.text === part.text
if (textUnchanged) {
return
}
bumpPartVersion(previousPart, part)
baseParts[partIndex] = part
if (part.type !== "text" || !previousPart || previousPart.text !== part.text) {
shouldIncrementVersion = true
if (part.type === "text") {
part.renderCache = undefined
}
}
}
const oldId = message.id
message.id = replacedTemp ? part.messageID : message.id
message.status = message.status === "sending" ? "streaming" : message.status
message.parts = baseParts
if (shouldIncrementVersion) {
message.version += 1
message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks)
} else if (
!message.displayParts ||
message.displayParts.showThinking !== preferences().showThinkingBlocks ||
message.displayParts.version !== message.version
) {
message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks)
}
if (oldId !== message.id) {
index.messageIndex.delete(oldId)
index.messageIndex.set(message.id, messageIndex)
const existingPartMap = index.partIndex.get(oldId)
if (existingPartMap) {
index.partIndex.delete(oldId)
index.partIndex.set(message.id, existingPartMap)
}
replaceMessageIdV2(instanceId, oldId, message.id)
}
if (filteredSynthetics || replacedTemp) {
const refreshed = new Map<string, number>()
message.parts.forEach((partItem, idx) => {
if (partItem.id && typeof partItem.id === "string") {
refreshed.set(partItem.id, idx)
}
})
index.partIndex.set(message.id, refreshed)
}
if (!record) {
store.upsertMessage({
id: messageId,
sessionId,
role,
status: "streaming",
createdAt,
updatedAt: createdAt,
isEphemeral: true,
})
}
withSession(instanceId, part.sessionID, () => {
/* mutations already applied above */
})
if (messageInfo) {
upsertMessageInfoV2(instanceId, messageInfo, { status: "streaming" })
}
applyPartUpdateV2(instanceId, part)
updateSessionInfo(instanceId, part.sessionID)
refreshPermissionsForSession(instanceId, part.sessionID)
withSession(instanceId, sessionId, () => {
/* trigger reactivity for legacy consumers */
})
updateSessionInfo(instanceId, sessionId)
refreshPermissionsForSession(instanceId, sessionId)
} else if (event.type === "message.updated") {
const info = event.properties?.info
if (!info) return
const session = instanceSessions.get(info.sessionID)
const sessionId = typeof info.sessionID === "string" ? info.sessionID : undefined
const messageId = typeof info.id === "string" ? info.id : undefined
if (!sessionId || !messageId) return
const session = instanceSessions.get(sessionId)
if (!session) return
const index = getSessionIndex(instanceId, info.sessionID)
let messageIndex = index.messageIndex.get(info.id)
const store = messageStoreBus.getOrCreate(instanceId)
const role: MessageRole = info.role === "user" ? "user" : "assistant"
if (messageIndex === undefined) {
let tempMessageIndex = -1
for (let i = 0; i < session.messages.length; i++) {
const msg = session.messages[i]
if (
msg.sessionId === info.sessionID &&
msg.type === (info.role === "user" ? "user" : "assistant") &&
msg.status === "sending"
) {
tempMessageIndex = i
break
}
let record = store.getMessage(messageId)
if (!record) {
const pendingId = findPendingMessageId(store, sessionId, role)
if (pendingId && pendingId !== messageId) {
replaceMessageIdV2(instanceId, pendingId, messageId)
record = store.getMessage(messageId)
}
if (tempMessageIndex === -1) {
for (let i = 0; i < session.messages.length; i++) {
const msg = session.messages[i]
if (msg.sessionId === info.sessionID && msg.status === "sending") {
tempMessageIndex = i
break
}
}
}
if (tempMessageIndex > -1) {
const message = session.messages[tempMessageIndex]
if (typeof message.version !== "number") {
message.version = 0
}
const oldId = message.id
message.id = info.id
message.type = (info.role === "user" ? "user" : "assistant") as "user" | "assistant"
message.timestamp = info.time?.created || Date.now()
message.status = "complete" as const
message.version += 1
message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks)
if (oldId !== message.id) {
index.messageIndex.delete(oldId)
index.messageIndex.set(message.id, tempMessageIndex)
const existingPartMap = index.partIndex.get(oldId)
if (existingPartMap) {
index.partIndex.delete(oldId)
index.partIndex.set(message.id, existingPartMap)
}
replaceMessageIdV2(instanceId, oldId, message.id)
}
} else {
const newMessage: any = {
id: info.id,
sessionId: info.sessionID,
type: (info.role === "user" ? "user" : "assistant") as "user" | "assistant",
parts: [],
timestamp: info.time?.created || Date.now(),
status: "complete" as const,
version: 0,
}
newMessage.displayParts = computeDisplayParts(newMessage, preferences().showThinkingBlocks)
let insertIndex = session.messages.length
for (let i = session.messages.length - 1; i >= 0; i--) {
if (session.messages[i].id < newMessage.id) {
insertIndex = i + 1
break
}
}
session.messages.splice(insertIndex, 0, newMessage)
rebuildSessionIndex(instanceId, info.sessionID, session.messages)
}
} else {
const message = session.messages[messageIndex]
if (typeof message.version !== "number") {
message.version = 0
}
message.status = "complete" as const
message.version += 1
message.displayParts = computeDisplayParts(message, preferences().showThinkingBlocks)
}
upsertMessageInfoV2(instanceId, info, { status: "complete" })
if (!record) {
const createdAt = info.time?.created ?? Date.now()
const completedAt = (info.time as { completed?: number } | undefined)?.completed
store.upsertMessage({
id: messageId,
sessionId,
role,
status: "complete",
createdAt,
updatedAt: completedAt ?? createdAt,
})
}
session.messagesInfo.set(info.id, info)
updateUsageFromMessageInfo(instanceId, info.sessionID, info)
upsertMessageInfoV2(instanceId, info, { status: "complete", bumpRevision: true })
withSession(instanceId, info.sessionID, () => {
/* ensure reactivity */
withSession(instanceId, sessionId, () => {
/* ensure reactivity for legacy session observers */
})
updateSessionInfo(instanceId, info.sessionID)
refreshPermissionsForSession(instanceId, info.sessionID)
updateSessionInfo(instanceId, sessionId)
refreshPermissionsForSession(instanceId, sessionId)
}
}
function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void {
const info = event.properties?.info
if (!info) return