fix(ui): handle message.part.delta streaming
Wire message.part.delta SSE events into the v2 message store and append deltas onto existing part fields.
This commit is contained in:
@@ -4,6 +4,7 @@ import {
|
||||
MessageRemovedEvent,
|
||||
MessagePartUpdatedEvent,
|
||||
MessagePartRemovedEvent,
|
||||
MessagePartDeltaEvent,
|
||||
} from "../types/message"
|
||||
import type {
|
||||
EventLspUpdated,
|
||||
@@ -58,6 +59,7 @@ type SSEEvent =
|
||||
| MessageRemovedEvent
|
||||
| MessagePartUpdatedEvent
|
||||
| MessagePartRemovedEvent
|
||||
| MessagePartDeltaEvent
|
||||
| EventSessionUpdated
|
||||
| EventSessionCompacted
|
||||
| EventSessionDiff
|
||||
@@ -118,6 +120,9 @@ class SSEManager {
|
||||
case "message.part.updated":
|
||||
this.onMessagePartUpdated?.(instanceId, event as MessagePartUpdatedEvent)
|
||||
break
|
||||
case "message.part.delta":
|
||||
this.onMessagePartDelta?.(instanceId, event as MessagePartDeltaEvent)
|
||||
break
|
||||
case "message.removed":
|
||||
this.onMessageRemoved?.(instanceId, event as MessageRemovedEvent)
|
||||
break
|
||||
@@ -184,6 +189,7 @@ class SSEManager {
|
||||
onMessageUpdate?: (instanceId: string, event: MessageUpdateEvent) => void
|
||||
onMessageRemoved?: (instanceId: string, event: MessageRemovedEvent) => void
|
||||
onMessagePartUpdated?: (instanceId: string, event: MessagePartUpdatedEvent) => void
|
||||
onMessagePartDelta?: (instanceId: string, event: MessagePartDeltaEvent) => void
|
||||
onMessagePartRemoved?: (instanceId: string, event: MessagePartRemovedEvent) => void
|
||||
onSessionUpdate?: (instanceId: string, event: EventSessionUpdated) => void
|
||||
onSessionCompacted?: (instanceId: string, event: EventSessionCompacted) => void
|
||||
|
||||
@@ -104,6 +104,22 @@ export function applyPartUpdateV2(instanceId: string, part: ClientPart | null |
|
||||
})
|
||||
}
|
||||
|
||||
export function applyPartDeltaV2(
|
||||
instanceId: string,
|
||||
input: { messageId: string; partId: string; field: string; delta: string },
|
||||
): void {
|
||||
if (!input?.messageId || !input.partId || !input.field || typeof input.delta !== "string") {
|
||||
return
|
||||
}
|
||||
const store = messageStoreBus.getOrCreate(instanceId)
|
||||
store.applyPartDelta({
|
||||
messageId: input.messageId,
|
||||
partId: input.partId,
|
||||
field: input.field,
|
||||
delta: input.delta,
|
||||
})
|
||||
}
|
||||
|
||||
export function replaceMessageIdV2(instanceId: string, oldId: string, newId: string): void {
|
||||
if (!oldId || !newId || oldId === newId) return
|
||||
const store = messageStoreBus.getOrCreate(instanceId)
|
||||
|
||||
@@ -189,6 +189,7 @@ export interface InstanceMessageStore {
|
||||
hydrateMessages: (sessionId: string, inputs: MessageUpsertInput[], infos?: Iterable<MessageInfo>) => void
|
||||
upsertMessage: (input: MessageUpsertInput) => void
|
||||
applyPartUpdate: (input: PartUpdateInput) => void
|
||||
applyPartDelta: (input: { messageId: string; partId: string; field: string; delta: string; bumpRevision?: boolean }) => void
|
||||
removeMessage: (messageId: string) => void
|
||||
removeMessagePart: (messageId: string, partId: string) => void
|
||||
bufferPendingPart: (entry: PendingPartEntry) => void
|
||||
@@ -597,6 +598,45 @@ export function createInstanceMessageStore(instanceId: string, hooks?: MessageSt
|
||||
bumpSessionRevision(message.sessionId)
|
||||
}
|
||||
|
||||
function applyPartDelta(input: { messageId: string; partId: string; field: string; delta: string; bumpRevision?: boolean }) {
|
||||
if (!input?.messageId || !input.partId || !input.field || typeof input.delta !== "string") {
|
||||
return
|
||||
}
|
||||
|
||||
const message = state.messages[input.messageId]
|
||||
if (!message) {
|
||||
// Best-effort: drop deltas for unknown messages.
|
||||
return
|
||||
}
|
||||
|
||||
let applied = false
|
||||
|
||||
setState(
|
||||
"messages",
|
||||
input.messageId,
|
||||
produce((draft: MessageRecord) => {
|
||||
const entry = draft.parts[input.partId]
|
||||
if (!entry?.data) return
|
||||
const part = entry.data as any
|
||||
const currentValue = part?.[input.field]
|
||||
if (typeof currentValue === "string" || currentValue === undefined || currentValue === null) {
|
||||
part[input.field] = `${currentValue ?? ""}${input.delta}`
|
||||
applied = true
|
||||
}
|
||||
if (!applied) return
|
||||
entry.revision += 1
|
||||
draft.updatedAt = Date.now()
|
||||
if (input.bumpRevision ?? true) {
|
||||
draft.revision += 1
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
if (applied) {
|
||||
bumpSessionRevision(message.sessionId)
|
||||
}
|
||||
}
|
||||
|
||||
function removeMessage(messageId: string) {
|
||||
if (!messageId) return
|
||||
|
||||
@@ -1087,19 +1127,20 @@ export function createInstanceMessageStore(instanceId: string, hooks?: MessageSt
|
||||
setState(reconcile(createInitialState(instanceId)))
|
||||
}
|
||||
|
||||
return {
|
||||
return {
|
||||
|
||||
instanceId,
|
||||
state,
|
||||
setState,
|
||||
addOrUpdateSession,
|
||||
hydrateMessages,
|
||||
upsertMessage,
|
||||
hydrateMessages,
|
||||
upsertMessage,
|
||||
applyPartUpdate,
|
||||
applyPartDelta,
|
||||
removeMessage,
|
||||
removeMessagePart,
|
||||
bufferPendingPart,
|
||||
flushPendingParts,
|
||||
flushPendingParts,
|
||||
replaceMessageId,
|
||||
setMessageInfo,
|
||||
getMessageInfo,
|
||||
@@ -1125,4 +1166,3 @@ export function createInstanceMessageStore(instanceId: string, hooks?: MessageSt
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type {
|
||||
MessageInfo,
|
||||
MessagePartRemovedEvent,
|
||||
MessagePartDeltaEvent,
|
||||
MessagePartUpdatedEvent,
|
||||
MessageRemovedEvent,
|
||||
MessageUpdateEvent,
|
||||
@@ -48,6 +49,7 @@ import { loadMessages } from "./session-api"
|
||||
import { getOrCreateWorktreeClient, getRootClient, getWorktreeSlugForDirectory, getWorktreeSlugForSession } from "./worktrees"
|
||||
import {
|
||||
applyPartUpdateV2,
|
||||
applyPartDeltaV2,
|
||||
replaceMessageIdV2,
|
||||
reconcilePendingQuestionsV2,
|
||||
upsertMessageInfoV2,
|
||||
@@ -348,6 +350,14 @@ function handleMessageUpdate(instanceId: string, event: MessageUpdateEvent | Mes
|
||||
}
|
||||
}
|
||||
|
||||
function handleMessagePartDelta(instanceId: string, event: MessagePartDeltaEvent): void {
|
||||
const props = event.properties
|
||||
if (!props) return
|
||||
const { messageID, partID, field, delta } = props
|
||||
if (!messageID || !partID || !field || typeof delta !== "string") return
|
||||
applyPartDeltaV2(instanceId, { messageId: messageID, partId: partID, field, delta })
|
||||
}
|
||||
|
||||
function handleSessionUpdate(instanceId: string, event: EventSessionUpdated): void {
|
||||
const info = event.properties?.info
|
||||
|
||||
@@ -625,6 +635,7 @@ function handleQuestionAnswered(
|
||||
export {
|
||||
handleMessagePartRemoved,
|
||||
handleMessageRemoved,
|
||||
handleMessagePartDelta,
|
||||
handleMessageUpdate,
|
||||
handlePermissionReplied,
|
||||
handlePermissionUpdated,
|
||||
|
||||
@@ -58,6 +58,7 @@ import {
|
||||
import {
|
||||
handleMessagePartRemoved,
|
||||
handleMessageRemoved,
|
||||
handleMessagePartDelta,
|
||||
handleMessageUpdate,
|
||||
handlePermissionReplied,
|
||||
handlePermissionUpdated,
|
||||
@@ -74,6 +75,7 @@ import {
|
||||
|
||||
sseManager.onMessageUpdate = handleMessageUpdate
|
||||
sseManager.onMessagePartUpdated = handleMessageUpdate
|
||||
sseManager.onMessagePartDelta = handleMessagePartDelta
|
||||
sseManager.onMessageRemoved = handleMessageRemoved
|
||||
sseManager.onMessagePartRemoved = handleMessagePartRemoved
|
||||
sseManager.onSessionUpdate = handleSessionUpdate
|
||||
|
||||
@@ -20,6 +20,19 @@ export type {
|
||||
SDKMessage
|
||||
}
|
||||
|
||||
// Server streaming event: append-only delta updates.
|
||||
// Emitted over SSE by newer OpenCode builds.
|
||||
export interface MessagePartDeltaEvent {
|
||||
type: "message.part.delta"
|
||||
properties: {
|
||||
sessionID: string
|
||||
messageID: string
|
||||
partID: string
|
||||
field: string
|
||||
delta: string
|
||||
}
|
||||
}
|
||||
|
||||
export interface RenderCache {
|
||||
text: string
|
||||
html: string
|
||||
|
||||
Reference in New Issue
Block a user