From 3a15ba7f769ddc0d0d066f2d160bb17bfd74bb15 Mon Sep 17 00:00:00 2001 From: Shantur Rathore Date: Tue, 28 Oct 2025 11:19:19 +0000 Subject: [PATCH] Optimize streaming updates and clear optimistic parts --- src/components/message-stream.tsx | 9 +- src/components/opencode-binary-selector.tsx | 2 +- src/stores/instances.ts | 5 +- src/stores/sessions.ts | 442 +++++++++++++++----- 4 files changed, 358 insertions(+), 100 deletions(-) diff --git a/src/components/message-stream.tsx b/src/components/message-stream.tsx index cf9cc770..0af80ed1 100644 --- a/src/components/message-stream.tsx +++ b/src/components/message-stream.tsx @@ -180,15 +180,16 @@ export default function MessageStream(props: MessageStreamProps) { const displayItems = createMemo(() => { const items: DisplayItem[] = [] - let lastAssistantMessageId = "" + let lastAssistantIndex = -1 for (let i = props.messages.length - 1; i >= 0; i--) { if (props.messages[i].type === "assistant") { - lastAssistantMessageId = props.messages[i].id + lastAssistantIndex = i break } } - for (const message of props.messages) { + for (let index = 0; index < props.messages.length; index++) { + const message = props.messages[index] const messageInfo = props.messagesInfo?.get(message.id) // If we hit the revert point, stop rendering messages @@ -200,7 +201,7 @@ export default function MessageStream(props: MessageStreamProps) { const toolParts = message.parts.filter((p) => p.type === "tool") const reasoningParts = preferences().showThinkingBlocks ? message.parts.filter((p) => p.type === "reasoning") : [] - const isQueued = message.type === "user" && message.id > lastAssistantMessageId + const isQueued = message.type === "user" && (lastAssistantIndex === -1 || index > lastAssistantIndex) if (textParts.length > 0 || reasoningParts.length > 0 || messageInfo?.error) { items.push({ diff --git a/src/components/opencode-binary-selector.tsx b/src/components/opencode-binary-selector.tsx index 92d8413a..a71c149a 100644 --- a/src/components/opencode-binary-selector.tsx +++ b/src/components/opencode-binary-selector.tsx @@ -67,7 +67,7 @@ const OpenCodeBinarySelector: Component = (props) = onCleanup(() => { document.removeEventListener("click", handleClickOutside) // Clean up validating state on unmount - setValidatingPaths(new Set()) + setValidatingPaths(new Set()) setValidating(false) }) }) diff --git a/src/stores/instances.ts b/src/stores/instances.ts index 20686c79..3f8b69ec 100644 --- a/src/stores/instances.ts +++ b/src/stores/instances.ts @@ -2,7 +2,7 @@ import { createSignal } from "solid-js" import type { Instance, LogEntry } from "../types/instance" import { sdkManager } from "../lib/sdk-manager" import { sseManager } from "../lib/sse-manager" -import { fetchSessions, fetchAgents, fetchProviders } from "./sessions" +import { fetchSessions, fetchAgents, fetchProviders, removeSessionIndexes } from "./sessions" import { preferences, updateLastUsedBinary } from "./preferences" const [instances, setInstances] = createSignal>(new Map()) @@ -39,6 +39,9 @@ function removeInstance(id: string) { if (activeInstanceId() === id) { setActiveInstanceId(null) } + + // Clean up session indexes for removed instance + removeSessionIndexes(id) } async function createInstance(folder: string, binaryPath?: string): Promise { diff --git a/src/stores/sessions.ts b/src/stores/sessions.ts index 580ef99a..27336da3 100644 --- a/src/stores/sessions.ts +++ b/src/stores/sessions.ts @@ -27,6 +27,129 @@ const [loading, setLoading] = createSignal({ const [messagesLoaded, setMessagesLoaded] = createSignal>>(new Map()) const [sessionInfoByInstance, setSessionInfoByInstance] = createSignal>>(new Map()) +// Message index cache structure: instanceId -> sessionId -> { messageIndex, partIndex } +const sessionIndexes = new Map< + string, + Map; partIndex: Map> }> +>() + +function getSessionIndex(instanceId: string, sessionId: string) { + let instanceMap = sessionIndexes.get(instanceId) + if (!instanceMap) { + instanceMap = new Map() + sessionIndexes.set(instanceId, instanceMap) + } + + let sessionMap = instanceMap.get(sessionId) + if (!sessionMap) { + sessionMap = { messageIndex: new Map(), partIndex: new Map() } + instanceMap.set(sessionId, sessionMap) + } + + return sessionMap +} + +function rebuildSessionIndex(instanceId: string, sessionId: string, messages: Message[]) { + const index = getSessionIndex(instanceId, sessionId) + index.messageIndex.clear() + index.partIndex.clear() + + messages.forEach((message, messageIdx) => { + index.messageIndex.set(message.id, messageIdx) + + const partMap = new Map() + message.parts.forEach((part, partIdx) => { + if (part.id && typeof part.id === "string") { + partMap.set(part.id, partIdx) + } + }) + index.partIndex.set(message.id, partMap) + }) +} + +function clearSessionIndex(instanceId: string, sessionId: string) { + const instanceMap = sessionIndexes.get(instanceId) + if (instanceMap) { + instanceMap.delete(sessionId) + if (instanceMap.size === 0) { + sessionIndexes.delete(instanceId) + } + } +} + +function removeSessionIndexes(instanceId: string) { + sessionIndexes.delete(instanceId) +} + +function withSession(instanceId: string, sessionId: string, updater: (session: Session) => void) { + const instanceSessions = sessions().get(instanceId) + if (!instanceSessions) return + + const session = instanceSessions.get(sessionId) + if (!session) return + + updater(session) + + // Create new session object with fresh references to trigger reactivity + const updatedSession = { + ...session, + messages: [...session.messages], + messagesInfo: new Map(session.messagesInfo), + } + + setSessions((prev) => { + const next = new Map(prev) + const newInstanceSessions = new Map(instanceSessions) + newInstanceSessions.set(sessionId, updatedSession) + next.set(instanceId, newInstanceSessions) + return next + }) +} + +const ID_LENGTH = 26 +const BASE62_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + +let lastTimestamp = 0 +let localCounter = 0 + +function randomBase62(length: number): string { + let result = "" + const cryptoObj = (globalThis as unknown as { crypto?: Crypto }).crypto + if (cryptoObj && typeof cryptoObj.getRandomValues === "function") { + const bytes = new Uint8Array(length) + cryptoObj.getRandomValues(bytes) + for (let i = 0; i < length; i++) { + result += BASE62_CHARS[bytes[i] % BASE62_CHARS.length] + } + } else { + for (let i = 0; i < length; i++) { + const idx = Math.floor(Math.random() * BASE62_CHARS.length) + result += BASE62_CHARS[idx] + } + } + return result +} + +function createId(prefix: string): string { + const timestamp = Date.now() + if (timestamp !== lastTimestamp) { + lastTimestamp = timestamp + localCounter = 0 + } + localCounter++ + + const value = (BigInt(timestamp) << BigInt(12)) + BigInt(localCounter) + const bytes = new Array(6) + for (let i = 0; i < 6; i++) { + const shift = BigInt(8 * (5 - i)) + bytes[i] = Number((value >> shift) & BigInt(0xff)) + } + const hex = bytes.map((b) => b.toString(16).padStart(2, "0")).join("") + const random = randomBase62(ID_LENGTH - 12) + + return `${prefix}_${hex}${random}` +} + async function fetchSessions(instanceId: string): Promise { const instance = instances().get(instanceId) if (!instance || !instance.client) { @@ -287,6 +410,9 @@ async function createSession(instanceId: string, agent?: string): Promise { const next = new Map(prev) @@ -609,6 +738,9 @@ async function loadMessages(instanceId: string, sessionId: string, force = false return next }) + // Rebuild index after loading messages + rebuildSessionIndex(instanceId, sessionId, messages) + setMessagesLoaded((prev) => { const next = new Map(prev) const loadedSet = next.get(instanceId) || new Set() @@ -641,42 +773,112 @@ function handleMessageUpdate(instanceId: string, event: any): void { const part = event.properties?.part if (!part) return - setSessions((prev) => { - const next = new Map(prev) - const instanceSessions = new Map(prev.get(instanceId)) - const session = instanceSessions.get(part.sessionID) + const session = instanceSessions.get(part.sessionID) + if (!session) return - if (!session) return prev + const index = getSessionIndex(instanceId, part.sessionID) + let messageIndex = index.messageIndex.get(part.messageID) + let replacedTemp = false - const messages = [...session.messages] - const messageIndex = messages.findIndex((m) => m.id === part.messageID) - - if (messageIndex === -1) { - messages.push({ - id: part.messageID, - sessionId: part.sessionID, - type: "assistant", - parts: [part], - timestamp: Date.now(), - status: "streaming", - }) - } else { - const message = messages[messageIndex] - const parts = [...message.parts] - const partIndex = parts.findIndex((p: any) => p.id === part.id) - - if (partIndex === -1) { - parts.push(part) - } else { - parts[partIndex] = part + if (messageIndex === undefined) { + // Search for queued message with status 'sending' and no server id + for (let i = 0; i < session.messages.length; i++) { + const msg = session.messages[i] + if (msg.sessionId === part.sessionID && msg.status === "sending") { + messageIndex = i + replacedTemp = true + break } + } + } - messages[messageIndex] = { ...message, parts } + if (messageIndex === undefined) { + // Create new message + const newMessage = { + id: part.messageID, + sessionId: part.sessionID, + type: "assistant" as const, + parts: [part], + timestamp: Date.now(), + status: "streaming" as const, } - instanceSessions.set(part.sessionID, { ...session, messages }) - next.set(instanceId, instanceSessions) - return next + let insertIndex = session.messages.length + for (let i = session.messages.length - 1; i >= 0; i--) { + if (session.messages[i].id < newMessage.id) { + insertIndex = i + 1 + break + } + } + + session.messages.splice(insertIndex, 0, newMessage) + rebuildSessionIndex(instanceId, part.sessionID, session.messages) + } else { + // Update existing message + const message = session.messages[messageIndex] + + // Strip synthetic parts when real data arrives + let filteredSynthetics = false + if (message.parts.some((partItem: any) => partItem.synthetic === true)) { + message.parts = message.parts.filter((partItem: any) => partItem.synthetic !== true) + filteredSynthetics = true + } + + let baseParts: any[] + if (replacedTemp) { + baseParts = message.parts.filter((partItem: any) => partItem.type !== "text") + message.parts = baseParts + } else { + baseParts = message.parts + } + + // Update part in place + let partMap = index.partIndex.get(message.id) + if (!partMap) { + partMap = new Map() + index.partIndex.set(message.id, partMap) + } + + const partIndex = partMap.get(part.id) + if (partIndex === undefined) { + baseParts.push(part) + if (part.id && typeof part.id === "string") { + partMap.set(part.id, baseParts.length - 1) + } + } else { + baseParts[partIndex] = part + } + + const oldId = message.id + message.id = replacedTemp ? part.messageID : message.id + message.status = message.status === "sending" ? "streaming" : message.status + message.parts = baseParts + + // Update message index if ID changed + if (oldId !== message.id) { + index.messageIndex.delete(oldId) + index.messageIndex.set(message.id, messageIndex) + const existingPartMap = index.partIndex.get(oldId) + if (existingPartMap) { + index.partIndex.delete(oldId) + index.partIndex.set(message.id, existingPartMap) + } + } + + // Refresh part indexes after filtering synthetic parts or replacing optimistic content + if (filteredSynthetics || replacedTemp) { + const partMap = new Map() + message.parts.forEach((partItem, idx) => { + if (partItem.id && typeof partItem.id === "string") { + partMap.set(partItem.id, idx) + } + }) + index.partIndex.set(message.id, partMap) + } + } + + withSession(instanceId, part.sessionID, (session) => { + // Session already mutated in place }) updateSessionInfo(instanceId, part.sessionID) @@ -684,53 +886,87 @@ function handleMessageUpdate(instanceId: string, event: any): void { const info = event.properties?.info if (!info) return - setSessions((prev) => { - const next = new Map(prev) - const instanceSessions = new Map(prev.get(instanceId)) - const session = instanceSessions.get(info.sessionID) + const session = instanceSessions.get(info.sessionID) + if (!session) return - if (!session) return prev + const index = getSessionIndex(instanceId, info.sessionID) + let messageIndex = index.messageIndex.get(info.id) - const messages = [...session.messages] - const messageIndex = messages.findIndex((m) => m.id === info.id) - const tempMessageIndex = messages.findIndex( - (m) => - m.id.startsWith("temp-") && - m.type === (info.role === "user" ? "user" : "assistant") && - m.status === "sending", - ) - - if (messageIndex > -1) { - messages[messageIndex] = { - ...messages[messageIndex], - status: "complete", + if (messageIndex === undefined) { + // Look for queued message to replace + let tempMessageIndex = -1 + for (let i = 0; i < session.messages.length; i++) { + const msg = session.messages[i] + if ( + msg.sessionId === info.sessionID && + msg.type === (info.role === "user" ? "user" : "assistant") && + msg.status === "sending" + ) { + tempMessageIndex = i + break } - } else if (tempMessageIndex > -1) { - messages[tempMessageIndex] = { - id: info.id, - sessionId: info.sessionID, - type: info.role === "user" ? "user" : "assistant", - parts: [], - timestamp: info.time?.created || Date.now(), - status: "complete", - } - } else { - messages.push({ - id: info.id, - sessionId: info.sessionID, - type: info.role === "user" ? "user" : "assistant", - parts: [], - timestamp: info.time?.created || Date.now(), - status: "complete", - }) } - const messagesInfo = new Map(session.messagesInfo) - messagesInfo.set(info.id, info) + if (tempMessageIndex === -1) { + for (let i = 0; i < session.messages.length; i++) { + const msg = session.messages[i] + if (msg.sessionId === info.sessionID && msg.status === "sending") { + tempMessageIndex = i + break + } + } + } - instanceSessions.set(info.sessionID, { ...session, messages, messagesInfo }) - next.set(instanceId, instanceSessions) - return next + if (tempMessageIndex > -1) { + // Replace queued message + const message = session.messages[tempMessageIndex] + const oldId = message.id + message.id = info.id + message.type = (info.role === "user" ? "user" : "assistant") as "user" | "assistant" + message.timestamp = info.time?.created || Date.now() + message.status = "complete" as const + + if (oldId !== message.id) { + index.messageIndex.delete(oldId) + index.messageIndex.set(message.id, tempMessageIndex) + const existingPartMap = index.partIndex.get(oldId) + if (existingPartMap) { + index.partIndex.delete(oldId) + index.partIndex.set(message.id, existingPartMap) + } + } + } else { + // Append new message + const newMessage = { + id: info.id, + sessionId: info.sessionID, + type: (info.role === "user" ? "user" : "assistant") as "user" | "assistant", + parts: [], + timestamp: info.time?.created || Date.now(), + status: "complete" as const, + } + + let insertIndex = session.messages.length + for (let i = session.messages.length - 1; i >= 0; i--) { + if (session.messages[i].id < newMessage.id) { + insertIndex = i + 1 + break + } + } + + session.messages.splice(insertIndex, 0, newMessage) + rebuildSessionIndex(instanceId, info.sessionID, session.messages) + } + } else { + // Update existing message status + const message = session.messages[messageIndex] + message.status = "complete" as const + } + + session.messagesInfo.set(info.id, info) + + withSession(instanceId, info.sessionID, (session) => { + // Session already mutated in place }) updateSessionInfo(instanceId, info.sessionID) @@ -826,38 +1062,36 @@ async function sendMessage( throw new Error("Session not found") } - const tempMessageId = `temp-${Date.now()}-${Math.random().toString(36).substring(7)}` + const messageId = createId("msg") + const textPartId = createId("part") - const textParts: any[] = [] - textParts.push({ - type: "text" as const, - text: prompt, - id: `${tempMessageId}-text`, - }) + const optimisticParts: any[] = [ + { + id: textPartId, + type: "text" as const, + text: prompt, + synthetic: true, + }, + ] const optimisticMessage: Message = { - id: tempMessageId, + id: messageId, sessionId, type: "user", - parts: textParts, + parts: optimisticParts, timestamp: Date.now(), status: "sending", } - setSessions((prev) => { - const next = new Map(prev) - const instanceSessions = new Map(prev.get(instanceId)) - const session = instanceSessions.get(sessionId) - if (session) { - const messages = [...session.messages, optimisticMessage] - instanceSessions.set(sessionId, { ...session, messages }) - next.set(instanceId, instanceSessions) - } - return next + withSession(instanceId, sessionId, (session) => { + session.messages.push(optimisticMessage) + const index = getSessionIndex(instanceId, sessionId) + index.messageIndex.set(optimisticMessage.id, session.messages.length - 1) }) - const parts: any[] = [ + const requestParts: any[] = [ { + id: textPartId, type: "text" as const, text: prompt, }, @@ -867,23 +1101,42 @@ async function sendMessage( for (const att of attachments) { const source = att.source if (source.type === "file") { - parts.push({ + const partId = createId("part") + requestParts.push({ + id: partId, type: "file" as const, url: att.url, mime: source.mime, filename: att.filename, }) + optimisticParts.push({ + id: partId, + type: "file" as const, + url: att.url, + mime: source.mime, + filename: att.filename, + synthetic: true, + }) } else if (source.type === "text") { - parts.push({ + const partId = createId("part") + requestParts.push({ + id: partId, type: "text" as const, text: source.value, }) + optimisticParts.push({ + id: partId, + type: "text" as const, + text: source.value, + synthetic: true, + }) } } } const requestBody = { - parts, + messageID: messageId, + parts: requestParts, ...(session.agent && { agent: session.agent }), ...(session.model.providerId && session.model.modelId && { @@ -1057,4 +1310,5 @@ export { updateSessionAgent, updateSessionModel, getDefaultModel, + removeSessionIndexes, }