Batch hydrate normalized messages for session load
This commit is contained in:
@@ -141,6 +141,7 @@ export interface InstanceMessageStore {
|
||||
state: InstanceMessageState
|
||||
setState: SetStoreFunction<InstanceMessageState>
|
||||
addOrUpdateSession: (input: SessionUpsertInput) => void
|
||||
hydrateMessages: (sessionId: string, inputs: MessageUpsertInput[], infos?: Iterable<MessageInfo>) => void
|
||||
upsertMessage: (input: MessageUpsertInput) => void
|
||||
applyPartUpdate: (input: PartUpdateInput) => void
|
||||
bufferPendingPart: (entry: PendingPartEntry) => void
|
||||
@@ -234,6 +235,81 @@ export function createInstanceMessageStore(instanceId: string): InstanceMessageS
|
||||
})
|
||||
}
|
||||
|
||||
function hydrateMessages(sessionId: string, inputs: MessageUpsertInput[], infos?: Iterable<MessageInfo>) {
|
||||
if (!Array.isArray(inputs) || inputs.length === 0) return
|
||||
|
||||
ensureSessionEntry(sessionId)
|
||||
|
||||
const incomingIds = inputs.map((item) => item.id)
|
||||
const incomingIdSet = new Set(incomingIds)
|
||||
const existingIds = state.sessions[sessionId]?.messageIds ?? []
|
||||
const removedIds = existingIds.filter((id) => !incomingIdSet.has(id))
|
||||
|
||||
const normalizedRecords: Record<string, MessageRecord> = {}
|
||||
const now = Date.now()
|
||||
|
||||
inputs.forEach((input) => {
|
||||
const normalizedParts = normalizeParts(input.id, input.parts)
|
||||
const shouldBump = Boolean(input.bumpRevision || normalizedParts)
|
||||
const previous = state.messages[input.id]
|
||||
normalizedRecords[input.id] = {
|
||||
id: input.id,
|
||||
sessionId: input.sessionId,
|
||||
role: input.role,
|
||||
status: input.status,
|
||||
createdAt: input.createdAt ?? previous?.createdAt ?? now,
|
||||
updatedAt: input.updatedAt ?? now,
|
||||
isEphemeral: input.isEphemeral ?? previous?.isEphemeral ?? false,
|
||||
revision: previous ? previous.revision + (shouldBump ? 1 : 0) : 0,
|
||||
partIds: normalizedParts ? normalizedParts.ids : previous?.partIds ?? [],
|
||||
parts: normalizedParts ? normalizedParts.map : previous?.parts ?? {},
|
||||
}
|
||||
})
|
||||
|
||||
const infoList = infos ? Array.from(infos) : undefined
|
||||
const usageState = infoList ? rebuildUsageStateFromInfos(infoList) : state.usage[sessionId]
|
||||
|
||||
setState(
|
||||
produce((draft) => {
|
||||
removedIds.forEach((id) => {
|
||||
if (draft.messages[id]?.sessionId === sessionId) {
|
||||
delete draft.messages[id]
|
||||
delete draft.messageInfoVersion[id]
|
||||
delete draft.pendingParts[id]
|
||||
if (draft.permissions.byMessage[id]) {
|
||||
delete draft.permissions.byMessage[id]
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
Object.entries(normalizedRecords).forEach(([id, record]) => {
|
||||
draft.messages[id] = record as MessageRecord
|
||||
})
|
||||
|
||||
const session = draft.sessions[sessionId]!
|
||||
session.messageIds = incomingIds
|
||||
session.updatedAt = Date.now()
|
||||
|
||||
if (usageState) {
|
||||
draft.usage[sessionId] = usageState
|
||||
}
|
||||
|
||||
if (infoList) {
|
||||
for (const info of infoList) {
|
||||
const messageId = info.id as string
|
||||
messageInfoCache.set(messageId, info)
|
||||
const currentVersion = draft.messageInfoVersion[messageId] ?? 0
|
||||
draft.messageInfoVersion[messageId] = currentVersion + 1
|
||||
}
|
||||
}
|
||||
}),
|
||||
)
|
||||
|
||||
removedIds.forEach((id) => {
|
||||
messageInfoCache.delete(id)
|
||||
})
|
||||
}
|
||||
|
||||
function insertMessageIntoSession(sessionId: string, messageId: string) {
|
||||
ensureSessionEntry(sessionId)
|
||||
setState("sessions", sessionId, "messageIds", (ids = []) => {
|
||||
@@ -508,6 +584,7 @@ export function createInstanceMessageStore(instanceId: string): InstanceMessageS
|
||||
state,
|
||||
setState,
|
||||
addOrUpdateSession,
|
||||
hydrateMessages,
|
||||
upsertMessage,
|
||||
applyPartUpdate,
|
||||
bufferPendingPart,
|
||||
|
||||
Reference in New Issue
Block a user