improve server logging for sse and http
This commit is contained in:
@@ -52,9 +52,10 @@ export class ConfigStore {
|
||||
this.cache = next
|
||||
this.loaded = true
|
||||
this.persist()
|
||||
const published = Boolean(this.eventBus)
|
||||
this.eventBus?.publish({ type: "config.appChanged", config: this.cache })
|
||||
this.logger.info("Config updated")
|
||||
this.logger.debug({ config: this.cache }, "Config payload")
|
||||
this.logger.debug({ broadcast: published }, "Config SSE event emitted")
|
||||
this.logger.trace({ config: this.cache }, "Config payload")
|
||||
}
|
||||
|
||||
private persist() {
|
||||
|
||||
@@ -9,7 +9,10 @@ export class EventBus extends EventEmitter {
|
||||
|
||||
publish(event: WorkspaceEventPayload): boolean {
|
||||
if (event.type !== "instance.event" && event.type !== "instance.eventStatus") {
|
||||
this.logger?.debug({ event }, "Publishing workspace event")
|
||||
this.logger?.debug({ type: event.type }, "Publishing workspace event")
|
||||
if (this.logger?.isLevelEnabled("trace")) {
|
||||
this.logger.trace({ event }, "Workspace event payload")
|
||||
}
|
||||
}
|
||||
return super.emit(event.type, event)
|
||||
}
|
||||
|
||||
@@ -47,6 +47,8 @@ const DEFAULT_HTTP_PORT = 9898
|
||||
export function createHttpServer(deps: HttpServerDeps) {
|
||||
const app = Fastify({ logger: false })
|
||||
const proxyLogger = deps.logger.child({ component: "proxy" })
|
||||
const apiLogger = deps.logger.child({ component: "http" })
|
||||
const sseLogger = deps.logger.child({ component: "sse" })
|
||||
|
||||
const sseClients = new Set<() => void>()
|
||||
const registerSseClient = (cleanup: () => void) => {
|
||||
@@ -60,6 +62,29 @@ export function createHttpServer(deps: HttpServerDeps) {
|
||||
sseClients.clear()
|
||||
}
|
||||
|
||||
app.addHook("onRequest", (request, _reply, done) => {
|
||||
;(request as FastifyRequest & { __logMeta?: { start: bigint } }).__logMeta = {
|
||||
start: process.hrtime.bigint(),
|
||||
}
|
||||
done()
|
||||
})
|
||||
|
||||
app.addHook("onResponse", (request, reply, done) => {
|
||||
const meta = (request as FastifyRequest & { __logMeta?: { start: bigint } }).__logMeta
|
||||
const durationMs = meta ? Number((process.hrtime.bigint() - meta.start) / BigInt(1_000_000)) : undefined
|
||||
const base = {
|
||||
method: request.method,
|
||||
url: request.url,
|
||||
status: reply.statusCode,
|
||||
durationMs,
|
||||
}
|
||||
apiLogger.debug(base, "HTTP request completed")
|
||||
if (apiLogger.isLevelEnabled("trace")) {
|
||||
apiLogger.trace({ ...base, params: request.params, query: request.query, body: request.body }, "HTTP request payload")
|
||||
}
|
||||
done()
|
||||
})
|
||||
|
||||
app.register(cors, {
|
||||
origin: true,
|
||||
credentials: true,
|
||||
@@ -79,7 +104,7 @@ export function createHttpServer(deps: HttpServerDeps) {
|
||||
registerConfigRoutes(app, { configStore: deps.configStore, binaryRegistry: deps.binaryRegistry })
|
||||
registerFilesystemRoutes(app, { fileSystemBrowser: deps.fileSystemBrowser })
|
||||
registerMetaRoutes(app, { serverMeta: deps.serverMeta })
|
||||
registerEventRoutes(app, { eventBus: deps.eventBus, registerClient: registerSseClient })
|
||||
registerEventRoutes(app, { eventBus: deps.eventBus, registerClient: registerSseClient, logger: sseLogger })
|
||||
registerStorageRoutes(app, {
|
||||
instanceStore: deps.instanceStore,
|
||||
eventBus: deps.eventBus,
|
||||
@@ -225,6 +250,11 @@ async function proxyWorkspaceRequest(args: {
|
||||
const search = queryIndex >= 0 ? (request.raw.url ?? "").slice(queryIndex) : ""
|
||||
const targetUrl = `http://${INSTANCE_PROXY_HOST}:${port}${normalizedSuffix}${search}`
|
||||
|
||||
logger.debug({ workspaceId, method: request.method, targetUrl }, "Proxying request to instance")
|
||||
if (logger.isLevelEnabled("trace")) {
|
||||
logger.trace({ workspaceId, targetUrl, body: request.body }, "Instance proxy payload")
|
||||
}
|
||||
|
||||
return reply.from(targetUrl, {
|
||||
onError: (proxyReply, { error }) => {
|
||||
logger.error({ err: error, workspaceId, targetUrl }, "Failed to proxy workspace request")
|
||||
|
||||
@@ -1,14 +1,21 @@
|
||||
import { FastifyInstance } from "fastify"
|
||||
import { EventBus } from "../../events/bus"
|
||||
import { WorkspaceEventPayload } from "../../api-types"
|
||||
import { Logger } from "../../logger"
|
||||
|
||||
interface RouteDeps {
|
||||
eventBus: EventBus
|
||||
registerClient: (cleanup: () => void) => () => void
|
||||
logger: Logger
|
||||
}
|
||||
|
||||
let nextClientId = 0
|
||||
|
||||
export function registerEventRoutes(app: FastifyInstance, deps: RouteDeps) {
|
||||
app.get("/api/events", (request, reply) => {
|
||||
const clientId = ++nextClientId
|
||||
deps.logger.debug({ clientId }, "SSE client connected")
|
||||
|
||||
const origin = request.headers.origin ?? "*"
|
||||
reply.raw.setHeader("Access-Control-Allow-Origin", origin)
|
||||
reply.raw.setHeader("Access-Control-Allow-Credentials", "true")
|
||||
@@ -19,6 +26,10 @@ export function registerEventRoutes(app: FastifyInstance, deps: RouteDeps) {
|
||||
reply.hijack()
|
||||
|
||||
const send = (event: WorkspaceEventPayload) => {
|
||||
deps.logger.debug({ clientId, type: event.type }, "SSE event dispatched")
|
||||
if (deps.logger.isLevelEnabled("trace")) {
|
||||
deps.logger.trace({ clientId, event }, "SSE event payload")
|
||||
}
|
||||
reply.raw.write(`data: ${JSON.stringify(event)}\n\n`)
|
||||
}
|
||||
|
||||
@@ -34,6 +45,7 @@ export function registerEventRoutes(app: FastifyInstance, deps: RouteDeps) {
|
||||
clearInterval(heartbeat)
|
||||
unsubscribe()
|
||||
reply.raw.end?.()
|
||||
deps.logger.debug({ clientId }, "SSE client disconnected")
|
||||
}
|
||||
|
||||
const unregister = deps.registerClient(close)
|
||||
|
||||
@@ -159,6 +159,10 @@ export class InstanceEventBridge {
|
||||
|
||||
try {
|
||||
const event = JSON.parse(payload) as InstanceStreamEvent
|
||||
this.options.logger.debug({ workspaceId, eventType: event.type }, "Instance SSE event received")
|
||||
if (this.options.logger.isLevelEnabled("trace")) {
|
||||
this.options.logger.trace({ workspaceId, event }, "Instance SSE event payload")
|
||||
}
|
||||
this.options.eventBus.publish({ type: "instance.event", instanceId: workspaceId, event })
|
||||
} catch (error) {
|
||||
this.options.logger.warn({ workspaceId, chunk: payload, err: error }, "Failed to parse instance SSE payload")
|
||||
@@ -166,6 +170,7 @@ export class InstanceEventBridge {
|
||||
}
|
||||
|
||||
private publishStatus(instanceId: string, status: InstanceStreamStatus, reason?: string) {
|
||||
this.options.logger.debug({ instanceId, status, reason }, "Instance SSE status updated")
|
||||
this.options.eventBus.publish({ type: "instance.eventStatus", instanceId, status, reason })
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user