diff --git a/packages/ui/src/lib/api-client.ts b/packages/ui/src/lib/api-client.ts index 9719375a..f107e1a9 100644 --- a/packages/ui/src/lib/api-client.ts +++ b/packages/ui/src/lib/api-client.ts @@ -38,6 +38,7 @@ import type { } from "../../../server/src/api-types" import { getClientIdentity } from "./client-identity" import { getLogger } from "./logger" +import { attachEventSourceHandlers } from "./event-source-handlers" const RUNTIME_BASE = typeof window !== "undefined" ? window.location?.origin : undefined const DEFAULT_BASE = typeof window !== "undefined" ? window.__CODENOMAD_API_BASE__ ?? RUNTIME_BASE : undefined @@ -510,26 +511,7 @@ export const serverApi = { const url = buildClientEventsUrl(identity) sseLogger.info(`Connecting to ${url}`) const source = new EventSource(url, { withCredentials: true } as any) - source.onmessage = (event) => { - try { - const payload = JSON.parse(event.data) as WorkspaceEventPayload - onEvent(payload) - } catch (error) { - sseLogger.error("Failed to parse event", error) - } - } - source.onerror = () => { - sseLogger.warn("EventSource error, closing stream") - onError?.() - } - source.addEventListener("codenomad.client.ping", (event: MessageEvent) => { - try { - const payload = event.data ? (JSON.parse(event.data) as { ts?: number }) : {} - onPing?.(payload) - } catch (error) { - sseLogger.error("Failed to parse ping event", error) - } - }) + attachEventSourceHandlers(source, { onEvent, onError, onPing, logger: sseLogger }) return source }, } diff --git a/packages/ui/src/lib/event-source-handlers.test.ts b/packages/ui/src/lib/event-source-handlers.test.ts new file mode 100644 index 00000000..e732a44c --- /dev/null +++ b/packages/ui/src/lib/event-source-handlers.test.ts @@ -0,0 +1,69 @@ +import assert from "node:assert/strict" +import { describe, it } from "node:test" +import { attachEventSourceHandlers } from "./event-source-handlers.ts" + +class FakeEventSource extends EventTarget { + onmessage: ((event: MessageEvent) => void) | null = null + onerror: (() => void) | null = null + onclose: (() => void) | null = null +} + +const logger = { + warn() {}, + error() {}, +} + +describe("attachEventSourceHandlers", () => { + it("requests reconnect when EventSource emits close", () => { + const source = new FakeEventSource() + let reconnects = 0 + + attachEventSourceHandlers(source as unknown as EventSource, { + onEvent() {}, + onError: () => { + reconnects += 1 + }, + logger, + }) + + source.dispatchEvent(new Event("close")) + + assert.equal(reconnects, 1) + }) + + it("requests reconnect when EventSource invokes onclose", () => { + const source = new FakeEventSource() + let reconnects = 0 + + attachEventSourceHandlers(source as unknown as EventSource, { + onEvent() {}, + onError: () => { + reconnects += 1 + }, + logger, + }) + + source.onclose?.() + + assert.equal(reconnects, 1) + }) + + it("requests reconnect once when a close notification hits multiple handlers", () => { + const source = new FakeEventSource() + let reconnects = 0 + + attachEventSourceHandlers(source as unknown as EventSource, { + onEvent() {}, + onError: () => { + reconnects += 1 + }, + logger, + }) + + source.onclose?.() + source.dispatchEvent(new Event("close")) + source.onerror?.() + + assert.equal(reconnects, 1) + }) +}) diff --git a/packages/ui/src/lib/event-source-handlers.ts b/packages/ui/src/lib/event-source-handlers.ts new file mode 100644 index 00000000..7cd360ca --- /dev/null +++ b/packages/ui/src/lib/event-source-handlers.ts @@ -0,0 +1,60 @@ +import type { WorkspaceEventPayload } from "../../../server/src/api-types" + +type EventSourceLogger = { + warn: (message: string) => void + error: (message: string, error?: unknown) => void +} + +type EventSourceWithClose = EventSource & { + onclose?: () => void +} + +interface EventSourceHandlerOptions { + onEvent: (event: WorkspaceEventPayload) => void + onError?: () => void + onPing?: (payload: { ts?: number }) => void + logger: EventSourceLogger +} + +export function attachEventSourceHandlers(source: EventSource, options: EventSourceHandlerOptions) { + let disconnected = false + + source.onmessage = (event) => { + try { + const payload = JSON.parse(event.data) as WorkspaceEventPayload + options.onEvent(payload) + } catch (error) { + options.logger.error("Failed to parse event", error) + } + } + + const handleDisconnect = (reason: string) => { + if (disconnected) { + return + } + disconnected = true + options.logger.warn(reason) + options.onError?.() + } + + source.onerror = () => { + handleDisconnect("EventSource error, closing stream") + } + + ;(source as EventSourceWithClose).onclose = () => { + handleDisconnect("EventSource closed") + } + + source.addEventListener("close", () => { + handleDisconnect("EventSource closed") + }) + + source.addEventListener("codenomad.client.ping", (event: MessageEvent) => { + try { + const payload = event.data ? (JSON.parse(event.data) as { ts?: number }) : {} + options.onPing?.(payload) + } catch (error) { + options.logger.error("Failed to parse ping event", error) + } + }) +} diff --git a/packages/ui/src/lib/server-events.ts b/packages/ui/src/lib/server-events.ts index 68db4476..833e6c2a 100644 --- a/packages/ui/src/lib/server-events.ts +++ b/packages/ui/src/lib/server-events.ts @@ -20,12 +20,17 @@ class ServerEvents { private openHandlers = new Set<() => void>() private source: EventSource | null = null private retryDelay = RETRY_BASE_DELAY + private reconnectTimer: ReturnType | null = null constructor() { this.connect() } private connect() { + if (this.reconnectTimer !== null) { + clearTimeout(this.reconnectTimer) + this.reconnectTimer = null + } if (this.source) { this.source.close() } @@ -52,15 +57,18 @@ class ServerEvents { } private scheduleReconnect() { - if (this.source) { - this.source.close() - this.source = null + if (this.reconnectTimer !== null) { + return } + const source = this.source + this.source = null logSse("Events stream disconnected, scheduling reconnect", { delayMs: this.retryDelay }) - setTimeout(() => { + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null this.retryDelay = Math.min(this.retryDelay * 2, RETRY_MAX_DELAY) this.connect() }, this.retryDelay) + source?.close() } private dispatch(event: WorkspaceEventPayload) {