chore: add message store v2 baseline

This commit is contained in:
Shantur Rathore
2025-11-26 09:42:10 +00:00
parent 9313b2bd6c
commit 16b76385e2
13 changed files with 1665 additions and 57 deletions

View File

@@ -1,7 +1,9 @@
import type { Session, SessionStatus } from "../types/session"
import type { Message, MessageInfo } from "../types/message"
import type { MessageRecord } from "./message-v2/types"
import { sessions } from "./sessions"
import { isSessionCompactionActive } from "./session-compaction"
import { messageStoreBus } from "./message-v2/bus"
function getSession(instanceId: string, sessionId: string): Session | null {
const instanceSessions = sessions().get(instanceId)
@@ -17,21 +19,49 @@ function isSessionCompacting(session: Session): boolean {
return Boolean(compactingFlag)
}
function getMessageTimestamp(session: Session, message?: Message): number {
if (!message) return Number.NEGATIVE_INFINITY
if (typeof message.timestamp === "number" && Number.isFinite(message.timestamp)) {
return message.timestamp
function getLatestInfoFromStore(instanceId: string, sessionId: string, role?: MessageInfo["role"]): MessageInfo | undefined {
const store = messageStoreBus.getOrCreate(instanceId)
const messageIds = store.getSessionMessageIds(sessionId)
let latest: MessageInfo | undefined
let latestTimestamp = Number.NEGATIVE_INFINITY
for (const id of messageIds) {
const info = store.getMessageInfo(id)
if (!info) continue
if (role && info.role !== role) continue
const timestamp = info.time?.created ?? 0
if (timestamp >= latestTimestamp) {
latest = info
latestTimestamp = timestamp
}
}
const info = session.messagesInfo.get(message.id)
return info?.time?.created ?? Number.NEGATIVE_INFINITY
return latest
}
function getLastMessage(session: Session): Message | undefined {
function getLastMessageFromStore(instanceId: string, sessionId: string): MessageRecord | undefined {
const store = messageStoreBus.getOrCreate(instanceId)
const messageIds = store.getSessionMessageIds(sessionId)
let latest: MessageRecord | undefined
let latestTimestamp = Number.NEGATIVE_INFINITY
for (const id of messageIds) {
const record = store.getMessage(id)
if (!record) continue
const info = store.getMessageInfo(id)
const timestamp = info?.time?.created ?? record.createdAt ?? Number.NEGATIVE_INFINITY
if (timestamp >= latestTimestamp) {
latest = record
latestTimestamp = timestamp
}
}
return latest
}
function getLegacyLastMessage(session: Session): Message | undefined {
let latest: Message | undefined
let latestTimestamp = Number.NEGATIVE_INFINITY
for (const message of session.messages) {
if (!message) continue
const timestamp = getMessageTimestamp(session, message)
const info = session.messagesInfo.get(message.id)
const timestamp = info?.time?.created ?? message.timestamp ?? Number.NEGATIVE_INFINITY
if (timestamp >= latestTimestamp) {
latest = message
latestTimestamp = timestamp
@@ -40,7 +70,7 @@ function getLastMessage(session: Session): Message | undefined {
return latest
}
function getLastMessageInfo(session: Session, role?: MessageInfo["role"]): MessageInfo | undefined {
function getLegacyLastMessageInfo(session: Session, role?: MessageInfo["role"]): MessageInfo | undefined {
if (session.messagesInfo.size === 0) {
return undefined
}
@@ -92,7 +122,28 @@ function isAssistantInfoPending(info?: MessageInfo): boolean {
return completed < created
}
function isAssistantStillGenerating(message: Message, info?: MessageInfo): boolean {
function isAssistantStillGeneratingRecord(record: MessageRecord, info?: MessageInfo): boolean {
if (record.role !== "assistant") {
return false
}
if (record.status === "error") {
return false
}
if (record.status === "streaming" || record.status === "sending") {
return true
}
const completedAt = (info?.time as { completed?: number } | undefined)?.completed
if (completedAt !== undefined && completedAt !== null) {
return false
}
return !(record.status === "complete" || record.status === "sent")
}
function isAssistantStillGeneratingLegacy(message: Message, info?: MessageInfo): boolean {
if (message.type !== "assistant") {
return false
}
@@ -119,15 +170,20 @@ export function getSessionStatus(instanceId: string, sessionId: string): Session
return "idle"
}
const store = messageStoreBus.getOrCreate(instanceId)
if (isSessionCompactionActive(instanceId, sessionId) || isSessionCompacting(session)) {
return "compacting"
}
const latestUserInfo = getLastMessageInfo(session, "user")
const latestAssistantInfo = getLastMessageInfo(session, "assistant")
const lastMessage = getLastMessage(session)
if (!lastMessage) {
const latestInfo = getLastMessageInfo(session)
const latestUserInfo = getLatestInfoFromStore(instanceId, sessionId, "user") ?? getLegacyLastMessageInfo(session, "user")
const latestAssistantInfo = getLatestInfoFromStore(instanceId, sessionId, "assistant") ?? getLegacyLastMessageInfo(session, "assistant")
const lastRecord = getLastMessageFromStore(instanceId, sessionId)
const legacyFallbackMessage = lastRecord ? undefined : getLegacyLastMessage(session)
if (!lastRecord && !legacyFallbackMessage) {
const latestInfo = latestUserInfo ?? latestAssistantInfo ?? getLegacyLastMessageInfo(session)
if (!latestInfo) {
return "idle"
}
@@ -138,13 +194,22 @@ export function getSessionStatus(instanceId: string, sessionId: string): Session
return infoCompleted ? "idle" : "working"
}
if (lastMessage.type === "user") {
return "working"
}
const infoForMessage = session.messagesInfo.get(lastMessage.id) ?? latestAssistantInfo
if (isAssistantStillGenerating(lastMessage, infoForMessage)) {
return "working"
if (lastRecord) {
if (lastRecord.role === "user") {
return "working"
}
const infoForRecord = store.getMessageInfo(lastRecord.id) ?? latestAssistantInfo
if (infoForRecord && isAssistantStillGeneratingRecord(lastRecord, infoForRecord)) {
return "working"
}
} else if (legacyFallbackMessage) {
if (legacyFallbackMessage.type === "user") {
return "working"
}
const infoForLegacy = session.messagesInfo.get(legacyFallbackMessage.id) ?? latestAssistantInfo
if (isAssistantStillGeneratingLegacy(legacyFallbackMessage, infoForLegacy)) {
return "working"
}
}
if (isAssistantInfoPending(latestAssistantInfo)) {