cache per-instance history via SSE
This commit is contained in:
@@ -154,6 +154,7 @@ export type WorkspaceEventType =
|
|||||||
| "workspace.log"
|
| "workspace.log"
|
||||||
| "config.appChanged"
|
| "config.appChanged"
|
||||||
| "config.binariesChanged"
|
| "config.binariesChanged"
|
||||||
|
| "instance.dataChanged"
|
||||||
|
|
||||||
export type WorkspaceEventPayload =
|
export type WorkspaceEventPayload =
|
||||||
| { type: "workspace.created"; workspace: WorkspaceDescriptor }
|
| { type: "workspace.created"; workspace: WorkspaceDescriptor }
|
||||||
@@ -163,6 +164,7 @@ export type WorkspaceEventPayload =
|
|||||||
| { type: "workspace.log"; entry: WorkspaceLogEntry }
|
| { type: "workspace.log"; entry: WorkspaceLogEntry }
|
||||||
| { type: "config.appChanged"; config: AppConfig }
|
| { type: "config.appChanged"; config: AppConfig }
|
||||||
| { type: "config.binariesChanged"; binaries: BinaryRecord[] }
|
| { type: "config.binariesChanged"; binaries: BinaryRecord[] }
|
||||||
|
| { type: "instance.dataChanged"; instanceId: string; data: InstanceData }
|
||||||
|
|
||||||
export interface ServerMeta {
|
export interface ServerMeta {
|
||||||
/** Base URL clients should target for REST calls (useful for Electron embedding). */
|
/** Base URL clients should target for REST calls (useful for Electron embedding). */
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ export class EventBus extends EventEmitter {
|
|||||||
this.on("workspace.log", handler)
|
this.on("workspace.log", handler)
|
||||||
this.on("config.appChanged", handler)
|
this.on("config.appChanged", handler)
|
||||||
this.on("config.binariesChanged", handler)
|
this.on("config.binariesChanged", handler)
|
||||||
|
this.on("instance.dataChanged", handler)
|
||||||
return () => {
|
return () => {
|
||||||
this.off("workspace.created", handler)
|
this.off("workspace.created", handler)
|
||||||
this.off("workspace.started", handler)
|
this.off("workspace.started", handler)
|
||||||
@@ -29,6 +30,7 @@ export class EventBus extends EventEmitter {
|
|||||||
this.off("workspace.log", handler)
|
this.off("workspace.log", handler)
|
||||||
this.off("config.appChanged", handler)
|
this.off("config.appChanged", handler)
|
||||||
this.off("config.binariesChanged", handler)
|
this.off("config.binariesChanged", handler)
|
||||||
|
this.off("instance.dataChanged", handler)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -67,9 +67,10 @@ export function createHttpServer(deps: HttpServerDeps) {
|
|||||||
registerFilesystemRoutes(app, { fileSystemBrowser: deps.fileSystemBrowser })
|
registerFilesystemRoutes(app, { fileSystemBrowser: deps.fileSystemBrowser })
|
||||||
registerMetaRoutes(app, { serverMeta: deps.serverMeta })
|
registerMetaRoutes(app, { serverMeta: deps.serverMeta })
|
||||||
registerEventRoutes(app, { eventBus: deps.eventBus, registerClient: registerSseClient })
|
registerEventRoutes(app, { eventBus: deps.eventBus, registerClient: registerSseClient })
|
||||||
registerStorageRoutes(app, { instanceStore: deps.instanceStore })
|
registerStorageRoutes(app, { instanceStore: deps.instanceStore, eventBus: deps.eventBus })
|
||||||
registerInstanceProxyRoutes(app, { workspaceManager: deps.workspaceManager, logger: proxyLogger })
|
registerInstanceProxyRoutes(app, { workspaceManager: deps.workspaceManager, logger: proxyLogger })
|
||||||
|
|
||||||
|
|
||||||
if (deps.uiDevServerUrl) {
|
if (deps.uiDevServerUrl) {
|
||||||
setupDevProxy(app, deps.uiDevServerUrl)
|
setupDevProxy(app, deps.uiDevServerUrl)
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -1,15 +1,22 @@
|
|||||||
import { FastifyInstance } from "fastify"
|
import { FastifyInstance } from "fastify"
|
||||||
import { z } from "zod"
|
import { z } from "zod"
|
||||||
import { InstanceStore } from "../../storage/instance-store"
|
import { InstanceStore } from "../../storage/instance-store"
|
||||||
|
import { EventBus } from "../../events/bus"
|
||||||
|
import type { InstanceData } from "../../api-types"
|
||||||
|
|
||||||
interface RouteDeps {
|
interface RouteDeps {
|
||||||
instanceStore: InstanceStore
|
instanceStore: InstanceStore
|
||||||
|
eventBus: EventBus
|
||||||
}
|
}
|
||||||
|
|
||||||
const InstanceDataSchema = z.object({
|
const InstanceDataSchema = z.object({
|
||||||
messageHistory: z.array(z.string()).default([]),
|
messageHistory: z.array(z.string()).default([]),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
const EMPTY_INSTANCE_DATA: InstanceData = {
|
||||||
|
messageHistory: [],
|
||||||
|
}
|
||||||
|
|
||||||
export function registerStorageRoutes(app: FastifyInstance, deps: RouteDeps) {
|
export function registerStorageRoutes(app: FastifyInstance, deps: RouteDeps) {
|
||||||
app.get<{ Params: { id: string } }>("/api/storage/instances/:id", async (request, reply) => {
|
app.get<{ Params: { id: string } }>("/api/storage/instances/:id", async (request, reply) => {
|
||||||
try {
|
try {
|
||||||
@@ -25,6 +32,7 @@ export function registerStorageRoutes(app: FastifyInstance, deps: RouteDeps) {
|
|||||||
try {
|
try {
|
||||||
const body = InstanceDataSchema.parse(request.body ?? {})
|
const body = InstanceDataSchema.parse(request.body ?? {})
|
||||||
await deps.instanceStore.write(request.params.id, body)
|
await deps.instanceStore.write(request.params.id, body)
|
||||||
|
deps.eventBus.publish({ type: "instance.dataChanged", instanceId: request.params.id, data: body })
|
||||||
reply.code(204)
|
reply.code(204)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
reply.code(400)
|
reply.code(400)
|
||||||
@@ -35,6 +43,7 @@ export function registerStorageRoutes(app: FastifyInstance, deps: RouteDeps) {
|
|||||||
app.delete<{ Params: { id: string } }>("/api/storage/instances/:id", async (request, reply) => {
|
app.delete<{ Params: { id: string } }>("/api/storage/instances/:id", async (request, reply) => {
|
||||||
try {
|
try {
|
||||||
await deps.instanceStore.delete(request.params.id)
|
await deps.instanceStore.delete(request.params.id)
|
||||||
|
deps.eventBus.publish({ type: "instance.dataChanged", instanceId: request.params.id, data: EMPTY_INSTANCE_DATA })
|
||||||
reply.code(204)
|
reply.code(204)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
reply.code(500)
|
reply.code(500)
|
||||||
|
|||||||
@@ -4,12 +4,17 @@ import { cliEvents } from "./cli-events"
|
|||||||
|
|
||||||
export type ConfigData = AppConfig
|
export type ConfigData = AppConfig
|
||||||
|
|
||||||
|
const DEFAULT_INSTANCE_DATA: InstanceData = {
|
||||||
|
messageHistory: [],
|
||||||
|
}
|
||||||
|
|
||||||
function isDeepEqual(a: unknown, b: unknown): boolean {
|
function isDeepEqual(a: unknown, b: unknown): boolean {
|
||||||
if (a === b) {
|
if (a === b) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
if (typeof a === "object" && a !== null && typeof b === "object" && b !== null) {
|
if (typeof a === "object" && a !== null && typeof b === "object" && b !== null) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return JSON.stringify(a) === JSON.stringify(b)
|
return JSON.stringify(a) === JSON.stringify(b)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -24,12 +29,20 @@ export class ServerStorage {
|
|||||||
private configChangeListeners: Set<(config: ConfigData) => void> = new Set()
|
private configChangeListeners: Set<(config: ConfigData) => void> = new Set()
|
||||||
private configCache: ConfigData | null = null
|
private configCache: ConfigData | null = null
|
||||||
private loadPromise: Promise<ConfigData> | null = null
|
private loadPromise: Promise<ConfigData> | null = null
|
||||||
|
private instanceDataCache = new Map<string, InstanceData>()
|
||||||
|
private instanceDataListeners = new Map<string, Set<(data: InstanceData) => void>>()
|
||||||
|
private instanceLoadPromises = new Map<string, Promise<InstanceData>>()
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
cliEvents.on("config.appChanged", (event) => {
|
cliEvents.on("config.appChanged", (event) => {
|
||||||
if (event.type !== "config.appChanged") return
|
if (event.type !== "config.appChanged") return
|
||||||
this.setConfigCache(event.config)
|
this.setConfigCache(event.config)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
cliEvents.on("instance.dataChanged", (event) => {
|
||||||
|
if (event.type !== "instance.dataChanged") return
|
||||||
|
this.setInstanceDataCache(event.instanceId, event.data)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async loadConfig(): Promise<ConfigData> {
|
async loadConfig(): Promise<ConfigData> {
|
||||||
@@ -59,15 +72,38 @@ export class ServerStorage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async loadInstanceData(instanceId: string): Promise<InstanceData> {
|
async loadInstanceData(instanceId: string): Promise<InstanceData> {
|
||||||
return cliApi.readInstanceData(instanceId)
|
const cached = this.instanceDataCache.get(instanceId)
|
||||||
|
if (cached) {
|
||||||
|
return cached
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.instanceLoadPromises.has(instanceId)) {
|
||||||
|
const promise = cliApi
|
||||||
|
.readInstanceData(instanceId)
|
||||||
|
.then((data) => {
|
||||||
|
const normalized = this.normalizeInstanceData(data)
|
||||||
|
this.setInstanceDataCache(instanceId, normalized)
|
||||||
|
return normalized
|
||||||
|
})
|
||||||
|
.finally(() => {
|
||||||
|
this.instanceLoadPromises.delete(instanceId)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.instanceLoadPromises.set(instanceId, promise)
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.instanceLoadPromises.get(instanceId)!
|
||||||
}
|
}
|
||||||
|
|
||||||
async saveInstanceData(instanceId: string, data: InstanceData): Promise<void> {
|
async saveInstanceData(instanceId: string, data: InstanceData): Promise<void> {
|
||||||
await cliApi.writeInstanceData(instanceId, data)
|
const normalized = this.normalizeInstanceData(data)
|
||||||
|
await cliApi.writeInstanceData(instanceId, normalized)
|
||||||
|
this.setInstanceDataCache(instanceId, normalized)
|
||||||
}
|
}
|
||||||
|
|
||||||
async deleteInstanceData(instanceId: string): Promise<void> {
|
async deleteInstanceData(instanceId: string): Promise<void> {
|
||||||
await cliApi.deleteInstanceData(instanceId)
|
await cliApi.deleteInstanceData(instanceId)
|
||||||
|
this.setInstanceDataCache(instanceId, DEFAULT_INSTANCE_DATA)
|
||||||
}
|
}
|
||||||
|
|
||||||
onConfigChanged(listener: (config: ConfigData) => void): () => void {
|
onConfigChanged(listener: (config: ConfigData) => void): () => void {
|
||||||
@@ -78,6 +114,24 @@ export class ServerStorage {
|
|||||||
return () => this.configChangeListeners.delete(listener)
|
return () => this.configChangeListeners.delete(listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onInstanceDataChanged(instanceId: string, listener: (data: InstanceData) => void): () => void {
|
||||||
|
if (!this.instanceDataListeners.has(instanceId)) {
|
||||||
|
this.instanceDataListeners.set(instanceId, new Set())
|
||||||
|
}
|
||||||
|
const bucket = this.instanceDataListeners.get(instanceId)!
|
||||||
|
bucket.add(listener)
|
||||||
|
const cached = this.instanceDataCache.get(instanceId)
|
||||||
|
if (cached) {
|
||||||
|
listener(cached)
|
||||||
|
}
|
||||||
|
return () => {
|
||||||
|
bucket.delete(listener)
|
||||||
|
if (bucket.size === 0) {
|
||||||
|
this.instanceDataListeners.delete(instanceId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private setConfigCache(config: ConfigData) {
|
private setConfigCache(config: ConfigData) {
|
||||||
if (this.configCache && isDeepEqual(this.configCache, config)) {
|
if (this.configCache && isDeepEqual(this.configCache, config)) {
|
||||||
this.configCache = config
|
this.configCache = config
|
||||||
@@ -92,6 +146,36 @@ export class ServerStorage {
|
|||||||
listener(config)
|
listener(config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private normalizeInstanceData(data?: InstanceData | null): InstanceData {
|
||||||
|
const source = data ?? DEFAULT_INSTANCE_DATA
|
||||||
|
const messageHistory = Array.isArray(source.messageHistory) ? [...source.messageHistory] : []
|
||||||
|
return {
|
||||||
|
...source,
|
||||||
|
messageHistory,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private setInstanceDataCache(instanceId: string, data: InstanceData) {
|
||||||
|
const normalized = this.normalizeInstanceData(data)
|
||||||
|
const previous = this.instanceDataCache.get(instanceId)
|
||||||
|
if (previous && isDeepEqual(previous, normalized)) {
|
||||||
|
this.instanceDataCache.set(instanceId, normalized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
this.instanceDataCache.set(instanceId, normalized)
|
||||||
|
this.notifyInstanceDataChanged(instanceId, normalized)
|
||||||
|
}
|
||||||
|
|
||||||
|
private notifyInstanceDataChanged(instanceId: string, data: InstanceData) {
|
||||||
|
const listeners = this.instanceDataListeners.get(instanceId)
|
||||||
|
if (!listeners) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for (const listener of listeners) {
|
||||||
|
listener(data)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const storage = new ServerStorage()
|
export const storage = new ServerStorage()
|
||||||
|
|||||||
@@ -1,59 +1,88 @@
|
|||||||
|
import type { InstanceData } from "../../../cli/src/api-types"
|
||||||
import { storage } from "../lib/storage"
|
import { storage } from "../lib/storage"
|
||||||
|
|
||||||
const MAX_HISTORY = 100
|
const MAX_HISTORY = 100
|
||||||
|
|
||||||
const instanceHistories = new Map<string, string[]>()
|
const instanceDataCache = new Map<string, InstanceData>()
|
||||||
const historyLoaded = new Set<string>()
|
const instanceSubscriptions = new Map<string, () => void>()
|
||||||
|
|
||||||
export async function addToHistory(instanceId: string, text: string): Promise<void> {
|
export async function addToHistory(instanceId: string, text: string): Promise<void> {
|
||||||
await ensureHistoryLoaded(instanceId)
|
const data = await ensureInstanceData(instanceId)
|
||||||
|
const nextHistory = [text, ...data.messageHistory]
|
||||||
const history = instanceHistories.get(instanceId) || []
|
if (nextHistory.length > MAX_HISTORY) {
|
||||||
|
nextHistory.length = MAX_HISTORY
|
||||||
history.unshift(text)
|
|
||||||
|
|
||||||
if (history.length > MAX_HISTORY) {
|
|
||||||
history.length = MAX_HISTORY
|
|
||||||
}
|
}
|
||||||
|
|
||||||
instanceHistories.set(instanceId, history)
|
const nextData: InstanceData = {
|
||||||
|
...data,
|
||||||
|
messageHistory: nextHistory,
|
||||||
|
}
|
||||||
|
|
||||||
|
instanceDataCache.set(instanceId, cloneInstanceData(nextData))
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await storage.saveInstanceData(instanceId, { messageHistory: history })
|
await storage.saveInstanceData(instanceId, nextData)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.warn("Failed to persist message history:", err)
|
console.warn("Failed to persist message history:", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getHistory(instanceId: string): Promise<string[]> {
|
export async function getHistory(instanceId: string): Promise<string[]> {
|
||||||
await ensureHistoryLoaded(instanceId)
|
const data = await ensureInstanceData(instanceId)
|
||||||
return instanceHistories.get(instanceId) || []
|
return [...data.messageHistory]
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function clearHistory(instanceId: string): Promise<void> {
|
export async function clearHistory(instanceId: string): Promise<void> {
|
||||||
instanceHistories.delete(instanceId)
|
const data = await ensureInstanceData(instanceId)
|
||||||
historyLoaded.delete(instanceId)
|
const nextData: InstanceData = {
|
||||||
|
...data,
|
||||||
|
messageHistory: [],
|
||||||
|
}
|
||||||
|
|
||||||
|
instanceDataCache.set(instanceId, cloneInstanceData(nextData))
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await storage.saveInstanceData(instanceId, { messageHistory: [] })
|
await storage.saveInstanceData(instanceId, nextData)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.warn("Failed to clear history:", error)
|
console.warn("Failed to clear history:", error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function ensureHistoryLoaded(instanceId: string): Promise<void> {
|
async function ensureInstanceData(instanceId: string): Promise<InstanceData> {
|
||||||
if (historyLoaded.has(instanceId)) {
|
const cached = instanceDataCache.get(instanceId)
|
||||||
return
|
if (cached) {
|
||||||
|
return cached
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const data = await storage.loadInstanceData(instanceId)
|
const data = await storage.loadInstanceData(instanceId)
|
||||||
const history = Array.isArray(data.messageHistory) ? data.messageHistory : []
|
const normalized = cloneInstanceData(data)
|
||||||
instanceHistories.set(instanceId, history)
|
instanceDataCache.set(instanceId, normalized)
|
||||||
historyLoaded.add(instanceId)
|
attachInstanceSubscription(instanceId)
|
||||||
|
return normalized
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.warn("Failed to load history:", error)
|
console.warn("Failed to load history:", error)
|
||||||
instanceHistories.set(instanceId, [])
|
const fallback = cloneInstanceData({ messageHistory: [] })
|
||||||
historyLoaded.add(instanceId)
|
instanceDataCache.set(instanceId, fallback)
|
||||||
|
attachInstanceSubscription(instanceId)
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function attachInstanceSubscription(instanceId: string) {
|
||||||
|
if (instanceSubscriptions.has(instanceId)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const unsubscribe = storage.onInstanceDataChanged(instanceId, (data) => {
|
||||||
|
instanceDataCache.set(instanceId, cloneInstanceData(data))
|
||||||
|
})
|
||||||
|
instanceSubscriptions.set(instanceId, unsubscribe)
|
||||||
|
}
|
||||||
|
|
||||||
|
function cloneInstanceData(data?: InstanceData | null): InstanceData {
|
||||||
|
const source: InstanceData = data ?? { messageHistory: [] }
|
||||||
|
return {
|
||||||
|
...source,
|
||||||
|
messageHistory: Array.isArray(source.messageHistory) ? [...source.messageHistory] : [],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user