From 1741e49568f3bfb283a8ba8e9fda77ea634aeefc Mon Sep 17 00:00:00 2001 From: Shantur Rathore Date: Sun, 23 Nov 2025 19:07:10 +0000 Subject: [PATCH] aggregate instance SSE streams through server bus so UI uses single connection --- packages/server/src/api-types.ts | 12 ++ packages/server/src/events/bus.ts | 8 +- packages/server/src/index.ts | 8 + .../server/src/workspaces/instance-events.ts | 187 ++++++++++++++++++ packages/ui/src/lib/sse-manager.ts | 149 ++++---------- packages/ui/src/stores/instances.ts | 5 +- 6 files changed, 252 insertions(+), 117 deletions(-) create mode 100644 packages/server/src/workspaces/instance-events.ts diff --git a/packages/server/src/api-types.ts b/packages/server/src/api-types.ts index 13b48377..75d9bd59 100644 --- a/packages/server/src/api-types.ts +++ b/packages/server/src/api-types.ts @@ -111,6 +111,14 @@ export interface InstanceData { agentModelSelections: AgentModelSelection } +export type InstanceStreamStatus = "connecting" | "connected" | "error" | "disconnected" + +export interface InstanceStreamEvent { + type: string + properties?: Record + [key: string]: unknown +} + export interface BinaryRecord { id: string path: string @@ -157,6 +165,8 @@ export type WorkspaceEventType = | "config.appChanged" | "config.binariesChanged" | "instance.dataChanged" + | "instance.event" + | "instance.eventStatus" export type WorkspaceEventPayload = | { type: "workspace.created"; workspace: WorkspaceDescriptor } @@ -167,6 +177,8 @@ export type WorkspaceEventPayload = | { type: "config.appChanged"; config: AppConfig } | { type: "config.binariesChanged"; binaries: BinaryRecord[] } | { type: "instance.dataChanged"; instanceId: string; data: InstanceData } + | { type: "instance.event"; instanceId: string; event: InstanceStreamEvent } + | { type: "instance.eventStatus"; instanceId: string; status: InstanceStreamStatus; reason?: string } export interface ServerMeta { /** Base URL clients should target for REST calls (useful for Electron embedding). */ diff --git a/packages/server/src/events/bus.ts b/packages/server/src/events/bus.ts index d4c523fc..b1657fd0 100644 --- a/packages/server/src/events/bus.ts +++ b/packages/server/src/events/bus.ts @@ -8,7 +8,9 @@ export class EventBus extends EventEmitter { } publish(event: WorkspaceEventPayload): boolean { - this.logger?.debug({ event }, "Publishing workspace event") + if (event.type !== "instance.event" && event.type !== "instance.eventStatus") { + this.logger?.debug({ event }, "Publishing workspace event") + } return super.emit(event.type, event) } @@ -22,6 +24,8 @@ export class EventBus extends EventEmitter { this.on("config.appChanged", handler) this.on("config.binariesChanged", handler) this.on("instance.dataChanged", handler) + this.on("instance.event", handler) + this.on("instance.eventStatus", handler) return () => { this.off("workspace.created", handler) this.off("workspace.started", handler) @@ -31,6 +35,8 @@ export class EventBus extends EventEmitter { this.off("config.appChanged", handler) this.off("config.binariesChanged", handler) this.off("instance.dataChanged", handler) + this.off("instance.event", handler) + this.off("instance.eventStatus", handler) } } } diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index ba0935e0..5ef50c1e 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -14,10 +14,12 @@ import { FileSystemBrowser } from "./filesystem/browser" import { EventBus } from "./events/bus" import { ServerMeta } from "./api-types" import { InstanceStore } from "./storage/instance-store" +import { InstanceEventBridge } from "./workspaces/instance-events" import { createLogger } from "./logger" import { launchInBrowser } from "./launcher" const require = createRequire(import.meta.url) + const packageJson = require("../package.json") as { version: string } const __filename = fileURLToPath(import.meta.url) const __dirname = path.dirname(__filename) @@ -121,6 +123,11 @@ async function main() { }) const fileSystemBrowser = new FileSystemBrowser({ rootDir: options.rootDir, unrestricted: options.unrestrictedRoot }) const instanceStore = new InstanceStore() + const instanceEventBridge = new InstanceEventBridge({ + workspaceManager, + eventBus, + logger: logger.child({ component: "instance-events" }), + }) const serverMeta: ServerMeta = { httpBaseUrl: `http://${options.host}:${options.port}`, @@ -169,6 +176,7 @@ async function main() { } try { + instanceEventBridge.shutdown() await workspaceManager.shutdown() logger.info("Workspace manager shutdown complete") } catch (error) { diff --git a/packages/server/src/workspaces/instance-events.ts b/packages/server/src/workspaces/instance-events.ts new file mode 100644 index 00000000..4e8bb65a --- /dev/null +++ b/packages/server/src/workspaces/instance-events.ts @@ -0,0 +1,187 @@ +import { fetch } from "undici" +import { EventBus } from "../events/bus" +import { Logger } from "../logger" +import { WorkspaceManager } from "./manager" +import { InstanceStreamEvent, InstanceStreamStatus } from "../api-types" + +const INSTANCE_HOST = "127.0.0.1" +const RECONNECT_DELAY_MS = 1000 + +interface InstanceEventBridgeOptions { + workspaceManager: WorkspaceManager + eventBus: EventBus + logger: Logger +} + +interface ActiveStream { + controller: AbortController + task: Promise +} + +export class InstanceEventBridge { + private readonly streams = new Map() + + constructor(private readonly options: InstanceEventBridgeOptions) { + const bus = this.options.eventBus + bus.on("workspace.started", (event) => this.startStream(event.workspace.id)) + bus.on("workspace.stopped", (event) => this.stopStream(event.workspaceId)) + bus.on("workspace.error", (event) => this.stopStream(event.workspace.id)) + } + + shutdown() { + for (const [id, active] of this.streams) { + active.controller.abort() + this.publishStatus(id, "disconnected") + } + this.streams.clear() + } + + private startStream(workspaceId: string) { + if (this.streams.has(workspaceId)) { + return + } + + const controller = new AbortController() + const task = this.runStream(workspaceId, controller.signal) + .catch((error) => { + if (!controller.signal.aborted) { + this.options.logger.warn({ workspaceId, err: error }, "Instance event stream failed") + this.publishStatus(workspaceId, "error", error instanceof Error ? error.message : String(error)) + } + }) + .finally(() => { + const active = this.streams.get(workspaceId) + if (active?.controller === controller) { + this.streams.delete(workspaceId) + } + }) + + this.streams.set(workspaceId, { controller, task }) + } + + private stopStream(workspaceId: string) { + const active = this.streams.get(workspaceId) + if (!active) { + return + } + active.controller.abort() + this.streams.delete(workspaceId) + this.publishStatus(workspaceId, "disconnected") + } + + private async runStream(workspaceId: string, signal: AbortSignal) { + while (!signal.aborted) { + const port = this.options.workspaceManager.getInstancePort(workspaceId) + if (!port) { + await this.delay(RECONNECT_DELAY_MS, signal) + continue + } + + this.publishStatus(workspaceId, "connecting") + + try { + await this.consumeStream(workspaceId, port, signal) + } catch (error) { + if (signal.aborted) { + break + } + this.options.logger.warn({ workspaceId, err: error }, "Instance event stream disconnected") + this.publishStatus(workspaceId, "error", error instanceof Error ? error.message : String(error)) + await this.delay(RECONNECT_DELAY_MS, signal) + } + } + } + + private async consumeStream(workspaceId: string, port: number, signal: AbortSignal) { + const url = `http://${INSTANCE_HOST}:${port}/event` + const response = await fetch(url, { + headers: { Accept: "text/event-stream" }, + signal, + }) + + if (!response.ok || !response.body) { + throw new Error(`Instance event stream unavailable (${response.status})`) + } + + this.publishStatus(workspaceId, "connected") + + const reader = response.body.getReader() + const decoder = new TextDecoder() + let buffer = "" + + while (!signal.aborted) { + const { done, value } = await reader.read() + if (done || !value) { + break + } + buffer += decoder.decode(value, { stream: true }) + buffer = this.flushEvents(buffer, workspaceId) + } + } + + private flushEvents(buffer: string, workspaceId: string) { + let separatorIndex = buffer.indexOf("\n\n") + + while (separatorIndex >= 0) { + const chunk = buffer.slice(0, separatorIndex) + buffer = buffer.slice(separatorIndex + 2) + this.processChunk(chunk, workspaceId) + separatorIndex = buffer.indexOf("\n\n") + } + + return buffer + } + + private processChunk(chunk: string, workspaceId: string) { + const lines = chunk.split(/\r?\n/) + const dataLines: string[] = [] + + for (const line of lines) { + if (line.startsWith(":")) { + continue + } + if (line.startsWith("data:")) { + dataLines.push(line.slice(5).trimStart()) + } + } + + if (dataLines.length === 0) { + return + } + + const payload = dataLines.join("\n").trim() + if (!payload) { + return + } + + try { + const event = JSON.parse(payload) as InstanceStreamEvent + this.options.eventBus.publish({ type: "instance.event", instanceId: workspaceId, event }) + } catch (error) { + this.options.logger.warn({ workspaceId, chunk: payload, err: error }, "Failed to parse instance SSE payload") + } + } + + private publishStatus(instanceId: string, status: InstanceStreamStatus, reason?: string) { + this.options.eventBus.publish({ type: "instance.eventStatus", instanceId, status, reason }) + } + + private delay(duration: number, signal: AbortSignal) { + if (duration <= 0) { + return Promise.resolve() + } + return new Promise((resolve) => { + const timeout = setTimeout(() => { + signal.removeEventListener("abort", onAbort) + resolve() + }, duration) + + const onAbort = () => { + clearTimeout(timeout) + resolve() + } + + signal.addEventListener("abort", onAbort, { once: true }) + }) + } +} diff --git a/packages/ui/src/lib/sse-manager.ts b/packages/ui/src/lib/sse-manager.ts index 2b993e3c..5329b1af 100644 --- a/packages/ui/src/lib/sse-manager.ts +++ b/packages/ui/src/lib/sse-manager.ts @@ -14,16 +14,15 @@ import type { EventSessionIdle, EventSessionUpdated, } from "@opencode-ai/sdk" -import { CODENOMAD_API_BASE } from "./api-client" +import { serverEvents } from "./server-events" +import type { + InstanceStreamEvent, + InstanceStreamStatus, + WorkspaceEventPayload, +} from "../../../server/src/api-types" -interface SSEConnection { - instanceId: string - proxyPath: string - eventSource: EventSource - status: "connecting" | "connected" | "disconnected" | "error" - reconnectAttempts: number - reconnectTimer?: ReturnType -} +type InstanceEventPayload = Extract +type InstanceStatusPayload = Extract interface TuiToastEvent { type: "tui.toast.show" @@ -35,7 +34,7 @@ interface TuiToastEvent { } } -type SSEEvent = +type SSEEvent = | MessageUpdateEvent | MessageRemovedEvent | MessagePartUpdatedEvent @@ -48,73 +47,40 @@ type SSEEvent = | EventPermissionReplied | EventLspUpdated | TuiToastEvent - | { type: string; properties?: Record } // Fallback for unknown event types + | { type: string; properties?: Record } -const [connectionStatus, setConnectionStatus] = createSignal< - Map ->(new Map()) +type ConnectionStatus = InstanceStreamStatus + +const [connectionStatus, setConnectionStatus] = createSignal>(new Map()) class SSEManager { - private connections = new Map() - private static readonly MAX_RECONNECT_DELAY_MS = 5000 - - connect(instanceId: string, proxyPath: string, reconnectAttempts = 0): void { - const existing = this.connections.get(instanceId) - if (existing) { - this.clearReconnectTimer(existing) - existing.eventSource.close() - } - - const url = buildInstanceEventsUrl(proxyPath) - const eventSource = new EventSource(url) - - const connection: SSEConnection = { - instanceId, - proxyPath, - eventSource, - status: "connecting", - reconnectAttempts, - } - - this.connections.set(instanceId, connection) - this.updateConnectionStatus(instanceId, "connecting") - - eventSource.onopen = () => { - connection.status = "connected" - connection.reconnectAttempts = 0 - this.updateConnectionStatus(instanceId, "connected") - console.log(`[SSE] Connected to instance ${instanceId}`) - } - - eventSource.onmessage = (event) => { - try { - const data = JSON.parse(event.data) - this.handleEvent(instanceId, data) - } catch (error) { - console.error("[SSE] Failed to parse event:", error) + constructor() { + serverEvents.on("instance.eventStatus", (event) => { + const payload = event as InstanceStatusPayload + this.updateConnectionStatus(payload.instanceId, payload.status) + if (payload.status === "error") { + const reason = payload.reason ?? "Instance stream error" + void this.onConnectionLost?.(payload.instanceId, reason) } - } + }) - eventSource.onerror = () => { - connection.status = "error" - this.updateConnectionStatus(instanceId, "error") - console.error(`[SSE] Connection error for instance ${instanceId}`) - this.handleConnectionError(instanceId, "Connection to instance lost") - } + serverEvents.on("instance.event", (event) => { + const payload = event as InstanceEventPayload + this.updateConnectionStatus(payload.instanceId, "connected") + this.handleEvent(payload.instanceId, payload.event as SSEEvent) + }) } - disconnect(instanceId: string): void { - const connection = this.connections.get(instanceId) - if (connection) { - this.clearReconnectTimer(connection) - connection.eventSource.close() - this.connections.delete(instanceId) - this.updateConnectionStatus(instanceId, "disconnected") - console.log(`[SSE] Disconnected from instance ${instanceId}`) - } + seedStatus(instanceId: string, status: ConnectionStatus) { + this.updateConnectionStatus(instanceId, status) } - private handleEvent(instanceId: string, event: SSEEvent): void { + private handleEvent(instanceId: string, event: SSEEvent | InstanceStreamEvent): void { + if (!event || typeof event !== "object" || typeof (event as { type?: unknown }).type !== "string") { + console.warn("[SSE] Dropping malformed event", event) + return + } + console.log("[SSE] Received event:", event.type, event) switch (event.type) { @@ -159,35 +125,7 @@ class SSEManager { } } - private handleConnectionError(instanceId: string, reason: string): void { - const connection = this.connections.get(instanceId) - if (!connection) return - - connection.eventSource.close() - - const nextAttempt = connection.reconnectAttempts + 1 - const delay = Math.min(nextAttempt * 1000, SSEManager.MAX_RECONNECT_DELAY_MS) - - connection.reconnectAttempts = nextAttempt - connection.status = "connecting" - this.updateConnectionStatus(instanceId, "connecting") - - console.warn(`[SSE] Attempting reconnect ${nextAttempt} for instance ${instanceId}`) - - connection.reconnectTimer = setTimeout(() => { - connection.reconnectTimer = undefined - this.connect(instanceId, connection.proxyPath, nextAttempt) - }, delay) - } - - private clearReconnectTimer(connection: SSEConnection): void { - if (connection.reconnectTimer) { - clearTimeout(connection.reconnectTimer) - connection.reconnectTimer = undefined - } - } - - private updateConnectionStatus(instanceId: string, status: SSEConnection["status"]): void { + private updateConnectionStatus(instanceId: string, status: ConnectionStatus): void { setConnectionStatus((prev) => { const next = new Map(prev) next.set(instanceId, status) @@ -209,7 +147,7 @@ class SSEManager { onLspUpdated?: (instanceId: string, event: EventLspUpdated) => void onConnectionLost?: (instanceId: string, reason: string) => void | Promise - getStatus(instanceId: string): "connecting" | "connected" | "disconnected" | "error" | null { + getStatus(instanceId: string): ConnectionStatus | null { return connectionStatus().get(instanceId) ?? null } @@ -218,19 +156,4 @@ class SSEManager { } } -function buildInstanceEventsUrl(proxyPath: string): string { - const normalized = normalizeProxyPath(proxyPath) - const base = stripTrailingSlashes(CODENOMAD_API_BASE) - return `${base}${normalized}/event` -} - -function normalizeProxyPath(proxyPath: string): string { - const withLeading = proxyPath.startsWith("/") ? proxyPath : `/${proxyPath}` - return withLeading.replace(/\/+/g, "/").replace(/\/+$/, "") -} - -function stripTrailingSlashes(input: string): string { - return input.replace(/\/+$/, "") -} - export const sseManager = new SSEManager() diff --git a/packages/ui/src/stores/instances.ts b/packages/ui/src/stores/instances.ts index adb1f7a6..1a1f3fb3 100644 --- a/packages/ui/src/stores/instances.ts +++ b/packages/ui/src/stores/instances.ts @@ -89,7 +89,6 @@ function attachClient(descriptor: WorkspaceDescriptor) { if (instance.client) { sdkManager.destroyClient(descriptor.id) - sseManager.disconnect(descriptor.id) } const client = sdkManager.createClient(descriptor.id, nextProxyPath) @@ -99,7 +98,7 @@ function attachClient(descriptor: WorkspaceDescriptor) { proxyPath: nextProxyPath, status: "ready", }) - sseManager.connect(descriptor.id, nextProxyPath) + sseManager.seedStatus(descriptor.id, "connecting") void hydrateInstanceData(descriptor.id).catch((error) => { console.error("Failed to hydrate instance data", error) }) @@ -112,7 +111,7 @@ function releaseInstanceResources(instanceId: string) { if (instance.client) { sdkManager.destroyClient(instanceId) } - sseManager.disconnect(instanceId) + sseManager.seedStatus(instanceId, "disconnected") } async function hydrateInstanceData(instanceId: string) {