fix(ui): reconnect closed SSE streams (#362)
## Summary - Reconnect the UI event stream when a runtime surfaces an SSE close notification, not only on EventSource errors. - Avoid scheduling duplicate reconnect loops when close/error notifications arrive together. - Add a targeted EventSource handler test for the close paths described in #207. ## Validation - node --experimental-strip-types --test "packages/ui/src/lib/event-source-handlers.test.ts" - npm run build --workspace @codenomad/ui Closes #207
This commit is contained in:
@@ -38,6 +38,7 @@ import type {
|
|||||||
} from "../../../server/src/api-types"
|
} from "../../../server/src/api-types"
|
||||||
import { getClientIdentity } from "./client-identity"
|
import { getClientIdentity } from "./client-identity"
|
||||||
import { getLogger } from "./logger"
|
import { getLogger } from "./logger"
|
||||||
|
import { attachEventSourceHandlers } from "./event-source-handlers"
|
||||||
|
|
||||||
const RUNTIME_BASE = typeof window !== "undefined" ? window.location?.origin : undefined
|
const RUNTIME_BASE = typeof window !== "undefined" ? window.location?.origin : undefined
|
||||||
const DEFAULT_BASE = typeof window !== "undefined" ? window.__CODENOMAD_API_BASE__ ?? RUNTIME_BASE : undefined
|
const DEFAULT_BASE = typeof window !== "undefined" ? window.__CODENOMAD_API_BASE__ ?? RUNTIME_BASE : undefined
|
||||||
@@ -510,26 +511,7 @@ export const serverApi = {
|
|||||||
const url = buildClientEventsUrl(identity)
|
const url = buildClientEventsUrl(identity)
|
||||||
sseLogger.info(`Connecting to ${url}`)
|
sseLogger.info(`Connecting to ${url}`)
|
||||||
const source = new EventSource(url, { withCredentials: true } as any)
|
const source = new EventSource(url, { withCredentials: true } as any)
|
||||||
source.onmessage = (event) => {
|
attachEventSourceHandlers(source, { onEvent, onError, onPing, logger: sseLogger })
|
||||||
try {
|
|
||||||
const payload = JSON.parse(event.data) as WorkspaceEventPayload
|
|
||||||
onEvent(payload)
|
|
||||||
} catch (error) {
|
|
||||||
sseLogger.error("Failed to parse event", error)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
source.onerror = () => {
|
|
||||||
sseLogger.warn("EventSource error, closing stream")
|
|
||||||
onError?.()
|
|
||||||
}
|
|
||||||
source.addEventListener("codenomad.client.ping", (event: MessageEvent) => {
|
|
||||||
try {
|
|
||||||
const payload = event.data ? (JSON.parse(event.data) as { ts?: number }) : {}
|
|
||||||
onPing?.(payload)
|
|
||||||
} catch (error) {
|
|
||||||
sseLogger.error("Failed to parse ping event", error)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
return source
|
return source
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|||||||
69
packages/ui/src/lib/event-source-handlers.test.ts
Normal file
69
packages/ui/src/lib/event-source-handlers.test.ts
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
import assert from "node:assert/strict"
|
||||||
|
import { describe, it } from "node:test"
|
||||||
|
import { attachEventSourceHandlers } from "./event-source-handlers.ts"
|
||||||
|
|
||||||
|
class FakeEventSource extends EventTarget {
|
||||||
|
onmessage: ((event: MessageEvent) => void) | null = null
|
||||||
|
onerror: (() => void) | null = null
|
||||||
|
onclose: (() => void) | null = null
|
||||||
|
}
|
||||||
|
|
||||||
|
const logger = {
|
||||||
|
warn() {},
|
||||||
|
error() {},
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("attachEventSourceHandlers", () => {
|
||||||
|
it("requests reconnect when EventSource emits close", () => {
|
||||||
|
const source = new FakeEventSource()
|
||||||
|
let reconnects = 0
|
||||||
|
|
||||||
|
attachEventSourceHandlers(source as unknown as EventSource, {
|
||||||
|
onEvent() {},
|
||||||
|
onError: () => {
|
||||||
|
reconnects += 1
|
||||||
|
},
|
||||||
|
logger,
|
||||||
|
})
|
||||||
|
|
||||||
|
source.dispatchEvent(new Event("close"))
|
||||||
|
|
||||||
|
assert.equal(reconnects, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("requests reconnect when EventSource invokes onclose", () => {
|
||||||
|
const source = new FakeEventSource()
|
||||||
|
let reconnects = 0
|
||||||
|
|
||||||
|
attachEventSourceHandlers(source as unknown as EventSource, {
|
||||||
|
onEvent() {},
|
||||||
|
onError: () => {
|
||||||
|
reconnects += 1
|
||||||
|
},
|
||||||
|
logger,
|
||||||
|
})
|
||||||
|
|
||||||
|
source.onclose?.()
|
||||||
|
|
||||||
|
assert.equal(reconnects, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("requests reconnect once when a close notification hits multiple handlers", () => {
|
||||||
|
const source = new FakeEventSource()
|
||||||
|
let reconnects = 0
|
||||||
|
|
||||||
|
attachEventSourceHandlers(source as unknown as EventSource, {
|
||||||
|
onEvent() {},
|
||||||
|
onError: () => {
|
||||||
|
reconnects += 1
|
||||||
|
},
|
||||||
|
logger,
|
||||||
|
})
|
||||||
|
|
||||||
|
source.onclose?.()
|
||||||
|
source.dispatchEvent(new Event("close"))
|
||||||
|
source.onerror?.()
|
||||||
|
|
||||||
|
assert.equal(reconnects, 1)
|
||||||
|
})
|
||||||
|
})
|
||||||
60
packages/ui/src/lib/event-source-handlers.ts
Normal file
60
packages/ui/src/lib/event-source-handlers.ts
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
import type { WorkspaceEventPayload } from "../../../server/src/api-types"
|
||||||
|
|
||||||
|
type EventSourceLogger = {
|
||||||
|
warn: (message: string) => void
|
||||||
|
error: (message: string, error?: unknown) => void
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventSourceWithClose = EventSource & {
|
||||||
|
onclose?: () => void
|
||||||
|
}
|
||||||
|
|
||||||
|
interface EventSourceHandlerOptions {
|
||||||
|
onEvent: (event: WorkspaceEventPayload) => void
|
||||||
|
onError?: () => void
|
||||||
|
onPing?: (payload: { ts?: number }) => void
|
||||||
|
logger: EventSourceLogger
|
||||||
|
}
|
||||||
|
|
||||||
|
export function attachEventSourceHandlers(source: EventSource, options: EventSourceHandlerOptions) {
|
||||||
|
let disconnected = false
|
||||||
|
|
||||||
|
source.onmessage = (event) => {
|
||||||
|
try {
|
||||||
|
const payload = JSON.parse(event.data) as WorkspaceEventPayload
|
||||||
|
options.onEvent(payload)
|
||||||
|
} catch (error) {
|
||||||
|
options.logger.error("Failed to parse event", error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const handleDisconnect = (reason: string) => {
|
||||||
|
if (disconnected) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
disconnected = true
|
||||||
|
options.logger.warn(reason)
|
||||||
|
options.onError?.()
|
||||||
|
}
|
||||||
|
|
||||||
|
source.onerror = () => {
|
||||||
|
handleDisconnect("EventSource error, closing stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
;(source as EventSourceWithClose).onclose = () => {
|
||||||
|
handleDisconnect("EventSource closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
source.addEventListener("close", () => {
|
||||||
|
handleDisconnect("EventSource closed")
|
||||||
|
})
|
||||||
|
|
||||||
|
source.addEventListener("codenomad.client.ping", (event: MessageEvent) => {
|
||||||
|
try {
|
||||||
|
const payload = event.data ? (JSON.parse(event.data) as { ts?: number }) : {}
|
||||||
|
options.onPing?.(payload)
|
||||||
|
} catch (error) {
|
||||||
|
options.logger.error("Failed to parse ping event", error)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -20,12 +20,17 @@ class ServerEvents {
|
|||||||
private openHandlers = new Set<() => void>()
|
private openHandlers = new Set<() => void>()
|
||||||
private source: EventSource | null = null
|
private source: EventSource | null = null
|
||||||
private retryDelay = RETRY_BASE_DELAY
|
private retryDelay = RETRY_BASE_DELAY
|
||||||
|
private reconnectTimer: ReturnType<typeof setTimeout> | null = null
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.connect()
|
this.connect()
|
||||||
}
|
}
|
||||||
|
|
||||||
private connect() {
|
private connect() {
|
||||||
|
if (this.reconnectTimer !== null) {
|
||||||
|
clearTimeout(this.reconnectTimer)
|
||||||
|
this.reconnectTimer = null
|
||||||
|
}
|
||||||
if (this.source) {
|
if (this.source) {
|
||||||
this.source.close()
|
this.source.close()
|
||||||
}
|
}
|
||||||
@@ -52,15 +57,18 @@ class ServerEvents {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private scheduleReconnect() {
|
private scheduleReconnect() {
|
||||||
if (this.source) {
|
if (this.reconnectTimer !== null) {
|
||||||
this.source.close()
|
return
|
||||||
this.source = null
|
|
||||||
}
|
}
|
||||||
|
const source = this.source
|
||||||
|
this.source = null
|
||||||
logSse("Events stream disconnected, scheduling reconnect", { delayMs: this.retryDelay })
|
logSse("Events stream disconnected, scheduling reconnect", { delayMs: this.retryDelay })
|
||||||
setTimeout(() => {
|
this.reconnectTimer = setTimeout(() => {
|
||||||
|
this.reconnectTimer = null
|
||||||
this.retryDelay = Math.min(this.retryDelay * 2, RETRY_MAX_DELAY)
|
this.retryDelay = Math.min(this.retryDelay * 2, RETRY_MAX_DELAY)
|
||||||
this.connect()
|
this.connect()
|
||||||
}, this.retryDelay)
|
}, this.retryDelay)
|
||||||
|
source?.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
private dispatch(event: WorkspaceEventPayload) {
|
private dispatch(event: WorkspaceEventPayload) {
|
||||||
|
|||||||
Reference in New Issue
Block a user