Integrate reply-from for workspace proxy

This commit is contained in:
Shantur Rathore
2025-11-19 02:27:07 +00:00
parent 146eae5220
commit 7aa94e7a88
3 changed files with 51 additions and 74 deletions

View File

@@ -16,6 +16,7 @@
},
"dependencies": {
"@fastify/cors": "^8.5.0",
"@fastify/reply-from": "^9.8.0",
"@fastify/static": "^7.0.4",
"commander": "^12.1.0",
"fastify": "^4.28.1",

View File

@@ -1,10 +1,9 @@
import Fastify, { type FastifyInstance, type FastifyReply, type FastifyRequest } from "fastify"
import cors from "@fastify/cors"
import fastifyStatic from "@fastify/static"
import replyFrom, { type FastifyReplyFromOptions } from "@fastify/reply-from"
import fs from "fs"
import path from "path"
import { Readable } from "node:stream"
import type { ReadableStream as NodeReadableStream } from "node:stream/web"
import { fetch } from "undici"
import type { Logger } from "../logger"
import { WorkspaceManager } from "../workspaces/manager"
@@ -59,6 +58,10 @@ export function createHttpServer(deps: HttpServerDeps) {
credentials: true,
})
app.register(replyFrom, {
contentTypesToEncode: [],
})
registerWorkspaceRoutes(app, { workspaceManager: deps.workspaceManager })
registerConfigRoutes(app, { configStore: deps.configStore, binaryRegistry: deps.binaryRegistry })
registerFilesystemRoutes(app, { fileSystemBrowser: deps.fileSystemBrowser })
@@ -91,7 +94,7 @@ interface InstanceProxyDeps {
function registerInstanceProxyRoutes(app: FastifyInstance, deps: InstanceProxyDeps) {
app.register(async (instance) => {
instance.removeAllContentTypeParsers()
instance.addContentTypeParser("*", { parseAs: "buffer" }, (req, body, done) => done(null, body))
instance.addContentTypeParser("*", (req, body, done) => done(null, body))
const proxyBaseHandler = async (request: FastifyRequest<{ Params: { id: string } }>, reply: FastifyReply) => {
await proxyWorkspaceRequest({
@@ -122,7 +125,6 @@ function registerInstanceProxyRoutes(app: FastifyInstance, deps: InstanceProxyDe
}
const INSTANCE_PROXY_HOST = "127.0.0.1"
const METHODS_WITHOUT_BODY = new Set(["GET", "HEAD", "OPTIONS"])
async function proxyWorkspaceRequest(args: {
request: FastifyRequest
@@ -151,74 +153,14 @@ async function proxyWorkspaceRequest(args: {
const search = queryIndex >= 0 ? (request.raw.url ?? "").slice(queryIndex) : ""
const targetUrl = `http://${INSTANCE_PROXY_HOST}:${port}${normalizedSuffix}${search}`
try {
const abortController = new AbortController()
const bodyPayload = METHODS_WITHOUT_BODY.has(request.method.toUpperCase())
? undefined
: (request.body as Buffer | undefined)
const headers = buildProxyHeaders(request.headers)
if (bodyPayload && bodyPayload.byteLength > 0) {
headers["content-length"] = String(bodyPayload.byteLength)
} else {
delete headers["content-length"]
}
const response = await fetch(targetUrl, {
method: request.method,
headers,
body: bodyPayload,
signal: abortController.signal,
})
const headersToForward: Record<string, string> = {}
response.headers.forEach((value, key) => {
if (key.toLowerCase() === "content-length") {
return
return reply.from(targetUrl, {
onError: (proxyReply, { error }) => {
logger.error({ err: error, workspaceId, targetUrl }, "Failed to proxy workspace request")
if (!proxyReply.sent) {
proxyReply.code(502).send({ error: "Workspace instance proxy failed" })
}
headersToForward[key] = value
})
const contentType = (response.headers.get("content-type") ?? "").toLowerCase()
const isEventStream = contentType.includes("text/event-stream")
if (isEventStream && response.body) {
reply.hijack()
Object.entries(headersToForward).forEach(([key, value]) => reply.raw.setHeader(key, value))
reply.raw.setHeader("Cache-Control", "no-cache")
reply.raw.setHeader("Connection", "keep-alive")
reply.raw.setHeader("Content-Type", "text/event-stream")
reply.raw.writeHead(response.status)
const stream = Readable.fromWeb(response.body as NodeReadableStream)
const cleanup = () => {
stream.destroy()
abortController.abort()
}
request.raw.on("close", cleanup)
request.raw.on("error", cleanup)
stream.on("error", cleanup)
stream.pipe(reply.raw)
return
}
Object.entries(headersToForward).forEach(([key, value]) => reply.header(key, value))
reply.code(response.status)
if (request.method === "HEAD") {
reply.send()
abortController.abort()
return
}
const bodyBuffer = Buffer.from(await response.arrayBuffer())
reply.header("content-length", String(bodyBuffer.byteLength))
reply.send(bodyBuffer)
abortController.abort()
} catch (error) {
logger.error({ err: error, workspaceId, targetUrl }, "Failed to proxy workspace request")
if (!reply.sent) {
reply.code(502).send({ error: "Workspace instance proxy failed" })
}
}
},
})
}
function normalizeInstanceSuffix(pathSuffix: string | undefined) {