Add background process manager and UI panel

This commit is contained in:
Shantur Rathore
2025-12-24 00:59:41 +00:00
parent 28b66ed0af
commit 575f987b8f
11 changed files with 1446 additions and 5 deletions

View File

@@ -0,0 +1,157 @@
import { tool } from "@opencode-ai/plugin/tool"
import { getCodeNomadConfig } from "./lib/client"
type BackgroundProcess = {
id: string
title: string
command: string
status: "running" | "stopped" | "error"
startedAt: string
stoppedAt?: string
exitCode?: number
outputSizeBytes?: number
}
export async function BackgroundProcessPlugin() {
const config = getCodeNomadConfig()
const request = async <T>(path: string, init?: RequestInit): Promise<T> => {
const base = config.baseUrl.replace(/\/+$/, "")
const url = `${base}/workspaces/${config.instanceId}/plugin/background-processes${path}`
const headers = normalizeHeaders(init?.headers)
if (init?.body !== undefined) {
headers["Content-Type"] = "application/json"
}
const response = await fetch(url, {
...init,
headers,
})
if (!response.ok) {
const message = await response.text()
throw new Error(message || `Request failed with ${response.status}`)
}
if (response.status === 204) {
return undefined as T
}
return (await response.json()) as T
}
return {
tool: {
run_background_process: tool({
description: "Run a long-lived background process (dev servers, DBs, watchers) so it keeps running while you do other tasks. Use it for running processes that timeout otherwise or produce a lot of output.",
args: {
title: tool.schema.string().describe("Short label for the process (e.g. Dev server, DB server)"),
command: tool.schema.string().describe("Shell command to run in the workspace"),
},
async execute(args) {
const process = await request<BackgroundProcess>("", {
method: "POST",
body: JSON.stringify({ title: args.title, command: args.command }),
})
return `Started background process ${process.id} (${process.title})\nStatus: ${process.status}\nCommand: ${process.command}`
},
}),
list_background_processes: tool({
description: "List background processes running for this workspace.",
args: {},
async execute() {
const response = await request<{ processes: BackgroundProcess[] }>("")
if (response.processes.length === 0) {
return "No background processes running."
}
return response.processes
.map((process) => {
const status = process.status === "running" ? "running" : process.status
const exit = process.exitCode !== undefined ? ` (exit ${process.exitCode})` : ""
const size = typeof process.outputSizeBytes === "number" ? ` | ${Math.round(process.outputSizeBytes / 1024)}KB` : ""
return `- ${process.id} | ${process.title} | ${status}${exit}${size}\n ${process.command}`
})
.join("\n")
},
}),
read_background_process_output: tool({
description: "Read output from a background process. Use full, grep, head, or tail.",
args: {
id: tool.schema.string().describe("Background process ID"),
method: tool.schema
.enum(["full", "grep", "head", "tail"])
.default("full")
.describe("Method to read output"),
pattern: tool.schema.string().optional().describe("Pattern for grep method"),
lines: tool.schema.number().optional().describe("Number of lines for head/tail methods"),
},
async execute(args) {
if (args.method === "grep" && !args.pattern) {
return "Pattern is required for grep method."
}
const params = new URLSearchParams({ method: args.method })
if (args.pattern) {
params.set("pattern", args.pattern)
}
if (args.lines) {
params.set("lines", String(args.lines))
}
const response = await request<{ id: string; content: string; truncated: boolean; sizeBytes: number }>(
`/${args.id}/output?${params.toString()}`,
)
const header = response.truncated
? `Output (truncated, ${Math.round(response.sizeBytes / 1024)}KB):`
: `Output (${Math.round(response.sizeBytes / 1024)}KB):`
return `${header}\n\n${response.content}`
},
}),
stop_background_process: tool({
description: "Stop a background process (SIGTERM) but keep its output and entry.",
args: {
id: tool.schema.string().describe("Background process ID"),
},
async execute(args) {
const process = await request<BackgroundProcess>(`/${args.id}/stop`, { method: "POST" })
return `Stopped background process ${process.id} (${process.title}). Status: ${process.status}`
},
}),
terminate_background_process: tool({
description: "Terminate a background process and delete its output + entry.",
args: {
id: tool.schema.string().describe("Background process ID"),
},
async execute(args) {
await request<void>(`/${args.id}/terminate`, { method: "POST" })
return `Terminated background process ${args.id} and removed its output.`
},
}),
},
}
}
function normalizeHeaders(headers: HeadersInit | undefined): Record<string, string> {
const output: Record<string, string> = {}
if (!headers) return output
if (headers instanceof Headers) {
headers.forEach((value, key) => {
output[key] = value
})
return output
}
if (Array.isArray(headers)) {
for (const [key, value] of headers) {
output[key] = value
}
return output
}
return { ...headers }
}

View File

@@ -0,0 +1,341 @@
import { tool } from "@opencode-ai/plugin"
import { spawn, ChildProcess, exec } from "child_process"
import { promisify } from "util"
import { promises as fs } from "fs"
import { existsSync, mkdirSync } from "fs"
import { join } from "path"
import { randomBytes } from "crypto"
const execAsync = promisify(exec)
// Global registry to track running processes
const runningProcesses = new Map<string, ChildProcess>()
const SHELLS_DIR = ".opencode/backgroundShells"
const INDEX_FILE = join(SHELLS_DIR, "index.json")
interface ShellMetadata {
id: string
command: string
startTime: number
status: "running" | "finished"
exitCode?: number
workingDir: string
}
// Cleanup function called on startup
async function cleanupStaleShells() {
try {
if (existsSync(SHELLS_DIR)) {
const dirs = await fs.readdir(SHELLS_DIR)
for (const dir of dirs) {
if (dir !== "index.json") {
await fs.rm(join(SHELLS_DIR, dir), { recursive: true, force: true })
}
}
if (existsSync(INDEX_FILE)) {
await fs.unlink(INDEX_FILE)
}
}
} catch (error) {
console.error("Failed to cleanup stale shells:", error)
}
}
// Initialize cleanup on first import
cleanupStaleShells()
async function ensureShellsDir() {
if (!existsSync(SHELLS_DIR)) {
mkdirSync(SHELLS_DIR, { recursive: true })
}
}
async function loadIndex(): Promise<ShellMetadata[]> {
try {
if (existsSync(INDEX_FILE)) {
const content = await fs.readFile(INDEX_FILE, "utf-8")
return JSON.parse(content)
}
} catch (error) {
// Index file corrupted or missing, return empty array
}
return []
}
async function saveIndex(shells: ShellMetadata[]) {
await ensureShellsDir()
await fs.writeFile(INDEX_FILE, JSON.stringify(shells, null, 2))
}
function generateId(): string {
const timestamp = new Date().toISOString().replace(/[:.]/g, "").slice(0, 15)
const random = randomBytes(3).toString("hex")
return `task_${timestamp}_${random}`
}
function formatRuntime(startTime: number): string {
const elapsed = Date.now() - startTime
const seconds = Math.floor(elapsed / 1000)
const minutes = Math.floor(seconds / 60)
const hours = Math.floor(minutes / 60)
if (hours > 0) {
return `${hours}h ${minutes % 60}m ${seconds % 60}s`
} else if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`
} else {
return `${seconds}s`
}
}
async function getShellsList(): Promise<string> {
const shells = await loadIndex()
if (shells.length === 0) {
return "Currently running Background Shells:\nNo background shells running."
}
let output = "Currently running Background Shells:\n"
for (const shell of shells) {
const runtime = shell.status === "running" ? formatRuntime(shell.startTime) : "finished"
const outputPath = join(SHELLS_DIR, shell.id, "output.txt")
let outputSize = 0
try {
const stats = await fs.stat(outputPath)
outputSize = stats.size
} catch {
// File doesn't exist yet
}
output += `- ShellId: ${shell.id}\n`
output += ` Command: ${shell.command}\n`
output += ` Status: ${shell.status === "running" ? "Running" : "Finished"}\n`
output += ` ${shell.status === "running" ? "RunningFor" : "CompletedAfter"}: ${runtime}\n`
output += ` Current output size: ${outputSize} bytes\n`
if (shell.status === "finished" && shell.exitCode !== undefined) {
output += ` Exit code: ${shell.exitCode}\n`
}
output += "\n"
}
return output.trim()
}
export const start = tool({
description: `Start a background shell command. Only use it to run supporting processes like dev servers etc. ${await getShellsList()}`,
args: {
command: tool.schema.string().describe("Bash command to execute in background"),
},
async execute(args) {
await ensureShellsDir()
const id = generateId()
const shellDir = join(SHELLS_DIR, id)
const outputFile = join(shellDir, "output.txt")
// Create directory for this shell
mkdirSync(shellDir, { recursive: true })
// Create metadata
const metadata: ShellMetadata = {
id,
command: args.command,
startTime: Date.now(),
status: "running",
workingDir: process.cwd()
}
// Start the process
const childProcess = spawn("bash", ["-c", args.command], {
detached: false, // Keep it in opencode process space
stdio: ["ignore", "pipe", "pipe"],
cwd: process.cwd()
})
// Store process reference
runningProcesses.set(id, childProcess)
// Create output stream
const outputStream = await fs.open(outputFile, "w")
// Pipe stdout and stderr to output file
childProcess.stdout?.on("data", async (data) => {
await outputStream.write(data)
})
childProcess.stderr?.on("data", async (data) => {
await outputStream.write(data)
})
// Handle process completion
childProcess.on("close", async (code) => {
await outputStream.close()
runningProcesses.delete(id)
// Update metadata
metadata.status = "finished"
metadata.exitCode = code || 0
// Update index
const shells = await loadIndex()
const index = shells.findIndex(s => s.id === id)
if (index >= 0) {
shells[index] = metadata
}
await saveIndex(shells)
})
// Add to index
const shells = await loadIndex()
shells.push(metadata)
await saveIndex(shells)
return `Started background shell with ID: ${id}\nCommand: ${args.command}\n\n${await getShellsList()}`
},
})
export const list = tool({
description: "List all background shell processes with their status",
args: {},
async execute(args) {
return await getShellsList()
},
})
export const read = tool({
description: `Read output from a background shell process. ${await getShellsList()}`,
args: {
id: tool.schema.string().describe("Background shell ID"),
method: tool.schema.enum(["full", "grep", "head", "tail"]).default("full").describe("Method to read output"),
pattern: tool.schema.string().optional().describe("Pattern for grep method"),
lines: tool.schema.number().optional().describe("Number of lines for head/tail methods"),
},
async execute(args) {
const shells = await loadIndex()
const shell = shells.find(s => s.id === args.id)
if (!shell) {
return `Background shell with ID ${args.id} not found.\n\n${await getShellsList()}`
}
const outputFile = join(SHELLS_DIR, args.id, "output.txt")
if (!existsSync(outputFile)) {
return `Output file for ${args.id} not found or not created yet.`
}
try {
// Check file size first
const stats = await fs.stat(outputFile)
const fileSizeBytes = stats.size
const fileSizeKB = (fileSizeBytes / 1024).toFixed(2)
let content: string
let command: string
switch (args.method) {
case "full":
// For full read, check size and warn if too large
if (fileSizeBytes > 102400) { // 100KB
return `Output file is too large (${fileSizeKB}KB). Please use grep, head, or tail methods to filter the content.`
}
content = await fs.readFile(outputFile, "utf-8")
break
case "grep":
if (!args.pattern) {
return "Pattern is required for grep method"
}
// Use actual grep command - escape the pattern for shell safety
const escapedPattern = args.pattern.replace(/"/g, '\\"')
command = `grep "${escapedPattern}" "${outputFile}"`
try {
const { stdout } = await execAsync(command)
content = stdout
} catch (error: any) {
// grep returns exit code 1 when no matches found
if (error.code === 1) {
content = ""
} else {
throw error
}
}
break
case "head":
const headLines = args.lines || 10
// Use actual head command
command = `head -n ${headLines} "${outputFile}"`
const headResult = await execAsync(command)
content = headResult.stdout
break
case "tail":
const tailLines = args.lines || 10
// Use actual tail command
command = `tail -n ${tailLines} "${outputFile}"`
const tailResult = await execAsync(command)
content = tailResult.stdout
break
default:
if (fileSizeBytes > 102400) { // 100KB
return `Output file is too large (${fileSizeKB}KB). Please use grep, head, or tail methods to filter the content.`
}
content = await fs.readFile(outputFile, "utf-8")
}
// Check the final content size after grep/head/tail processing
if (content.length > 102400) { // 100KB in characters (roughly)
return `Filtered output is still too large (${(content.length / 1024).toFixed(2)}KB). Try using more specific grep patterns or fewer lines for head/tail.`
}
const methodDesc = args.method === "grep" ? `grep pattern: "${args.pattern}"` :
args.method === "head" ? `head ${args.lines || 10} lines` :
args.method === "tail" ? `tail ${args.lines || 10} lines` :
"full output"
return `Output for ${args.id} (${methodDesc}) - File size: ${fileSizeKB}KB:\n\n${content}`
} catch (error) {
return `Error reading output for ${args.id}: ${error}`
}
},
})
export const kill = tool({
description: `Kill a background shell process and clean up its files. ${await getShellsList()}`,
args: {
id: tool.schema.string().describe("Background shell ID to kill"),
},
async execute(args) {
const shells = await loadIndex()
const shell = shells.find(s => s.id === args.id)
if (!shell) {
return `Background shell with ID ${args.id} not found.\n\n${await getShellsList()}`
}
// Kill the process if it's still running
const process = runningProcesses.get(args.id)
if (process) {
process.kill("SIGTERM")
runningProcesses.delete(args.id)
}
// Remove from index
const updatedShells = shells.filter(s => s.id !== args.id)
await saveIndex(updatedShells)
// Remove directory
const shellDir = join(SHELLS_DIR, args.id)
try {
await fs.rm(shellDir, { recursive: true, force: true })
} catch (error) {
// Directory might not exist, continue
}
return `Killed and cleaned up background shell ${args.id}\n\n${await getShellsList()}`
},
})

View File

@@ -219,6 +219,33 @@ export interface ServerMeta {
latestRelease?: LatestReleaseInfo
}
export type BackgroundProcessStatus = "running" | "stopped" | "error"
export interface BackgroundProcess {
id: string
workspaceId: string
title: string
command: string
cwd: string
status: BackgroundProcessStatus
pid?: number
startedAt: string
stoppedAt?: string
exitCode?: number
outputSizeBytes?: number
}
export interface BackgroundProcessListResponse {
processes: BackgroundProcess[]
}
export interface BackgroundProcessOutputResponse {
id: string
content: string
truncated: boolean
sizeBytes: number
}
export type {
Preferences,
ModelPreference,

View File

@@ -0,0 +1,437 @@
import { spawn, type ChildProcess } from "child_process"
import { createWriteStream, existsSync, promises as fs } from "fs"
import path from "path"
import { randomBytes } from "crypto"
import type { EventBus } from "../events/bus"
import type { WorkspaceManager } from "../workspaces/manager"
import type { Logger } from "../logger"
import type { BackgroundProcess, BackgroundProcessStatus } from "../api-types"
const ROOT_DIR = ".codenomad/background_processes"
const INDEX_FILE = "index.json"
const OUTPUT_FILE = "output.txt"
const STOP_TIMEOUT_MS = 2000
const MAX_OUTPUT_BYTES = 20 * 1024
const OUTPUT_PUBLISH_INTERVAL_MS = 1000
interface ManagerDeps {
workspaceManager: WorkspaceManager
eventBus: EventBus
logger: Logger
}
interface RunningProcess {
child: ChildProcess
outputPath: string
exitPromise: Promise<void>
workspaceId: string
}
export class BackgroundProcessManager {
private readonly running = new Map<string, RunningProcess>()
constructor(private readonly deps: ManagerDeps) {
this.deps.eventBus.on("workspace.stopped", (event) => this.cleanupWorkspace(event.workspaceId))
this.deps.eventBus.on("workspace.error", (event) => this.cleanupWorkspace(event.workspace.id))
}
async list(workspaceId: string): Promise<BackgroundProcess[]> {
const records = await this.readIndex(workspaceId)
const enriched = await Promise.all(
records.map(async (record) => ({
...record,
outputSizeBytes: await this.getOutputSize(workspaceId, record.id),
})),
)
return enriched
}
async start(workspaceId: string, title: string, command: string): Promise<BackgroundProcess> {
const workspace = this.deps.workspaceManager.get(workspaceId)
if (!workspace) {
throw new Error("Workspace not found")
}
const id = this.generateId()
const processDir = await this.ensureProcessDir(workspaceId, id)
const outputPath = path.join(processDir, OUTPUT_FILE)
const outputStream = createWriteStream(outputPath, { flags: "a" })
const child = spawn("bash", ["-c", command], {
cwd: workspace.path,
stdio: ["ignore", "pipe", "pipe"],
})
const record: BackgroundProcess = {
id,
workspaceId,
title,
command,
cwd: workspace.path,
status: "running",
pid: child.pid,
startedAt: new Date().toISOString(),
outputSizeBytes: 0,
}
const exitPromise = new Promise<void>((resolve) => {
child.on("close", async (code) => {
await new Promise<void>((resolve) => outputStream.end(resolve))
this.running.delete(id)
record.status = this.statusFromExit(code)
record.exitCode = code === null ? undefined : code
record.stoppedAt = new Date().toISOString()
await this.upsertIndex(workspaceId, record)
record.outputSizeBytes = await this.getOutputSize(workspaceId, record.id)
this.publishUpdate(workspaceId, record)
resolve()
})
})
this.running.set(id, { child, outputPath, exitPromise, workspaceId })
let lastPublishAt = 0
const maybePublishSize = () => {
const now = Date.now()
if (now - lastPublishAt < OUTPUT_PUBLISH_INTERVAL_MS) {
return
}
lastPublishAt = now
this.publishUpdate(workspaceId, record)
}
child.stdout?.on("data", (data) => {
outputStream.write(data)
record.outputSizeBytes = (record.outputSizeBytes ?? 0) + data.length
maybePublishSize()
})
child.stderr?.on("data", (data) => {
outputStream.write(data)
record.outputSizeBytes = (record.outputSizeBytes ?? 0) + data.length
maybePublishSize()
})
await this.upsertIndex(workspaceId, record)
record.outputSizeBytes = await this.getOutputSize(workspaceId, record.id)
this.publishUpdate(workspaceId, record)
return record
}
async stop(workspaceId: string, processId: string): Promise<BackgroundProcess | null> {
const record = await this.findProcess(workspaceId, processId)
if (!record) {
return null
}
const running = this.running.get(processId)
if (running?.child && !running.child.killed) {
running.child.kill("SIGTERM")
await this.waitForExit(running)
}
if (record.status === "running") {
record.status = "stopped"
record.stoppedAt = new Date().toISOString()
await this.upsertIndex(workspaceId, record)
record.outputSizeBytes = await this.getOutputSize(workspaceId, record.id)
this.publishUpdate(workspaceId, record)
}
return record
}
async terminate(workspaceId: string, processId: string): Promise<void> {
const record = await this.findProcess(workspaceId, processId)
if (!record) return
const running = this.running.get(processId)
if (running?.child && !running.child.killed) {
running.child.kill("SIGTERM")
await this.waitForExit(running)
}
await this.removeFromIndex(workspaceId, processId)
await this.removeProcessDir(workspaceId, processId)
this.deps.eventBus.publish({
type: "instance.event",
instanceId: workspaceId,
event: { type: "background.process.removed", properties: { processId } },
})
}
async readOutput(
workspaceId: string,
processId: string,
options: { method?: "full" | "tail" | "head" | "grep"; pattern?: string; lines?: number },
) {
const outputPath = this.getOutputPath(workspaceId, processId)
if (!existsSync(outputPath)) {
return { id: processId, content: "", truncated: false, sizeBytes: 0 }
}
const stats = await fs.stat(outputPath)
const sizeBytes = stats.size
const method = options.method ?? "full"
const lineCount = options.lines ?? 10
const raw = await this.readOutputBytes(outputPath, sizeBytes)
let content = raw
switch (method) {
case "head":
content = this.headLines(raw, lineCount)
break
case "tail":
content = this.tailLines(raw, lineCount)
break
case "grep":
if (!options.pattern) {
throw new Error("Pattern is required for grep output")
}
content = this.grepLines(raw, options.pattern)
break
default:
content = raw
}
return {
id: processId,
content,
truncated: sizeBytes > MAX_OUTPUT_BYTES,
sizeBytes,
}
}
async streamOutput(workspaceId: string, processId: string, reply: any) {
const outputPath = this.getOutputPath(workspaceId, processId)
if (!existsSync(outputPath)) {
reply.code(404).send({ error: "Output not found" })
return
}
reply.raw.setHeader("Content-Type", "text/event-stream")
reply.raw.setHeader("Cache-Control", "no-cache")
reply.raw.setHeader("Connection", "keep-alive")
reply.raw.flushHeaders?.()
reply.hijack()
const file = await fs.open(outputPath, "r")
let position = (await file.stat()).size
const tick = async () => {
const stats = await file.stat()
if (stats.size <= position) return
const length = stats.size - position
const buffer = Buffer.alloc(length)
await file.read(buffer, 0, length, position)
position = stats.size
const content = buffer.toString("utf-8")
reply.raw.write(`data: ${JSON.stringify({ type: "chunk", content })}\n\n`)
}
const interval = setInterval(() => {
tick().catch((error) => {
this.deps.logger.warn({ err: error }, "Failed to stream background process output")
})
}, 1000)
const close = () => {
clearInterval(interval)
file.close().catch(() => undefined)
reply.raw.end?.()
}
reply.raw.on("close", close)
reply.raw.on("error", close)
}
private async cleanupWorkspace(workspaceId: string) {
for (const [, running] of this.running.entries()) {
if (running.workspaceId !== workspaceId) continue
running.child.kill("SIGTERM")
await this.waitForExit(running)
}
await this.removeWorkspaceDir(workspaceId)
}
private async waitForExit(running: RunningProcess) {
let resolved = false
const timeout = setTimeout(() => {
if (!resolved) {
running.child.kill("SIGKILL")
}
}, STOP_TIMEOUT_MS)
await running.exitPromise.finally(() => {
resolved = true
clearTimeout(timeout)
})
}
private statusFromExit(code: number | null): BackgroundProcessStatus {
if (code === null) return "stopped"
if (code === 0) return "stopped"
return "error"
}
private async readOutputBytes(outputPath: string, sizeBytes: number): Promise<string> {
if (sizeBytes <= MAX_OUTPUT_BYTES) {
return await fs.readFile(outputPath, "utf-8")
}
const start = Math.max(0, sizeBytes - MAX_OUTPUT_BYTES)
const file = await fs.open(outputPath, "r")
const buffer = Buffer.alloc(sizeBytes - start)
await file.read(buffer, 0, buffer.length, start)
await file.close()
return buffer.toString("utf-8")
}
private headLines(input: string, lines: number): string {
const parts = input.split(/\r?\n/)
return parts.slice(0, Math.max(0, lines)).join("\n")
}
private tailLines(input: string, lines: number): string {
const parts = input.split(/\r?\n/)
return parts.slice(Math.max(0, parts.length - lines)).join("\n")
}
private grepLines(input: string, pattern: string): string {
let matcher: RegExp
try {
matcher = new RegExp(pattern)
} catch {
throw new Error("Invalid grep pattern")
}
return input
.split(/\r?\n/)
.filter((line) => matcher.test(line))
.join("\n")
}
private async ensureProcessDir(workspaceId: string, processId: string) {
const root = await this.ensureWorkspaceDir(workspaceId)
const processDir = path.join(root, processId)
await fs.mkdir(processDir, { recursive: true })
return processDir
}
private async ensureWorkspaceDir(workspaceId: string) {
const workspace = this.deps.workspaceManager.get(workspaceId)
if (!workspace) {
throw new Error("Workspace not found")
}
const root = path.join(workspace.path, ROOT_DIR, workspaceId)
await fs.mkdir(root, { recursive: true })
return root
}
private getOutputPath(workspaceId: string, processId: string) {
const workspace = this.deps.workspaceManager.get(workspaceId)
if (!workspace) {
throw new Error("Workspace not found")
}
return path.join(workspace.path, ROOT_DIR, workspaceId, processId, OUTPUT_FILE)
}
private async findProcess(workspaceId: string, processId: string): Promise<BackgroundProcess | null> {
const records = await this.readIndex(workspaceId)
return records.find((entry) => entry.id === processId) ?? null
}
private async readIndex(workspaceId: string): Promise<BackgroundProcess[]> {
const indexPath = await this.getIndexPath(workspaceId)
if (!existsSync(indexPath)) return []
try {
const raw = await fs.readFile(indexPath, "utf-8")
const parsed = JSON.parse(raw)
return Array.isArray(parsed) ? (parsed as BackgroundProcess[]) : []
} catch {
return []
}
}
private async upsertIndex(workspaceId: string, record: BackgroundProcess) {
const records = await this.readIndex(workspaceId)
const index = records.findIndex((entry) => entry.id === record.id)
if (index >= 0) {
records[index] = record
} else {
records.push(record)
}
await this.writeIndex(workspaceId, records)
}
private async removeFromIndex(workspaceId: string, processId: string) {
const records = await this.readIndex(workspaceId)
const next = records.filter((entry) => entry.id !== processId)
await this.writeIndex(workspaceId, next)
}
private async writeIndex(workspaceId: string, records: BackgroundProcess[]) {
const indexPath = await this.getIndexPath(workspaceId)
await fs.mkdir(path.dirname(indexPath), { recursive: true })
await fs.writeFile(indexPath, JSON.stringify(records, null, 2))
}
private async getIndexPath(workspaceId: string) {
const workspace = this.deps.workspaceManager.get(workspaceId)
if (!workspace) {
throw new Error("Workspace not found")
}
return path.join(workspace.path, ROOT_DIR, workspaceId, INDEX_FILE)
}
private async removeProcessDir(workspaceId: string, processId: string) {
const workspace = this.deps.workspaceManager.get(workspaceId)
if (!workspace) {
return
}
const processDir = path.join(workspace.path, ROOT_DIR, workspaceId, processId)
await fs.rm(processDir, { recursive: true, force: true })
}
private async removeWorkspaceDir(workspaceId: string) {
const workspace = this.deps.workspaceManager.get(workspaceId)
if (!workspace) {
return
}
const workspaceDir = path.join(workspace.path, ROOT_DIR, workspaceId)
await fs.rm(workspaceDir, { recursive: true, force: true })
}
private async getOutputSize(workspaceId: string, processId: string): Promise<number> {
const outputPath = this.getOutputPath(workspaceId, processId)
if (!existsSync(outputPath)) {
return 0
}
try {
const stats = await fs.stat(outputPath)
return stats.size
} catch {
return 0
}
}
private publishUpdate(workspaceId: string, record: BackgroundProcess) {
this.deps.eventBus.publish({
type: "instance.event",
instanceId: workspaceId,
event: { type: "background.process.updated", properties: { process: record } },
})
}
private generateId(): string {
const timestamp = new Date().toISOString().replace(/[:.]/g, "").slice(0, 15)
const random = randomBytes(3).toString("hex")
return `proc_${timestamp}_${random}`
}
}

View File

@@ -19,8 +19,10 @@ import { registerMetaRoutes } from "./routes/meta"
import { registerEventRoutes } from "./routes/events"
import { registerStorageRoutes } from "./routes/storage"
import { registerPluginRoutes } from "./routes/plugin"
import { registerBackgroundProcessRoutes } from "./routes/background-processes"
import { ServerMeta } from "../api-types"
import { InstanceStore } from "../storage/instance-store"
import { BackgroundProcessManager } from "../background-processes/manager"
interface HttpServerDeps {
host: string
@@ -101,6 +103,12 @@ export function createHttpServer(deps: HttpServerDeps) {
},
})
const backgroundProcessManager = new BackgroundProcessManager({
workspaceManager: deps.workspaceManager,
eventBus: deps.eventBus,
logger: deps.logger.child({ component: "background-processes" }),
})
registerWorkspaceRoutes(app, { workspaceManager: deps.workspaceManager })
registerConfigRoutes(app, { configStore: deps.configStore, binaryRegistry: deps.binaryRegistry })
registerFilesystemRoutes(app, { fileSystemBrowser: deps.fileSystemBrowser })
@@ -112,6 +120,7 @@ export function createHttpServer(deps: HttpServerDeps) {
workspaceManager: deps.workspaceManager,
})
registerPluginRoutes(app, { workspaceManager: deps.workspaceManager, eventBus: deps.eventBus, logger: proxyLogger })
registerBackgroundProcessRoutes(app, { backgroundProcessManager })
registerInstanceProxyRoutes(app, { workspaceManager: deps.workspaceManager, logger: proxyLogger })

View File

@@ -0,0 +1,83 @@
import { FastifyInstance } from "fastify"
import { z } from "zod"
import type { BackgroundProcessManager } from "../../background-processes/manager"
interface RouteDeps {
backgroundProcessManager: BackgroundProcessManager
}
const StartSchema = z.object({
title: z.string().trim().min(1),
command: z.string().trim().min(1),
})
const OutputQuerySchema = z.object({
method: z.enum(["full", "tail", "head", "grep"]).optional(),
mode: z.enum(["full", "tail", "head", "grep"]).optional(),
pattern: z.string().optional(),
lines: z.coerce.number().int().positive().max(2000).optional(),
})
export function registerBackgroundProcessRoutes(app: FastifyInstance, deps: RouteDeps) {
app.get<{ Params: { id: string } }>("/workspaces/:id/plugin/background-processes", async (request) => {
const processes = await deps.backgroundProcessManager.list(request.params.id)
return { processes }
})
app.post<{ Params: { id: string } }>("/workspaces/:id/plugin/background-processes", async (request, reply) => {
const payload = StartSchema.parse(request.body ?? {})
const process = await deps.backgroundProcessManager.start(request.params.id, payload.title, payload.command)
reply.code(201)
return process
})
app.post<{ Params: { id: string; processId: string } }>(
"/workspaces/:id/plugin/background-processes/:processId/stop",
async (request, reply) => {
const process = await deps.backgroundProcessManager.stop(request.params.id, request.params.processId)
if (!process) {
reply.code(404)
return { error: "Process not found" }
}
return process
},
)
app.post<{ Params: { id: string; processId: string } }>(
"/workspaces/:id/plugin/background-processes/:processId/terminate",
async (request, reply) => {
await deps.backgroundProcessManager.terminate(request.params.id, request.params.processId)
reply.code(204)
return undefined
},
)
app.get<{ Params: { id: string; processId: string } }>(
"/workspaces/:id/plugin/background-processes/:processId/output",
async (request, reply) => {
const query = OutputQuerySchema.parse(request.query ?? {})
const method = query.method ?? query.mode
if (method === "grep" && !query.pattern) {
reply.code(400)
return { error: "Pattern is required for grep output" }
}
try {
return await deps.backgroundProcessManager.readOutput(request.params.id, request.params.processId, {
method,
pattern: query.pattern,
lines: query.lines,
})
} catch (error) {
reply.code(400)
return { error: error instanceof Error ? error.message : "Invalid output request" }
}
},
)
app.get<{ Params: { id: string; processId: string } }>(
"/workspaces/:id/plugin/background-processes/:processId/stream",
async (request, reply) => {
await deps.backgroundProcessManager.streamOutput(request.params.id, request.params.processId, reply)
},
)
}

View File

@@ -0,0 +1,101 @@
import { Dialog } from "@kobalte/core/dialog"
import { Show, createEffect, createSignal, onCleanup } from "solid-js"
import type { BackgroundProcess } from "../../../server/src/api-types"
import { buildBackgroundProcessStreamUrl, serverApi } from "../lib/api-client"
interface BackgroundProcessOutputDialogProps {
open: boolean
instanceId: string
process: BackgroundProcess | null
onClose: () => void
}
export function BackgroundProcessOutputDialog(props: BackgroundProcessOutputDialogProps) {
const [output, setOutput] = createSignal("")
const [truncated, setTruncated] = createSignal(false)
const [loading, setLoading] = createSignal(false)
createEffect(() => {
const process = props.process
if (!props.open || !process) {
return
}
let eventSource: EventSource | null = null
let active = true
setLoading(true)
serverApi
.fetchBackgroundProcessOutput(props.instanceId, process.id, { method: "full" })
.then((response) => {
if (!active) return
setOutput(response.content)
setTruncated(response.truncated)
})
.catch(() => {
if (!active) return
setOutput("Failed to load output.")
})
.finally(() => {
if (!active) return
setLoading(false)
})
eventSource = new EventSource(buildBackgroundProcessStreamUrl(props.instanceId, process.id))
eventSource.onmessage = (event) => {
try {
const payload = JSON.parse(event.data) as { type?: string; content?: string }
if (payload?.type === "chunk" && typeof payload.content === "string") {
setOutput((prev) => `${prev}${payload.content}`)
}
} catch {
// ignore parse errors
}
}
onCleanup(() => {
active = false
eventSource?.close()
})
})
return (
<Dialog open={props.open} onOpenChange={(open) => !open && props.onClose()} modal>
<Dialog.Portal>
<Dialog.Overlay class="modal-overlay" />
<div class="fixed inset-0 z-50 flex items-center justify-center p-4">
<Dialog.Content class="modal-surface w-full max-w-5xl max-h-[90vh] flex flex-col overflow-hidden">
<div class="flex items-center justify-between px-6 py-4 border-b border-base">
<div class="flex flex-col">
<Dialog.Title class="text-lg font-semibold text-primary">Background Output</Dialog.Title>
<Show when={props.process}>
<span class="text-xs text-secondary">
{props.process?.title} · {props.process?.id}
</span>
<span class="text-xs text-secondary mt-1 break-words">{props.process?.command}</span>
</Show>
</div>
<button type="button" class="button-tertiary" onClick={props.onClose}>
Close
</button>
</div>
<div class="flex-1 overflow-auto p-6">
<Show when={loading()}>
<p class="text-xs text-secondary">Loading output...</p>
</Show>
<Show when={!loading()}>
<Show when={truncated()}>
<p class="text-xs text-secondary mb-2">Output truncated for display.</p>
</Show>
<pre class="text-xs whitespace-pre-wrap break-words text-primary bg-surface-secondary border border-base rounded-md p-4">
{output()}
</pre>
</Show>
</div>
</Dialog.Content>
</div>
</Dialog.Portal>
</Dialog>
)
}

View File

@@ -12,7 +12,7 @@ import {
} from "solid-js"
import type { ToolState } from "@opencode-ai/sdk"
import { Accordion } from "@kobalte/core"
import { ChevronDown } from "lucide-solid"
import { ChevronDown, TerminalSquare, Trash2, XOctagon } from "lucide-solid"
import AppBar from "@suid/material/AppBar"
import Box from "@suid/material/Box"
import Divider from "@suid/material/Divider"
@@ -28,6 +28,7 @@ import PushPinIcon from "@suid/icons-material/PushPin"
import PushPinOutlinedIcon from "@suid/icons-material/PushPinOutlined"
import type { Instance } from "../../types/instance"
import type { Command } from "../../lib/commands"
import type { BackgroundProcess } from "../../../../server/src/api-types"
import {
activeParentSessionId,
activeSessionId as activeSessionMap,
@@ -56,6 +57,9 @@ import SessionView from "../session/session-view"
import { formatTokenTotal } from "../../lib/formatters"
import { sseManager } from "../../lib/sse-manager"
import { getLogger } from "../../lib/logger"
import { serverApi } from "../../lib/api-client"
import { getBackgroundProcesses, loadBackgroundProcesses } from "../../stores/background-processes"
import { BackgroundProcessOutputDialog } from "../background-process-output-dialog"
import {
SESSION_SIDEBAR_EVENT,
type SessionSidebarRequestAction,
@@ -128,7 +132,15 @@ const InstanceShell2: Component<InstanceShellProps> = (props) => {
const [activeResizeSide, setActiveResizeSide] = createSignal<"left" | "right" | null>(null)
const [resizeStartX, setResizeStartX] = createSignal(0)
const [resizeStartWidth, setResizeStartWidth] = createSignal(0)
const [rightPanelExpandedItems, setRightPanelExpandedItems] = createSignal<string[]>(["plan", "mcp", "lsp", "plugins"])
const [rightPanelExpandedItems, setRightPanelExpandedItems] = createSignal<string[]>([
"plan",
"background-processes",
"mcp",
"lsp",
"plugins",
])
const [selectedBackgroundProcess, setSelectedBackgroundProcess] = createSignal<BackgroundProcess | null>(null)
const [showBackgroundOutput, setShowBackgroundOutput] = createSignal(false)
const messageStore = createMemo(() => messageStoreBus.getOrCreate(props.instance.id))
@@ -152,6 +164,13 @@ const InstanceShell2: Component<InstanceShellProps> = (props) => {
persistPinState(side, value)
}
createEffect(() => {
const instanceId = props.instance.id
loadBackgroundProcesses(instanceId).catch((error) => {
log.warn("Failed to load background processes", error)
})
})
createEffect(() => {
switch (layoutMode()) {
case "desktop": {
@@ -314,6 +333,8 @@ const InstanceShell2: Component<InstanceShellProps> = (props) => {
return state
})
const backgroundProcessList = createMemo(() => getBackgroundProcesses(props.instance.id))
const connectionStatus = () => sseManager.getStatus(props.instance.id)
const connectionStatusClass = () => {
const status = connectionStatus()
@@ -326,6 +347,32 @@ const InstanceShell2: Component<InstanceShellProps> = (props) => {
showCommandPalette(props.instance.id)
}
const openBackgroundOutput = (process: BackgroundProcess) => {
setSelectedBackgroundProcess(process)
setShowBackgroundOutput(true)
}
const closeBackgroundOutput = () => {
setShowBackgroundOutput(false)
setSelectedBackgroundProcess(null)
}
const stopBackgroundProcess = async (processId: string) => {
try {
await serverApi.stopBackgroundProcess(props.instance.id, processId)
} catch (error) {
log.warn("Failed to stop background process", error)
}
}
const terminateBackgroundProcess = async (processId: string) => {
try {
await serverApi.terminateBackgroundProcess(props.instance.id, processId)
} catch (error) {
log.warn("Failed to terminate background process", error)
}
}
const customCommands = createMemo(() => buildCustomCommandEntries(props.instance.id, getInstanceCommands(props.instance.id)))
const instancePaletteCommands = createMemo(() => [...props.paletteCommands(), ...customCommands()])
@@ -853,12 +900,74 @@ const InstanceShell2: Component<InstanceShellProps> = (props) => {
return <TodoListView state={todoState} emptyLabel="Nothing planned yet." showStatusLabel={false} />
}
const renderBackgroundProcesses = () => {
const processes = backgroundProcessList()
if (processes.length === 0) {
return <p class="text-xs text-secondary">No background processes.</p>
}
return (
<div class="flex flex-col gap-2">
<For each={processes}>
{(process) => (
<div class="rounded-md border border-base bg-surface-secondary p-2 flex flex-col gap-2">
<div class="flex flex-col gap-1">
<span class="text-xs font-semibold text-primary">{process.title}</span>
<div class="flex flex-wrap gap-2 text-[11px] text-secondary">
<span>Status: {process.status}</span>
<Show when={typeof process.outputSizeBytes === "number"}>
<span>Output: {Math.round((process.outputSizeBytes ?? 0) / 1024)}KB</span>
</Show>
</div>
</div>
<div class="grid grid-cols-3 gap-2">
<button
type="button"
class="button-tertiary w-full p-1 inline-flex items-center justify-center"
onClick={() => openBackgroundOutput(process)}
aria-label="Output"
title="Output"
>
<TerminalSquare class="h-4 w-4" />
</button>
<button
type="button"
class="button-tertiary w-full p-1 inline-flex items-center justify-center"
disabled={process.status !== "running"}
onClick={() => stopBackgroundProcess(process.id)}
aria-label="Stop"
title="Stop"
>
<XOctagon class="h-4 w-4" />
</button>
<button
type="button"
class="button-tertiary w-full p-1 inline-flex items-center justify-center"
onClick={() => terminateBackgroundProcess(process.id)}
aria-label="Terminate"
title="Terminate"
>
<Trash2 class="h-4 w-4" />
</button>
</div>
</div>
)}
</For>
</div>
)
}
const sections = [
{
id: "plan",
label: "Plan",
render: renderPlanSectionContent,
},
{
id: "background-processes",
label: "Background Shells",
render: renderBackgroundProcesses,
},
{
id: "mcp",
label: "MCP Servers",
@@ -1313,6 +1422,13 @@ const InstanceShell2: Component<InstanceShellProps> = (props) => {
commands={instancePaletteCommands()}
onExecute={props.onExecuteCommand}
/>
<BackgroundProcessOutputDialog
open={showBackgroundOutput()}
instanceId={props.instance.id}
process={selectedBackgroundProcess()}
onClose={closeBackgroundOutput}
/>
</>
)
}

View File

@@ -1,5 +1,8 @@
import type {
AppConfig,
BackgroundProcess,
BackgroundProcessListResponse,
BackgroundProcessOutputResponse,
BinaryCreateRequest,
BinaryListResponse,
BinaryUpdateRequest,
@@ -28,6 +31,12 @@ const EVENTS_URL = buildEventsUrl(API_BASE, DEFAULT_EVENTS_PATH)
export const CODENOMAD_API_BASE = API_BASE
export function buildBackgroundProcessStreamUrl(instanceId: string, processId: string): string {
const encodedInstanceId = encodeURIComponent(instanceId)
const encodedProcessId = encodeURIComponent(processId)
return buildAbsoluteUrl(`/workspaces/${encodedInstanceId}/plugin/background-processes/${encodedProcessId}/stream`)
}
function buildEventsUrl(base: string | undefined, path: string): string {
if (path.startsWith("http://") || path.startsWith("https://")) {
return path
@@ -39,9 +48,41 @@ function buildEventsUrl(base: string | undefined, path: string): string {
return path
}
function buildAbsoluteUrl(path: string): string {
if (path.startsWith("http://") || path.startsWith("https://")) {
return path
}
if (!API_BASE) {
return path
}
const normalized = path.startsWith("/") ? path : `/${path}`
return `${API_BASE}${normalized}`
}
const httpLogger = getLogger("api")
const sseLogger = getLogger("sse")
function normalizeHeaders(headers: HeadersInit | undefined): Record<string, string> {
const output: Record<string, string> = {}
if (!headers) return output
if (headers instanceof Headers) {
headers.forEach((value, key) => {
output[key] = value
})
return output
}
if (Array.isArray(headers)) {
for (const [key, value] of headers) {
output[key] = value
}
return output
}
return { ...headers }
}
function logHttp(message: string, context?: Record<string, unknown>) {
if (context) {
httpLogger.info(message, context)
@@ -52,9 +93,9 @@ function logHttp(message: string, context?: Record<string, unknown>) {
async function request<T>(path: string, init?: RequestInit): Promise<T> {
const url = API_BASE ? new URL(path, API_BASE).toString() : path
const headers: HeadersInit = {
"Content-Type": "application/json",
...(init?.headers ?? {}),
const headers = normalizeHeaders(init?.headers)
if (init?.body !== undefined) {
headers["Content-Type"] = "application/json"
}
const method = (init?.method ?? "GET").toUpperCase()
@@ -186,6 +227,44 @@ export const serverApi = {
deleteInstanceData(id: string): Promise<void> {
return request(`/api/storage/instances/${encodeURIComponent(id)}`, { method: "DELETE" })
},
listBackgroundProcesses(instanceId: string): Promise<BackgroundProcessListResponse> {
return request<BackgroundProcessListResponse>(
`/workspaces/${encodeURIComponent(instanceId)}/plugin/background-processes`,
)
},
stopBackgroundProcess(instanceId: string, processId: string): Promise<BackgroundProcess> {
return request<BackgroundProcess>(
`/workspaces/${encodeURIComponent(instanceId)}/plugin/background-processes/${encodeURIComponent(processId)}/stop`,
{ method: "POST" },
)
},
terminateBackgroundProcess(instanceId: string, processId: string): Promise<void> {
return request(
`/workspaces/${encodeURIComponent(instanceId)}/plugin/background-processes/${encodeURIComponent(processId)}/terminate`,
{ method: "POST" },
)
},
fetchBackgroundProcessOutput(
instanceId: string,
processId: string,
options?: { method?: "full" | "tail" | "head" | "grep"; pattern?: string; lines?: number },
): Promise<BackgroundProcessOutputResponse> {
const params = new URLSearchParams()
if (options?.method) {
params.set("method", options.method)
}
if (options?.pattern) {
params.set("pattern", options.pattern)
}
if (options?.lines) {
params.set("lines", String(options.lines))
}
const query = params.toString()
const suffix = query ? `?${query}` : ""
return request<BackgroundProcessOutputResponse>(
`/workspaces/${encodeURIComponent(instanceId)}/plugin/background-processes/${encodeURIComponent(processId)}/output${suffix}`,
)
},
connectEvents(onEvent: (event: WorkspaceEventPayload) => void, onError?: () => void) {
sseLogger.info(`Connecting to ${EVENTS_URL}`)
const source = new EventSource(EVENTS_URL)

View File

@@ -16,6 +16,7 @@ import type {
} from "@opencode-ai/sdk"
import { serverEvents } from "./server-events"
import type {
BackgroundProcess,
InstanceStreamEvent,
InstanceStreamStatus,
WorkspaceEventPayload,
@@ -37,6 +38,20 @@ interface TuiToastEvent {
}
}
interface BackgroundProcessUpdatedEvent {
type: "background.process.updated"
properties: {
process: BackgroundProcess
}
}
interface BackgroundProcessRemovedEvent {
type: "background.process.removed"
properties: {
processId: string
}
}
type SSEEvent =
| MessageUpdateEvent
| MessageRemovedEvent
@@ -50,6 +65,8 @@ type SSEEvent =
| EventPermissionReplied
| EventLspUpdated
| TuiToastEvent
| BackgroundProcessUpdatedEvent
| BackgroundProcessRemovedEvent
| { type: string; properties?: Record<string, unknown> }
type ConnectionStatus = InstanceStreamStatus
@@ -126,6 +143,12 @@ class SSEManager {
case "lsp.updated":
this.onLspUpdated?.(instanceId, event as EventLspUpdated)
break
case "background.process.updated":
this.onBackgroundProcessUpdated?.(instanceId, event as BackgroundProcessUpdatedEvent)
break
case "background.process.removed":
this.onBackgroundProcessRemoved?.(instanceId, event as BackgroundProcessRemovedEvent)
break
default:
log.warn("Unknown SSE event type", { type: event.type })
}
@@ -151,6 +174,8 @@ class SSEManager {
onPermissionUpdated?: (instanceId: string, event: EventPermissionUpdated) => void
onPermissionReplied?: (instanceId: string, event: EventPermissionReplied) => void
onLspUpdated?: (instanceId: string, event: EventLspUpdated) => void
onBackgroundProcessUpdated?: (instanceId: string, event: BackgroundProcessUpdatedEvent) => void
onBackgroundProcessRemoved?: (instanceId: string, event: BackgroundProcessRemovedEvent) => void
onConnectionLost?: (instanceId: string, reason: string) => void | Promise<void>
getStatus(instanceId: string): ConnectionStatus | null {

View File

@@ -0,0 +1,66 @@
import { createSignal } from "solid-js"
import type { BackgroundProcess } from "../../../server/src/api-types"
import { serverApi } from "../lib/api-client"
import { sseManager } from "../lib/sse-manager"
const [backgroundProcesses, setBackgroundProcesses] = createSignal<Map<string, BackgroundProcess[]>>(new Map())
function setProcesses(instanceId: string, processes: BackgroundProcess[]) {
setBackgroundProcesses((prev) => {
const next = new Map(prev)
next.set(instanceId, processes)
return next
})
}
function updateProcess(instanceId: string, process: BackgroundProcess) {
setBackgroundProcesses((prev) => {
const next = new Map(prev)
const current = next.get(instanceId) ?? []
const index = current.findIndex((entry) => entry.id === process.id)
const updated = index >= 0 ? [...current.slice(0, index), process, ...current.slice(index + 1)] : [...current, process]
next.set(instanceId, updated)
return next
})
}
function removeProcess(instanceId: string, processId: string) {
setBackgroundProcesses((prev) => {
const next = new Map(prev)
const current = next.get(instanceId) ?? []
next.set(
instanceId,
current.filter((entry) => entry.id !== processId),
)
return next
})
}
async function loadBackgroundProcesses(instanceId: string) {
const response = await serverApi.listBackgroundProcesses(instanceId)
setProcesses(instanceId, response.processes)
}
function getBackgroundProcesses(instanceId: string): BackgroundProcess[] {
return backgroundProcesses().get(instanceId) ?? []
}
sseManager.onBackgroundProcessUpdated = (instanceId, event) => {
const process = event.properties?.process
if (!process) return
updateProcess(instanceId, process)
}
sseManager.onBackgroundProcessRemoved = (instanceId, event) => {
const processId = event.properties?.processId
if (!processId) return
removeProcess(instanceId, processId)
}
export {
backgroundProcesses,
getBackgroundProcesses,
loadBackgroundProcesses,
removeProcess as removeBackgroundProcess,
updateProcess as updateBackgroundProcess,
}