From 42589464e56055f901f34aaf11db82b9576436fc Mon Sep 17 00:00:00 2001 From: Shantur Date: Tue, 31 Mar 2026 12:39:29 +0100 Subject: [PATCH] feat(voice): support per-client conversation mode state --- .../server/src/clients/connection-manager.ts | 128 ++++++++++++++++++ packages/server/src/plugins/voice-mode.ts | 96 +++++++++++++ packages/server/src/server/http-server.ts | 17 ++- packages/server/src/server/routes/events.ts | 30 +++- packages/server/src/server/routes/plugin.ts | 17 ++- packages/ui/src/lib/api-client.ts | 40 +++++- packages/ui/src/lib/client-identity.ts | 58 ++++++++ packages/ui/src/lib/server-events.ts | 23 +++- packages/ui/src/stores/conversation-speech.ts | 14 ++ 9 files changed, 409 insertions(+), 14 deletions(-) create mode 100644 packages/server/src/clients/connection-manager.ts create mode 100644 packages/server/src/plugins/voice-mode.ts create mode 100644 packages/ui/src/lib/client-identity.ts diff --git a/packages/server/src/clients/connection-manager.ts b/packages/server/src/clients/connection-manager.ts new file mode 100644 index 00000000..7eaa426f --- /dev/null +++ b/packages/server/src/clients/connection-manager.ts @@ -0,0 +1,128 @@ +import type { Logger } from "../logger" + +const STALE_CONNECTION_TIMEOUT_MS = 45000 +const STALE_SWEEP_INTERVAL_MS = 5000 + +export interface ClientConnectionRef { + clientId: string + connectionId: string +} + +export interface ClientConnectionRecord extends ClientConnectionRef { + key: string + connectedAt: number + lastSeenAt: number +} + +type ConnectionChangeEvent = { + type: "connected" | "disconnected" + connection: ClientConnectionRecord + reason?: string +} + +interface RegisteredConnection extends ClientConnectionRecord { + close: () => void +} + +export class ClientConnectionManager { + private readonly connections = new Map() + private readonly subscribers = new Set<(event: ConnectionChangeEvent) => void>() + private readonly sweepTimer: NodeJS.Timeout + + constructor(private readonly logger: Logger) { + this.sweepTimer = setInterval(() => this.sweepStaleConnections(), STALE_SWEEP_INTERVAL_MS) + this.sweepTimer.unref?.() + } + + shutdown(): void { + clearInterval(this.sweepTimer) + for (const connection of Array.from(this.connections.values())) { + this.disconnect(connection.key, "shutdown", false) + } + } + + subscribe(listener: (event: ConnectionChangeEvent) => void): () => void { + this.subscribers.add(listener) + return () => this.subscribers.delete(listener) + } + + register(input: ClientConnectionRef & { close: () => void }): () => void { + const key = getConnectionKey(input) + const now = Date.now() + const existing = this.connections.get(key) + + if (existing) { + this.logger.debug({ clientId: input.clientId, connectionId: input.connectionId }, "Replacing existing client connection") + this.disconnect(key, "replaced") + } + + const connection: RegisteredConnection = { + key, + clientId: input.clientId, + connectionId: input.connectionId, + connectedAt: now, + lastSeenAt: now, + close: input.close, + } + this.connections.set(key, connection) + this.logger.debug({ clientId: input.clientId, connectionId: input.connectionId }, "Client connected") + this.notify({ type: "connected", connection }) + return () => this.disconnect(key, "closed") + } + + pong(input: ClientConnectionRef): boolean { + const key = getConnectionKey(input) + const connection = this.connections.get(key) + if (!connection) { + this.logger.debug({ clientId: input.clientId, connectionId: input.connectionId }, "Ignoring pong for unknown client connection") + return false + } + + connection.lastSeenAt = Date.now() + return true + } + + isConnected(input: ClientConnectionRef): boolean { + return this.connections.has(getConnectionKey(input)) + } + + private sweepStaleConnections(): void { + const cutoff = Date.now() - STALE_CONNECTION_TIMEOUT_MS + for (const connection of Array.from(this.connections.values())) { + if (connection.lastSeenAt > cutoff) continue + this.logger.debug({ clientId: connection.clientId, connectionId: connection.connectionId }, "Client connection timed out") + this.disconnect(connection.key, "timeout") + } + } + + private disconnect(key: string, reason: string, invokeClose = true): void { + const connection = this.connections.get(key) + if (!connection) return + this.connections.delete(key) + this.logger.debug({ clientId: connection.clientId, connectionId: connection.connectionId, reason }, "Client disconnected") + + if (invokeClose) { + try { + connection.close() + } catch (error) { + this.logger.warn({ err: error, clientId: connection.clientId, connectionId: connection.connectionId }, "Failed to close stale client connection") + } + } + + this.notify({ type: "disconnected", connection, reason }) + } + + private notify(event: ConnectionChangeEvent): void { + for (const subscriber of this.subscribers) { + try { + subscriber(event) + } catch (error) { + this.logger.warn({ err: error, eventType: event.type }, "Client connection subscriber failed") + } + } + } +} + +function getConnectionKey(input: ClientConnectionRef): string { + return `${input.clientId}:${input.connectionId}` +} diff --git a/packages/server/src/plugins/voice-mode.ts b/packages/server/src/plugins/voice-mode.ts new file mode 100644 index 00000000..3e2f8cb5 --- /dev/null +++ b/packages/server/src/plugins/voice-mode.ts @@ -0,0 +1,96 @@ +import type { Logger } from "../logger" +import type { ClientConnectionManager, ClientConnectionRef } from "../clients/connection-manager" +import type { PluginChannelManager } from "./channel" + +interface VoiceModeManagerOptions { + connections: ClientConnectionManager + channel: PluginChannelManager + logger: Logger +} + +export class VoiceModeManager { + private readonly enabledConnectionsByInstance = new Map>() + private readonly aggregateByInstance = new Map() + + constructor(private readonly options: VoiceModeManagerOptions) { + this.options.connections.subscribe((event) => { + if (event.type !== "disconnected") return + this.clearConnection(event.connection) + }) + } + + setEnabled(instanceId: string, connection: ClientConnectionRef, enabled: boolean): void { + if (enabled && !this.options.connections.isConnected(connection)) { + this.options.logger.debug( + { instanceId, clientId: connection.clientId, connectionId: connection.connectionId }, + "Ignoring voice mode enable for disconnected client connection", + ) + return + } + + const key = getConnectionKey(connection) + const current = this.enabledConnectionsByInstance.get(instanceId) ?? new Set() + + if (enabled) { + current.add(key) + this.enabledConnectionsByInstance.set(instanceId, current) + } else if (current.delete(key)) { + if (current.size === 0) { + this.enabledConnectionsByInstance.delete(instanceId) + } else { + this.enabledConnectionsByInstance.set(instanceId, current) + } + } + + this.options.logger.debug({ instanceId, clientId: connection.clientId, connectionId: connection.connectionId, enabled }, "Voice mode updated for client connection") + this.publishIfChanged(instanceId) + } + + syncInstance(instanceId: string): void { + this.options.channel.send(instanceId, buildVoiceModeEvent(this.isEnabled(instanceId))) + } + + isEnabled(instanceId: string): boolean { + return this.aggregateByInstance.get(instanceId) === true + } + + private clearConnection(connection: ClientConnectionRef): void { + const key = getConnectionKey(connection) + for (const [instanceId, enabledConnections] of Array.from(this.enabledConnectionsByInstance.entries())) { + if (!enabledConnections.delete(key)) continue + if (enabledConnections.size === 0) { + this.enabledConnectionsByInstance.delete(instanceId) + } + this.publishIfChanged(instanceId) + } + } + + private publishIfChanged(instanceId: string): void { + const enabled = (this.enabledConnectionsByInstance.get(instanceId)?.size ?? 0) > 0 + const previous = this.aggregateByInstance.get(instanceId) === true + if (enabled === previous) return + + if (enabled) { + this.aggregateByInstance.set(instanceId, true) + } else { + this.aggregateByInstance.delete(instanceId) + } + + this.options.logger.debug({ instanceId, enabled }, "Broadcasting aggregate voice mode") + this.options.channel.send(instanceId, buildVoiceModeEvent(enabled)) + } +} + +function buildVoiceModeEvent(enabled: boolean) { + return { + type: "codenomad.voiceMode", + properties: { + enabled, + formatVersion: "v1", + }, + } +} + +function getConnectionKey(connection: ClientConnectionRef): string { + return `${connection.clientId}:${connection.connectionId}` +} diff --git a/packages/server/src/server/http-server.ts b/packages/server/src/server/http-server.ts index 13391946..61f82535 100644 --- a/packages/server/src/server/http-server.ts +++ b/packages/server/src/server/http-server.ts @@ -29,7 +29,9 @@ import type { AuthManager } from "../auth/manager" import { registerAuthRoutes } from "./routes/auth" import { sendUnauthorized, wantsHtml } from "../auth/http-auth" import type { SpeechService } from "../speech/service" +import { ClientConnectionManager } from "../clients/connection-manager" import { PluginChannelManager } from "../plugins/channel" +import { VoiceModeManager } from "../plugins/voice-mode" interface HttpServerDeps { bindHost: string @@ -174,7 +176,13 @@ export function createHttpServer(deps: HttpServerDeps) { eventBus: deps.eventBus, logger: deps.logger.child({ component: "background-processes" }), }) + const clientConnectionManager = new ClientConnectionManager(deps.logger.child({ component: "client-connections" })) const pluginChannel = new PluginChannelManager(deps.logger.child({ component: "plugin-channel" })) + const voiceModeManager = new VoiceModeManager({ + connections: clientConnectionManager, + channel: pluginChannel, + logger: deps.logger.child({ component: "voice-mode" }), + }) registerAuthRoutes(app, { authManager: deps.authManager }) @@ -250,7 +258,12 @@ export function createHttpServer(deps: HttpServerDeps) { registerSettingsRoutes(app, { settings: deps.settings, logger: apiLogger }) registerFilesystemRoutes(app, { fileSystemBrowser: deps.fileSystemBrowser }) registerMetaRoutes(app, { serverMeta: deps.serverMeta }) - registerEventRoutes(app, { eventBus: deps.eventBus, registerClient: registerSseClient, logger: sseLogger }) + registerEventRoutes(app, { + eventBus: deps.eventBus, + registerClient: registerSseClient, + logger: sseLogger, + connectionManager: clientConnectionManager, + }) registerWorktreeRoutes(app, { workspaceManager: deps.workspaceManager }) registerStorageRoutes(app, { instanceStore: deps.instanceStore, @@ -263,6 +276,7 @@ export function createHttpServer(deps: HttpServerDeps) { eventBus: deps.eventBus, logger: proxyLogger, channel: pluginChannel, + voiceModeManager, }) registerBackgroundProcessRoutes(app, { backgroundProcessManager }) registerInstanceProxyRoutes(app, { workspaceManager: deps.workspaceManager, logger: proxyLogger }) @@ -328,6 +342,7 @@ export function createHttpServer(deps: HttpServerDeps) { }, stop: () => { closeSseClients() + clientConnectionManager.shutdown() return app.close() }, } diff --git a/packages/server/src/server/routes/events.ts b/packages/server/src/server/routes/events.ts index e8f23298..158266e1 100644 --- a/packages/server/src/server/routes/events.ts +++ b/packages/server/src/server/routes/events.ts @@ -1,19 +1,32 @@ import { FastifyInstance } from "fastify" +import { z } from "zod" import { EventBus } from "../../events/bus" import { WorkspaceEventPayload } from "../../api-types" +import type { ClientConnectionManager } from "../../clients/connection-manager" import { Logger } from "../../logger" interface RouteDeps { eventBus: EventBus registerClient: (cleanup: () => void) => () => void logger: Logger + connectionManager: ClientConnectionManager } let nextClientId = 0 +const ConnectionQuerySchema = z.object({ + clientId: z.string().trim().min(1), + connectionId: z.string().trim().min(1), +}) + +const PongBodySchema = ConnectionQuerySchema.extend({ + pingTs: z.number().optional(), +}) + export function registerEventRoutes(app: FastifyInstance, deps: RouteDeps) { app.get("/api/events", (request, reply) => { const clientId = ++nextClientId + const connection = ConnectionQuerySchema.parse(request.query ?? {}) deps.logger.debug({ clientId }, "SSE client connected") const origin = request.headers.origin ?? "*" @@ -35,7 +48,8 @@ export function registerEventRoutes(app: FastifyInstance, deps: RouteDeps) { const unsubscribe = deps.eventBus.onEvent(send) const heartbeat = setInterval(() => { - reply.raw.write(`:hb ${Date.now()}\n\n`) + const ping = { ts: Date.now() } + reply.raw.write(`event: codenomad.client.ping\ndata: ${JSON.stringify(ping)}\n\n`) }, 15000) let closed = false @@ -49,13 +63,27 @@ export function registerEventRoutes(app: FastifyInstance, deps: RouteDeps) { } const unregister = deps.registerClient(close) + const unregisterConnection = deps.connectionManager.register({ + ...connection, + close, + }) const handleClose = () => { close() unregister() + unregisterConnection() } request.raw.on("close", handleClose) request.raw.on("error", handleClose) }) + + app.post("/api/client-connections/pong", (request, reply) => { + const body = PongBodySchema.parse(request.body ?? {}) + if (!deps.connectionManager.pong(body)) { + reply.code(404).send({ error: "Client connection not found" }) + return + } + reply.code(204).send() + }) } diff --git a/packages/server/src/server/routes/plugin.ts b/packages/server/src/server/routes/plugin.ts index 5b37f3bb..daa7630e 100644 --- a/packages/server/src/server/routes/plugin.ts +++ b/packages/server/src/server/routes/plugin.ts @@ -6,12 +6,14 @@ import type { EventBus } from "../../events/bus" import type { Logger } from "../../logger" import { PluginChannelManager } from "../../plugins/channel" import { buildPingEvent, handlePluginEvent } from "../../plugins/handlers" +import { VoiceModeManager } from "../../plugins/voice-mode" interface RouteDeps { workspaceManager: WorkspaceManager eventBus: EventBus logger: Logger channel: PluginChannelManager + voiceModeManager: VoiceModeManager } const PluginEventSchema = z.object({ @@ -21,6 +23,8 @@ const PluginEventSchema = z.object({ const VoiceModeStateSchema = z.object({ enabled: z.boolean(), + clientId: z.string().trim().min(1), + connectionId: z.string().trim().min(1), }) export function registerPluginRoutes(app: FastifyInstance, deps: RouteDeps) { @@ -38,6 +42,7 @@ export function registerPluginRoutes(app: FastifyInstance, deps: RouteDeps) { reply.hijack() const registration = deps.channel.register(request.params.id, reply) + deps.voiceModeManager.syncInstance(request.params.id) const heartbeat = setInterval(() => { deps.channel.send(request.params.id, buildPingEvent()) @@ -61,13 +66,11 @@ export function registerPluginRoutes(app: FastifyInstance, deps: RouteDeps) { } const payload = VoiceModeStateSchema.parse(request.body ?? {}) - deps.channel.send(request.params.id, { - type: "codenomad.voiceMode", - properties: { - enabled: payload.enabled, - formatVersion: "v1", - }, - }) + deps.voiceModeManager.setEnabled( + request.params.id, + { clientId: payload.clientId, connectionId: payload.connectionId }, + payload.enabled, + ) return { enabled: payload.enabled } }) diff --git a/packages/ui/src/lib/api-client.ts b/packages/ui/src/lib/api-client.ts index 4dfc426c..10bb29ea 100644 --- a/packages/ui/src/lib/api-client.ts +++ b/packages/ui/src/lib/api-client.ts @@ -24,6 +24,7 @@ import type { WorktreeMap, WorktreeCreateRequest, } from "../../../server/src/api-types" +import { getClientIdentity } from "./client-identity" import { getLogger } from "./logger" const RUNTIME_BASE = typeof window !== "undefined" ? window.location?.origin : undefined @@ -350,9 +351,16 @@ export const serverApi = { ) }, updateVoiceMode(instanceId: string, enabled: boolean): Promise { + const identity = getClientIdentity() return request(`/workspaces/${encodeURIComponent(instanceId)}/plugin/voice-mode`, { method: "POST", - body: JSON.stringify({ enabled }), + body: JSON.stringify({ ...identity, enabled }), + }) + }, + sendClientConnectionPong(payload: { clientId: string; connectionId: string; pingTs?: number }): Promise { + return request("/api/client-connections/pong", { + method: "POST", + body: JSON.stringify(payload), }) }, fetchBackgroundProcessOutput( @@ -379,9 +387,15 @@ export const serverApi = { `/workspaces/${encodeURIComponent(instanceId)}/plugin/background-processes/${encodeURIComponent(processId)}/output${suffix}`, ) }, - connectEvents(onEvent: (event: WorkspaceEventPayload) => void, onError?: () => void) { - sseLogger.info(`Connecting to ${EVENTS_URL}`) - const source = new EventSource(EVENTS_URL, { withCredentials: true } as any) + connectEvents( + onEvent: (event: WorkspaceEventPayload) => void, + onError?: () => void, + onPing?: (payload: { ts?: number }) => void, + ) { + const identity = getClientIdentity() + const url = buildClientEventsUrl(identity) + sseLogger.info(`Connecting to ${url}`) + const source = new EventSource(url, { withCredentials: true } as any) source.onmessage = (event) => { try { const payload = JSON.parse(event.data) as WorkspaceEventPayload @@ -394,8 +408,26 @@ export const serverApi = { sseLogger.warn("EventSource error, closing stream") onError?.() } + source.addEventListener("codenomad.client.ping", (event: MessageEvent) => { + try { + const payload = event.data ? (JSON.parse(event.data) as { ts?: number }) : {} + onPing?.(payload) + } catch (error) { + sseLogger.error("Failed to parse ping event", error) + } + }) return source }, } +function buildClientEventsUrl(identity: { clientId: string; connectionId: string }): string { + const url = new URL(EVENTS_URL, typeof window !== "undefined" ? window.location.origin : "http://localhost") + url.searchParams.set("clientId", identity.clientId) + url.searchParams.set("connectionId", identity.connectionId) + if (EVENTS_URL.startsWith("http://") || EVENTS_URL.startsWith("https://")) { + return url.toString() + } + return `${url.pathname}${url.search}` +} + export type { WorkspaceDescriptor, WorkspaceLogEntry, WorkspaceEventPayload, WorkspaceEventType } diff --git a/packages/ui/src/lib/client-identity.ts b/packages/ui/src/lib/client-identity.ts new file mode 100644 index 00000000..75ed545e --- /dev/null +++ b/packages/ui/src/lib/client-identity.ts @@ -0,0 +1,58 @@ +const CLIENT_ID_STORAGE_KEY = "codenomad.client-id" +const CONNECTION_ID_STORAGE_KEY = "codenomad.connection-id" + +let cachedClientId: string | null = null +let cachedConnectionId: string | null = null + +export function getClientIdentity(): { clientId: string; connectionId: string } { + return { + clientId: getOrCreateClientId(), + connectionId: getOrCreateConnectionId(), + } +} + +function getOrCreateClientId(): string { + if (cachedClientId) return cachedClientId + cachedClientId = getOrCreateStoredValue(CLIENT_ID_STORAGE_KEY, window.localStorage) + return cachedClientId +} + +function getOrCreateConnectionId(): string { + if (cachedConnectionId) return cachedConnectionId + cachedConnectionId = getOrCreateStoredValue(CONNECTION_ID_STORAGE_KEY, window.sessionStorage) + return cachedConnectionId +} + +function getOrCreateStoredValue(key: string, storage: Storage): string { + if (typeof window === "undefined") { + return generateUUID() + } + + try { + const existing = storage.getItem(key) + if (existing && existing.trim()) { + return existing.trim() + } + } catch { + return generateUUID() + } + + const next = generateUUID() + try { + storage.setItem(key, next) + } catch { + // Ignore storage failures and fall back to the in-memory value. + } + return next +} + +function generateUUID(): string { + if (typeof crypto !== "undefined" && crypto.randomUUID) { + return crypto.randomUUID() + } + return "xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx".replace(/[xy]/g, (char) => { + const random = (Math.random() * 16) | 0 + const value = char === "x" ? random : (random & 0x3) | 0x8 + return value.toString(16) + }) +} diff --git a/packages/ui/src/lib/server-events.ts b/packages/ui/src/lib/server-events.ts index ba56e408..68db4476 100644 --- a/packages/ui/src/lib/server-events.ts +++ b/packages/ui/src/lib/server-events.ts @@ -1,5 +1,6 @@ import type { WorkspaceEventPayload, WorkspaceEventType } from "../../../server/src/api-types" import { serverApi } from "./api-client" +import { getClientIdentity } from "./client-identity" import { getLogger } from "./logger" const RETRY_BASE_DELAY = 1000 @@ -16,6 +17,7 @@ function logSse(message: string, context?: Record) { class ServerEvents { private handlers = new Map void>>() + private openHandlers = new Set<() => void>() private source: EventSource | null = null private retryDelay = RETRY_BASE_DELAY @@ -28,10 +30,24 @@ class ServerEvents { this.source.close() } logSse("Connecting to backend events stream") - this.source = serverApi.connectEvents((event) => this.dispatch(event), () => this.scheduleReconnect()) + this.source = serverApi.connectEvents( + (event) => this.dispatch(event), + () => this.scheduleReconnect(), + (payload) => { + void serverApi + .sendClientConnectionPong({ + ...getClientIdentity(), + pingTs: payload.ts, + }) + .catch((error) => { + log.error("Failed to send client connection pong", error) + }) + }, + ) this.source.onopen = () => { logSse("Events stream connected") this.retryDelay = RETRY_BASE_DELAY + this.openHandlers.forEach((handler) => handler()) } } @@ -61,6 +77,11 @@ class ServerEvents { bucket.add(handler) return () => bucket.delete(handler) } + + onOpen(handler: () => void): () => void { + this.openHandlers.add(handler) + return () => this.openHandlers.delete(handler) + } } export const serverEvents = new ServerEvents() diff --git a/packages/ui/src/stores/conversation-speech.ts b/packages/ui/src/stores/conversation-speech.ts index 665b6bd6..34a76784 100644 --- a/packages/ui/src/stores/conversation-speech.ts +++ b/packages/ui/src/stores/conversation-speech.ts @@ -4,6 +4,7 @@ import { showToastNotification } from "../lib/notifications" import { serverApi } from "../lib/api-client" import { getLogger } from "../lib/logger" import { formatToMimeType, getSpeechPlaybackSupport } from "../lib/speech-playback-support" +import { serverEvents } from "../lib/server-events" import { serverSettings } from "./preferences" import { loadSpeechCapabilities, speechCapabilities } from "./speech" import { getActiveSession, sessions } from "./session-state" @@ -44,6 +45,10 @@ let currentPlayback: let queueRunner: Promise | null = null let playbackErrorShown = false +serverEvents.onOpen(() => { + void syncConversationModesToServer() +}) + function getEntryKey(instanceId: string, sessionId: string, messageId: string, partId: string): string { return `${instanceId}:${sessionId}:${messageId}:${partId}` } @@ -532,3 +537,12 @@ function extractLeadingSpokenBlock(text: string): string { if (!match?.[1]) return "" return match[1].trim() } + +async function syncConversationModesToServer(): Promise { + const updates: Promise[] = [] + for (const [instanceId, enabled] of conversationModeInstances()) { + if (!enabled) continue + updates.push(serverApi.updateVoiceMode(instanceId, true)) + } + await Promise.allSettled(updates) +}