feat(background-process): notify sessions when tasks end

Send synthetic session notifications when background processes finish, fail, stop, or terminate so the originating agent can react without polling. Hide synthetic text-only prompts from the UI stream so operational notifications stay out of the visible transcript.
This commit is contained in:
Shantur Rathore
2026-04-08 15:48:50 +01:00
parent 403a3ff189
commit 1130066a33
10 changed files with 310 additions and 63 deletions

View File

@@ -5,7 +5,7 @@ import { randomBytes } from "crypto"
import type { EventBus } from "../events/bus"
import type { WorkspaceManager } from "../workspaces/manager"
import type { Logger } from "../logger"
import type { BackgroundProcess, BackgroundProcessStatus } from "../api-types"
import type { BackgroundProcess, BackgroundProcessStatus, BackgroundProcessTerminalReason } from "../api-types"
const ROOT_DIR = ".codenomad/background_processes"
const INDEX_FILE = "index.json"
@@ -27,6 +27,31 @@ interface RunningProcess {
outputPath: string
exitPromise: Promise<void>
workspaceId: string
completion?: ProcessCompletion
}
interface ProcessCompletion {
reason: BackgroundProcessTerminalReason
endContext: "normal" | "workspace_cleanup"
removeAfterFinalize?: boolean
}
interface BackgroundProcessNotificationState {
sessionID: string
directory: string
sentAt?: string
}
interface PersistedBackgroundProcess extends BackgroundProcess {
notify?: BackgroundProcessNotificationState
}
interface StartOptions {
notify?: boolean
notification?: {
sessionID: string
directory: string
}
}
export class BackgroundProcessManager {
@@ -41,14 +66,14 @@ export class BackgroundProcessManager {
const records = await this.readIndex(workspaceId)
const enriched = await Promise.all(
records.map(async (record) => ({
...record,
...this.toPublicProcess(record),
outputSizeBytes: await this.getOutputSize(workspaceId, record.id),
})),
)
return enriched
}
async start(workspaceId: string, title: string, command: string): Promise<BackgroundProcess> {
async start(workspaceId: string, title: string, command: string, options: StartOptions = {}): Promise<BackgroundProcess> {
const workspace = this.deps.workspaceManager.get(workspaceId)
if (!workspace) {
throw new Error("Workspace not found")
@@ -73,8 +98,7 @@ export class BackgroundProcessManager {
this.killProcessTree(child, "SIGTERM")
})
const record: BackgroundProcess = {
const record: PersistedBackgroundProcess = {
id,
workspaceId,
title,
@@ -84,6 +108,20 @@ export class BackgroundProcessManager {
pid: child.pid,
startedAt: new Date().toISOString(),
outputSizeBytes: 0,
notify: options.notify && options.notification
? {
sessionID: options.notification.sessionID,
directory: options.notification.directory,
}
: undefined,
}
const runningState: RunningProcess = {
id,
child,
outputPath,
exitPromise: Promise.resolve(),
workspaceId,
}
const exitPromise = new Promise<void>((resolve) => {
@@ -91,18 +129,21 @@ export class BackgroundProcessManager {
await new Promise<void>((resolve) => outputStream.end(resolve))
this.running.delete(id)
record.status = this.statusFromExit(code)
const completion = runningState.completion ?? this.completionFromExit(code)
record.terminalReason = completion.reason
record.status = this.statusFromReason(completion.reason)
record.exitCode = code === null ? undefined : code
record.stoppedAt = new Date().toISOString()
await this.upsertIndex(workspaceId, record)
record.outputSizeBytes = await this.getOutputSize(workspaceId, record.id)
this.publishUpdate(workspaceId, record)
await this.finalizeRecord(workspaceId, record, completion)
resolve()
})
})
this.running.set(id, { id, child, outputPath, exitPromise, workspaceId })
runningState.exitPromise = exitPromise
this.running.set(id, runningState)
let lastPublishAt = 0
const maybePublishSize = () => {
@@ -128,7 +169,7 @@ export class BackgroundProcessManager {
await this.upsertIndex(workspaceId, record)
record.outputSizeBytes = await this.getOutputSize(workspaceId, record.id)
this.publishUpdate(workspaceId, record)
return record
return this.toPublicProcess(record)
}
async stop(workspaceId: string, processId: string): Promise<BackgroundProcess | null> {
@@ -139,19 +180,21 @@ export class BackgroundProcessManager {
const running = this.running.get(processId)
if (running?.child && !running.child.killed) {
running.completion = { reason: "user_stopped", endContext: "normal" }
this.killProcessTree(running.child, "SIGTERM")
await this.waitForExit(running)
const updated = await this.findProcess(workspaceId, processId)
return updated ? this.toPublicProcess(updated) : this.toPublicProcess(record)
}
if (record.status === "running") {
record.status = "stopped"
record.terminalReason = "user_stopped"
record.stoppedAt = new Date().toISOString()
await this.upsertIndex(workspaceId, record)
record.outputSizeBytes = await this.getOutputSize(workspaceId, record.id)
this.publishUpdate(workspaceId, record)
await this.finalizeRecord(workspaceId, record, { reason: "user_stopped", endContext: "normal" })
}
return record
return this.toPublicProcess(record)
}
async terminate(workspaceId: string, processId: string): Promise<void> {
@@ -160,17 +203,19 @@ export class BackgroundProcessManager {
const running = this.running.get(processId)
if (running?.child && !running.child.killed) {
running.completion = { reason: "user_terminated", endContext: "normal", removeAfterFinalize: true }
this.killProcessTree(running.child, "SIGTERM")
await this.waitForExit(running)
return
}
await this.removeFromIndex(workspaceId, processId)
await this.removeProcessDir(workspaceId, processId)
this.deps.eventBus.publish({
type: "instance.event",
instanceId: workspaceId,
event: { type: "background.process.removed", properties: { processId } },
record.status = "stopped"
record.terminalReason = "user_terminated"
record.stoppedAt = new Date().toISOString()
await this.finalizeRecord(workspaceId, record, {
reason: "user_terminated",
endContext: "normal",
removeAfterFinalize: true,
})
}
@@ -266,6 +311,11 @@ export class BackgroundProcessManager {
private async cleanupWorkspace(workspaceId: string) {
for (const [, running] of this.running.entries()) {
if (running.workspaceId !== workspaceId) continue
running.completion = {
reason: "user_terminated",
endContext: "workspace_cleanup",
removeAfterFinalize: true,
}
this.killProcessTree(running.child, "SIGTERM")
await this.waitForExit(running)
}
@@ -356,10 +406,17 @@ export class BackgroundProcessManager {
return args
}
private statusFromExit(code: number | null): BackgroundProcessStatus {
if (code === null) return "stopped"
if (code === 0) return "stopped"
return "error"
private completionFromExit(code: number | null): ProcessCompletion {
if (code === 0) {
return { reason: "finished", endContext: "normal" }
}
return { reason: "failed", endContext: "normal" }
}
private statusFromReason(reason: BackgroundProcessTerminalReason): BackgroundProcessStatus {
if (reason === "failed") return "error"
return "stopped"
}
private async readOutputBytes(outputPath: string, sizeBytes: number, maxBytes?: number): Promise<string> {
@@ -423,25 +480,25 @@ export class BackgroundProcessManager {
return path.join(workspace.path, ROOT_DIR, workspaceId, processId, OUTPUT_FILE)
}
private async findProcess(workspaceId: string, processId: string): Promise<BackgroundProcess | null> {
private async findProcess(workspaceId: string, processId: string): Promise<PersistedBackgroundProcess | null> {
const records = await this.readIndex(workspaceId)
return records.find((entry) => entry.id === processId) ?? null
}
private async readIndex(workspaceId: string): Promise<BackgroundProcess[]> {
private async readIndex(workspaceId: string): Promise<PersistedBackgroundProcess[]> {
const indexPath = await this.getIndexPath(workspaceId)
if (!existsSync(indexPath)) return []
try {
const raw = await fs.readFile(indexPath, "utf-8")
const parsed = JSON.parse(raw)
return Array.isArray(parsed) ? (parsed as BackgroundProcess[]) : []
return Array.isArray(parsed) ? (parsed as PersistedBackgroundProcess[]) : []
} catch {
return []
}
}
private async upsertIndex(workspaceId: string, record: BackgroundProcess) {
private async upsertIndex(workspaceId: string, record: PersistedBackgroundProcess) {
const records = await this.readIndex(workspaceId)
const index = records.findIndex((entry) => entry.id === record.id)
if (index >= 0) {
@@ -458,7 +515,7 @@ export class BackgroundProcessManager {
await this.writeIndex(workspaceId, next)
}
private async writeIndex(workspaceId: string, records: BackgroundProcess[]) {
private async writeIndex(workspaceId: string, records: PersistedBackgroundProcess[]) {
const indexPath = await this.getIndexPath(workspaceId)
await fs.mkdir(path.dirname(indexPath), { recursive: true })
await fs.writeFile(indexPath, JSON.stringify(records, null, 2))
@@ -503,14 +560,138 @@ export class BackgroundProcessManager {
}
}
private publishUpdate(workspaceId: string, record: BackgroundProcess) {
private publishUpdate(workspaceId: string, record: PersistedBackgroundProcess) {
this.deps.eventBus.publish({
type: "instance.event",
instanceId: workspaceId,
event: { type: "background.process.updated", properties: { process: record } },
event: { type: "background.process.updated", properties: { process: this.toPublicProcess(record) } },
})
}
private toPublicProcess(record: PersistedBackgroundProcess): BackgroundProcess {
return {
id: record.id,
workspaceId: record.workspaceId,
title: record.title,
command: record.command,
cwd: record.cwd,
status: record.status,
pid: record.pid,
startedAt: record.startedAt,
stoppedAt: record.stoppedAt,
exitCode: record.exitCode,
outputSizeBytes: record.outputSizeBytes,
terminalReason: record.terminalReason,
}
}
private async finalizeRecord(workspaceId: string, record: PersistedBackgroundProcess, completion: ProcessCompletion) {
if (this.shouldSendCompletionPrompt(record, completion)) {
try {
await this.sendCompletionPrompt(workspaceId, record)
if (record.notify) {
record.notify.sentAt = new Date().toISOString()
}
} catch (error) {
this.deps.logger.warn({ err: error, workspaceId, processId: record.id }, "Failed to send background process completion prompt")
}
}
if (completion.removeAfterFinalize) {
await this.removeFromIndex(workspaceId, record.id)
await this.removeProcessDir(workspaceId, record.id)
this.deps.eventBus.publish({
type: "instance.event",
instanceId: workspaceId,
event: { type: "background.process.removed", properties: { processId: record.id } },
})
return
}
await this.upsertIndex(workspaceId, record)
record.outputSizeBytes = await this.getOutputSize(workspaceId, record.id)
this.publishUpdate(workspaceId, record)
}
private shouldSendCompletionPrompt(record: PersistedBackgroundProcess, completion: ProcessCompletion) {
if (completion.endContext === "workspace_cleanup") return false
if (!record.notify) return false
return !record.notify.sentAt
}
private async sendCompletionPrompt(workspaceId: string, record: PersistedBackgroundProcess) {
const notify = record.notify
if (!notify || !record.terminalReason) return
if (!this.deps.workspaceManager.get(workspaceId)) {
throw new Error("Workspace not found")
}
const port = this.deps.workspaceManager.getInstancePort(workspaceId)
if (!port) {
throw new Error("Workspace instance is not ready")
}
const targetUrl = `http://127.0.0.1:${port}/session/${encodeURIComponent(notify.sessionID)}/prompt_async`
const headers: Record<string, string> = {
"content-type": "application/json",
"x-opencode-directory": /[^\x00-\x7F]/.test(notify.directory) ? encodeURIComponent(notify.directory) : notify.directory,
}
const authorization = this.deps.workspaceManager.getInstanceAuthorizationHeader(workspaceId)
if (authorization) {
headers.authorization = authorization
}
const response = await fetch(targetUrl, {
method: "POST",
headers,
body: JSON.stringify({
parts: [
{
type: "text",
text: this.buildSyntheticCompletionPrompt(record),
synthetic: true,
},
],
}),
})
if (!response.ok) {
const message = await response.text().catch(() => "")
throw new Error(message || `Prompt request failed with ${response.status}`)
}
}
private buildCompletionPrompt(record: PersistedBackgroundProcess): string {
const ref = `Background process "${record.title}" (${record.id})`
switch (record.terminalReason) {
case "finished":
return `${ref} finished successfully.`
case "failed":
return record.exitCode === undefined ? `${ref} failed.` : `${ref} failed with exit code ${record.exitCode}.`
case "user_stopped":
return `${ref} was stopped by user.`
case "user_terminated":
return `${ref} was terminated by user.`
}
return `${ref} ended.`
}
private buildSyntheticCompletionPrompt(record: PersistedBackgroundProcess): string {
return `<system-message>${this.escapeTaggedText(this.buildCompletionPrompt(record))}</system-message>`
}
private escapeTaggedText(input: string): string {
return input
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
}
private generateId(): string {
const timestamp = new Date().toISOString().replace(/[:.]/g, "").slice(0, 15)
const random = randomBytes(3).toString("hex")