From 45dca7a7f0c3a93afbba9e3fcd6aa269dc3fe7c6 Mon Sep 17 00:00:00 2001 From: Shantur Rathore Date: Wed, 19 Nov 2025 17:48:07 +0000 Subject: [PATCH] cache per-instance history via SSE --- packages/cli/src/api-types.ts | 2 + packages/cli/src/events/bus.ts | 2 + packages/cli/src/server/http-server.ts | 3 +- packages/cli/src/server/routes/storage.ts | 9 +++ packages/ui/src/lib/storage.ts | 88 ++++++++++++++++++++++- packages/ui/src/stores/message-history.ts | 79 +++++++++++++------- 6 files changed, 155 insertions(+), 28 deletions(-) diff --git a/packages/cli/src/api-types.ts b/packages/cli/src/api-types.ts index 29eb0962..f100bca1 100644 --- a/packages/cli/src/api-types.ts +++ b/packages/cli/src/api-types.ts @@ -154,6 +154,7 @@ export type WorkspaceEventType = | "workspace.log" | "config.appChanged" | "config.binariesChanged" + | "instance.dataChanged" export type WorkspaceEventPayload = | { type: "workspace.created"; workspace: WorkspaceDescriptor } @@ -163,6 +164,7 @@ export type WorkspaceEventPayload = | { type: "workspace.log"; entry: WorkspaceLogEntry } | { type: "config.appChanged"; config: AppConfig } | { type: "config.binariesChanged"; binaries: BinaryRecord[] } + | { type: "instance.dataChanged"; instanceId: string; data: InstanceData } export interface ServerMeta { /** Base URL clients should target for REST calls (useful for Electron embedding). */ diff --git a/packages/cli/src/events/bus.ts b/packages/cli/src/events/bus.ts index 983d929c..d4c523fc 100644 --- a/packages/cli/src/events/bus.ts +++ b/packages/cli/src/events/bus.ts @@ -21,6 +21,7 @@ export class EventBus extends EventEmitter { this.on("workspace.log", handler) this.on("config.appChanged", handler) this.on("config.binariesChanged", handler) + this.on("instance.dataChanged", handler) return () => { this.off("workspace.created", handler) this.off("workspace.started", handler) @@ -29,6 +30,7 @@ export class EventBus extends EventEmitter { this.off("workspace.log", handler) this.off("config.appChanged", handler) this.off("config.binariesChanged", handler) + this.off("instance.dataChanged", handler) } } } diff --git a/packages/cli/src/server/http-server.ts b/packages/cli/src/server/http-server.ts index f8768707..8a1b7a5c 100644 --- a/packages/cli/src/server/http-server.ts +++ b/packages/cli/src/server/http-server.ts @@ -67,9 +67,10 @@ export function createHttpServer(deps: HttpServerDeps) { registerFilesystemRoutes(app, { fileSystemBrowser: deps.fileSystemBrowser }) registerMetaRoutes(app, { serverMeta: deps.serverMeta }) registerEventRoutes(app, { eventBus: deps.eventBus, registerClient: registerSseClient }) - registerStorageRoutes(app, { instanceStore: deps.instanceStore }) + registerStorageRoutes(app, { instanceStore: deps.instanceStore, eventBus: deps.eventBus }) registerInstanceProxyRoutes(app, { workspaceManager: deps.workspaceManager, logger: proxyLogger }) + if (deps.uiDevServerUrl) { setupDevProxy(app, deps.uiDevServerUrl) } else { diff --git a/packages/cli/src/server/routes/storage.ts b/packages/cli/src/server/routes/storage.ts index 285b2aed..e4211a34 100644 --- a/packages/cli/src/server/routes/storage.ts +++ b/packages/cli/src/server/routes/storage.ts @@ -1,15 +1,22 @@ import { FastifyInstance } from "fastify" import { z } from "zod" import { InstanceStore } from "../../storage/instance-store" +import { EventBus } from "../../events/bus" +import type { InstanceData } from "../../api-types" interface RouteDeps { instanceStore: InstanceStore + eventBus: EventBus } const InstanceDataSchema = z.object({ messageHistory: z.array(z.string()).default([]), }) +const EMPTY_INSTANCE_DATA: InstanceData = { + messageHistory: [], +} + export function registerStorageRoutes(app: FastifyInstance, deps: RouteDeps) { app.get<{ Params: { id: string } }>("/api/storage/instances/:id", async (request, reply) => { try { @@ -25,6 +32,7 @@ export function registerStorageRoutes(app: FastifyInstance, deps: RouteDeps) { try { const body = InstanceDataSchema.parse(request.body ?? {}) await deps.instanceStore.write(request.params.id, body) + deps.eventBus.publish({ type: "instance.dataChanged", instanceId: request.params.id, data: body }) reply.code(204) } catch (error) { reply.code(400) @@ -35,6 +43,7 @@ export function registerStorageRoutes(app: FastifyInstance, deps: RouteDeps) { app.delete<{ Params: { id: string } }>("/api/storage/instances/:id", async (request, reply) => { try { await deps.instanceStore.delete(request.params.id) + deps.eventBus.publish({ type: "instance.dataChanged", instanceId: request.params.id, data: EMPTY_INSTANCE_DATA }) reply.code(204) } catch (error) { reply.code(500) diff --git a/packages/ui/src/lib/storage.ts b/packages/ui/src/lib/storage.ts index 72d4603b..434c7a6c 100644 --- a/packages/ui/src/lib/storage.ts +++ b/packages/ui/src/lib/storage.ts @@ -4,12 +4,17 @@ import { cliEvents } from "./cli-events" export type ConfigData = AppConfig +const DEFAULT_INSTANCE_DATA: InstanceData = { + messageHistory: [], +} + function isDeepEqual(a: unknown, b: unknown): boolean { if (a === b) { return true } if (typeof a === "object" && a !== null && typeof b === "object" && b !== null) { + try { return JSON.stringify(a) === JSON.stringify(b) } catch (error) { @@ -24,12 +29,20 @@ export class ServerStorage { private configChangeListeners: Set<(config: ConfigData) => void> = new Set() private configCache: ConfigData | null = null private loadPromise: Promise | null = null + private instanceDataCache = new Map() + private instanceDataListeners = new Map void>>() + private instanceLoadPromises = new Map>() constructor() { cliEvents.on("config.appChanged", (event) => { if (event.type !== "config.appChanged") return this.setConfigCache(event.config) }) + + cliEvents.on("instance.dataChanged", (event) => { + if (event.type !== "instance.dataChanged") return + this.setInstanceDataCache(event.instanceId, event.data) + }) } async loadConfig(): Promise { @@ -59,15 +72,38 @@ export class ServerStorage { } async loadInstanceData(instanceId: string): Promise { - return cliApi.readInstanceData(instanceId) + const cached = this.instanceDataCache.get(instanceId) + if (cached) { + return cached + } + + if (!this.instanceLoadPromises.has(instanceId)) { + const promise = cliApi + .readInstanceData(instanceId) + .then((data) => { + const normalized = this.normalizeInstanceData(data) + this.setInstanceDataCache(instanceId, normalized) + return normalized + }) + .finally(() => { + this.instanceLoadPromises.delete(instanceId) + }) + + this.instanceLoadPromises.set(instanceId, promise) + } + + return this.instanceLoadPromises.get(instanceId)! } async saveInstanceData(instanceId: string, data: InstanceData): Promise { - await cliApi.writeInstanceData(instanceId, data) + const normalized = this.normalizeInstanceData(data) + await cliApi.writeInstanceData(instanceId, normalized) + this.setInstanceDataCache(instanceId, normalized) } async deleteInstanceData(instanceId: string): Promise { await cliApi.deleteInstanceData(instanceId) + this.setInstanceDataCache(instanceId, DEFAULT_INSTANCE_DATA) } onConfigChanged(listener: (config: ConfigData) => void): () => void { @@ -78,6 +114,24 @@ export class ServerStorage { return () => this.configChangeListeners.delete(listener) } + onInstanceDataChanged(instanceId: string, listener: (data: InstanceData) => void): () => void { + if (!this.instanceDataListeners.has(instanceId)) { + this.instanceDataListeners.set(instanceId, new Set()) + } + const bucket = this.instanceDataListeners.get(instanceId)! + bucket.add(listener) + const cached = this.instanceDataCache.get(instanceId) + if (cached) { + listener(cached) + } + return () => { + bucket.delete(listener) + if (bucket.size === 0) { + this.instanceDataListeners.delete(instanceId) + } + } + } + private setConfigCache(config: ConfigData) { if (this.configCache && isDeepEqual(this.configCache, config)) { this.configCache = config @@ -92,6 +146,36 @@ export class ServerStorage { listener(config) } } + + private normalizeInstanceData(data?: InstanceData | null): InstanceData { + const source = data ?? DEFAULT_INSTANCE_DATA + const messageHistory = Array.isArray(source.messageHistory) ? [...source.messageHistory] : [] + return { + ...source, + messageHistory, + } + } + + private setInstanceDataCache(instanceId: string, data: InstanceData) { + const normalized = this.normalizeInstanceData(data) + const previous = this.instanceDataCache.get(instanceId) + if (previous && isDeepEqual(previous, normalized)) { + this.instanceDataCache.set(instanceId, normalized) + return + } + this.instanceDataCache.set(instanceId, normalized) + this.notifyInstanceDataChanged(instanceId, normalized) + } + + private notifyInstanceDataChanged(instanceId: string, data: InstanceData) { + const listeners = this.instanceDataListeners.get(instanceId) + if (!listeners) { + return + } + for (const listener of listeners) { + listener(data) + } + } } export const storage = new ServerStorage() diff --git a/packages/ui/src/stores/message-history.ts b/packages/ui/src/stores/message-history.ts index 83423ddb..4c15357c 100644 --- a/packages/ui/src/stores/message-history.ts +++ b/packages/ui/src/stores/message-history.ts @@ -1,59 +1,88 @@ +import type { InstanceData } from "../../../cli/src/api-types" import { storage } from "../lib/storage" const MAX_HISTORY = 100 -const instanceHistories = new Map() -const historyLoaded = new Set() +const instanceDataCache = new Map() +const instanceSubscriptions = new Map void>() export async function addToHistory(instanceId: string, text: string): Promise { - await ensureHistoryLoaded(instanceId) - - const history = instanceHistories.get(instanceId) || [] - - history.unshift(text) - - if (history.length > MAX_HISTORY) { - history.length = MAX_HISTORY + const data = await ensureInstanceData(instanceId) + const nextHistory = [text, ...data.messageHistory] + if (nextHistory.length > MAX_HISTORY) { + nextHistory.length = MAX_HISTORY } - instanceHistories.set(instanceId, history) + const nextData: InstanceData = { + ...data, + messageHistory: nextHistory, + } + + instanceDataCache.set(instanceId, cloneInstanceData(nextData)) try { - await storage.saveInstanceData(instanceId, { messageHistory: history }) + await storage.saveInstanceData(instanceId, nextData) } catch (err) { console.warn("Failed to persist message history:", err) } } export async function getHistory(instanceId: string): Promise { - await ensureHistoryLoaded(instanceId) - return instanceHistories.get(instanceId) || [] + const data = await ensureInstanceData(instanceId) + return [...data.messageHistory] } export async function clearHistory(instanceId: string): Promise { - instanceHistories.delete(instanceId) - historyLoaded.delete(instanceId) + const data = await ensureInstanceData(instanceId) + const nextData: InstanceData = { + ...data, + messageHistory: [], + } + + instanceDataCache.set(instanceId, cloneInstanceData(nextData)) try { - await storage.saveInstanceData(instanceId, { messageHistory: [] }) + await storage.saveInstanceData(instanceId, nextData) } catch (error) { console.warn("Failed to clear history:", error) } } -async function ensureHistoryLoaded(instanceId: string): Promise { - if (historyLoaded.has(instanceId)) { - return +async function ensureInstanceData(instanceId: string): Promise { + const cached = instanceDataCache.get(instanceId) + if (cached) { + return cached } try { const data = await storage.loadInstanceData(instanceId) - const history = Array.isArray(data.messageHistory) ? data.messageHistory : [] - instanceHistories.set(instanceId, history) - historyLoaded.add(instanceId) + const normalized = cloneInstanceData(data) + instanceDataCache.set(instanceId, normalized) + attachInstanceSubscription(instanceId) + return normalized } catch (error) { console.warn("Failed to load history:", error) - instanceHistories.set(instanceId, []) - historyLoaded.add(instanceId) + const fallback = cloneInstanceData({ messageHistory: [] }) + instanceDataCache.set(instanceId, fallback) + attachInstanceSubscription(instanceId) + return fallback + } +} + +function attachInstanceSubscription(instanceId: string) { + if (instanceSubscriptions.has(instanceId)) { + return + } + const unsubscribe = storage.onInstanceDataChanged(instanceId, (data) => { + instanceDataCache.set(instanceId, cloneInstanceData(data)) + }) + instanceSubscriptions.set(instanceId, unsubscribe) +} + +function cloneInstanceData(data?: InstanceData | null): InstanceData { + const source: InstanceData = data ?? { messageHistory: [] } + return { + ...source, + messageHistory: Array.isArray(source.messageHistory) ? [...source.messageHistory] : [], } }