diff --git a/packages/opencode/src/acp/agent.ts b/packages/opencode/src/acp/agent.ts index d4d556485da..99312f94523 100644 --- a/packages/opencode/src/acp/agent.ts +++ b/packages/opencode/src/acp/agent.ts @@ -27,6 +27,7 @@ import { type ToolKind, } from "@agentclientprotocol/sdk" import { Log } from "../util/log" +import { withTimeout } from "../util/timeout" import { ACPSessionManager } from "./session" import type { ACPConfig } from "./types" import { Provider } from "../provider/provider" @@ -42,6 +43,7 @@ import { applyPatch } from "diff" export namespace ACP { const log = Log.create({ service: "acp-agent" }) + const SDK_TIMEOUT_MS = 60_000 export async function init({ sdk: _sdk }: { sdk: OpencodeClient }) { return { @@ -130,21 +132,21 @@ export namespace ACP { permissionID: permission.id, sessionID: permission.sessionID, }) - await this.sdk.permission.reply({ + await withTimeout(this.sdk.permission.reply({ requestID: permission.id, reply: "reject", directory, - }) + }), SDK_TIMEOUT_MS).catch(() => {}) return undefined }) if (!res) return if (res.outcome.outcome !== "selected") { - await this.sdk.permission.reply({ + await withTimeout(this.sdk.permission.reply({ requestID: permission.id, reply: "reject", directory, - }) + }), SDK_TIMEOUT_MS).catch(() => {}) return } @@ -165,11 +167,11 @@ export namespace ACP { } } - await this.sdk.permission.reply({ + await withTimeout(this.sdk.permission.reply({ requestID: permission.id, reply: res.outcome.optionId as "once" | "always" | "reject", directory, - }) + }), SDK_TIMEOUT_MS) }) .catch((error) => { log.error("failed to handle permission", { error, permissionID: permission.id }) @@ -192,20 +194,22 @@ export namespace ACP { const sessionId = session.id const directory = session.cwd - const message = await this.sdk.session - .message( - { - sessionID: part.sessionID, - messageID: part.messageID, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((error) => { - log.error("unexpected error when fetching message", { error }) - return undefined - }) + const message = await withTimeout( + this.sdk.session + .message( + { + sessionID: part.sessionID, + messageID: part.messageID, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data), + SDK_TIMEOUT_MS + ).catch((error) => { + log.error("unexpected error when fetching message", { error }) + return undefined + }) if (!message || message.info.role !== "assistant") return @@ -508,19 +512,21 @@ export namespace ACP { }) // Replay session history - const messages = await this.sdk.session - .messages( - { - sessionID: sessionId, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined - }) + const messages = await withTimeout( + this.sdk.session + .messages( + { + sessionID: sessionId, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data), + SDK_TIMEOUT_MS + ).catch((err) => { + log.error("unexpected error when fetching message", { error: err }) + return undefined + }) const lastUser = messages?.findLast((m) => m.info.role === "user")?.info if (lastUser?.role === "user") { @@ -557,15 +563,18 @@ export namespace ACP { const cursor = params.cursor ? Number(params.cursor) : undefined const limit = 100 - const sessions = await this.sdk.session - .list( - { - directory: params.cwd ?? undefined, - roots: true, - }, - { throwOnError: true }, - ) - .then((x) => x.data ?? []) + const sessions = await withTimeout( + this.sdk.session + .list( + { + directory: params.cwd ?? undefined, + roots: true, + }, + { throwOnError: true }, + ) + .then((x) => x.data ?? []), + SDK_TIMEOUT_MS + ) const sorted = sessions.toSorted((a, b) => b.time.updated - a.time.updated) const filtered = cursor ? sorted.filter((s) => s.time.updated < cursor) : sorted @@ -604,15 +613,18 @@ export namespace ACP { try { const model = await defaultModel(this.config, directory) - const forked = await this.sdk.session - .fork( - { - sessionID: params.sessionId, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) + const forked = await withTimeout( + this.sdk.session + .fork( + { + sessionID: params.sessionId, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data), + SDK_TIMEOUT_MS + ) if (!forked) { throw new Error("Fork session returned no data") @@ -629,19 +641,21 @@ export namespace ACP { sessionId, }) - const messages = await this.sdk.session - .messages( - { - sessionID: sessionId, - directory, - }, - { throwOnError: true }, - ) - .then((x) => x.data) - .catch((err) => { - log.error("unexpected error when fetching message", { error: err }) - return undefined - }) + const messages = await withTimeout( + this.sdk.session + .messages( + { + sessionID: sessionId, + directory, + }, + { throwOnError: true }, + ) + .then((x) => x.data), + SDK_TIMEOUT_MS + ).catch((err) => { + log.error("unexpected error when fetching message", { error: err }) + return undefined + }) for (const msg of messages ?? []) { log.debug("replay message", msg) @@ -961,7 +975,10 @@ export namespace ACP { const model = await defaultModel(this.config, directory) const sessionId = params.sessionId - const providers = await this.sdk.config.providers({ directory }).then((x) => x.data!.providers) + const providers = await withTimeout( + this.sdk.config.providers({ directory }).then((x) => x.data!.providers), + SDK_TIMEOUT_MS + ) const entries = providers.sort((a, b) => { const nameA = a.name.toLowerCase() const nameB = b.name.toLowerCase() @@ -977,23 +994,29 @@ export namespace ACP { })) }) - const agents = await this.config.sdk.app - .agents( - { - directory, - }, - { throwOnError: true }, - ) - .then((resp) => resp.data!) + const agents = await withTimeout( + this.config.sdk.app + .agents( + { + directory, + }, + { throwOnError: true }, + ) + .then((resp) => resp.data!), + SDK_TIMEOUT_MS + ) - const commands = await this.config.sdk.command - .list( - { - directory, - }, - { throwOnError: true }, - ) - .then((resp) => resp.data!) + const commands = await withTimeout( + this.config.sdk.command + .list( + { + directory, + }, + { throwOnError: true }, + ) + .then((resp) => resp.data!), + SDK_TIMEOUT_MS + ) const availableCommands = commands.map((command) => ({ name: command.name, @@ -1045,18 +1068,19 @@ export namespace ACP { await Promise.all( Object.entries(mcpServers).map(async ([key, mcp]) => { - await this.sdk.mcp - .add( + await withTimeout( + this.sdk.mcp.add( { directory, name: key, config: mcp, }, { throwOnError: true }, - ) - .catch((error) => { - log.error("failed to add mcp server", { name: key, error }) - }) + ), + SDK_TIMEOUT_MS + ).catch((error) => { + log.error("failed to add mcp server", { name: key, error }) + }) }), ) @@ -1101,12 +1125,15 @@ export namespace ACP { async setSessionMode(params: SetSessionModeRequest): Promise { this.sessionManager.get(params.sessionId) - await this.config.sdk.app - .agents({}, { throwOnError: true }) - .then((x) => x.data) - .then((agent) => { - if (!agent) throw new Error(`Agent not found: ${params.modeId}`) - }) + await withTimeout( + this.config.sdk.app + .agents({}, { throwOnError: true }) + .then((x) => x.data) + .then((agent) => { + if (!agent) throw new Error(`Agent not found: ${params.modeId}`) + }), + SDK_TIMEOUT_MS + ) this.sessionManager.setMode(params.sessionId, params.modeId) } @@ -1217,44 +1244,56 @@ export namespace ACP { } if (!cmd) { - await this.sdk.session.prompt({ - sessionID, - model: { - providerID: model.providerID, - modelID: model.modelID, - }, - parts, - agent, - directory, - }) + await withTimeout( + this.sdk.session.prompt({ + sessionID, + model: { + providerID: model.providerID, + modelID: model.modelID, + }, + parts, + agent, + directory, + }), + SDK_TIMEOUT_MS + ) return done } - const command = await this.config.sdk.command - .list({ directory }, { throwOnError: true }) - .then((x) => x.data!.find((c) => c.name === cmd.name)) + const command = await withTimeout( + this.config.sdk.command + .list({ directory }, { throwOnError: true }) + .then((x) => x.data!.find((c) => c.name === cmd.name)), + SDK_TIMEOUT_MS + ) if (command) { - await this.sdk.session.command({ - sessionID, - command: command.name, - arguments: cmd.args, - model: model.providerID + "/" + model.modelID, - agent, - directory, - }) + await withTimeout( + this.sdk.session.command({ + sessionID, + command: command.name, + arguments: cmd.args, + model: model.providerID + "/" + model.modelID, + agent, + directory, + }), + SDK_TIMEOUT_MS + ) return done } switch (cmd.name) { case "compact": - await this.config.sdk.session.summarize( - { - sessionID, - directory, - providerID: model.providerID, - modelID: model.modelID, - }, - { throwOnError: true }, + await withTimeout( + this.config.sdk.session.summarize( + { + sessionID, + directory, + providerID: model.providerID, + modelID: model.modelID, + }, + { throwOnError: true }, + ), + SDK_TIMEOUT_MS ) break } @@ -1264,12 +1303,15 @@ export namespace ACP { async cancel(params: CancelNotification) { const session = this.sessionManager.get(params.sessionId) - await this.config.sdk.session.abort( - { - sessionID: params.sessionId, - directory: session.cwd, - }, - { throwOnError: true }, + await withTimeout( + this.config.sdk.session.abort( + { + sessionID: params.sessionId, + directory: session.cwd, + }, + { throwOnError: true }, + ), + SDK_TIMEOUT_MS ) } } @@ -1328,29 +1370,33 @@ export namespace ACP { const directory = cwd ?? process.cwd() - const specified = await sdk.config - .get({ directory }, { throwOnError: true }) - .then((resp) => { - const cfg = resp.data - if (!cfg || !cfg.model) return undefined - const parsed = Provider.parseModel(cfg.model) - return { - providerID: parsed.providerID, - modelID: parsed.modelID, - } - }) - .catch((error) => { - log.error("failed to load user config for default model", { error }) - return undefined - }) - - const providers = await sdk.config - .providers({ directory }, { throwOnError: true }) - .then((x) => x.data?.providers ?? []) - .catch((error) => { - log.error("failed to list providers for default model", { error }) - return [] - }) + const specified = await withTimeout( + sdk.config + .get({ directory }, { throwOnError: true }) + .then((resp) => { + const cfg = resp.data + if (!cfg || !cfg.model) return undefined + const parsed = Provider.parseModel(cfg.model) + return { + providerID: parsed.providerID, + modelID: parsed.modelID, + } + }), + SDK_TIMEOUT_MS + ).catch((error) => { + log.error("failed to load user config for default model", { error }) + return undefined + }) + + const providers = await withTimeout( + sdk.config + .providers({ directory }, { throwOnError: true }) + .then((x) => x.data?.providers ?? []), + SDK_TIMEOUT_MS + ).catch((error) => { + log.error("failed to list providers for default model", { error }) + return [] + }) if (specified && providers.length) { const provider = providers.find((p) => p.id === specified.providerID) diff --git a/packages/opencode/src/acp/session.ts b/packages/opencode/src/acp/session.ts index 151fa5646ba..61666980ec8 100644 --- a/packages/opencode/src/acp/session.ts +++ b/packages/opencode/src/acp/session.ts @@ -1,6 +1,9 @@ import { RequestError, type McpServer } from "@agentclientprotocol/sdk" import type { ACPSessionState } from "./types" import { Log } from "@/util/log" +import { withTimeout } from "@/util/timeout" + +const SDK_TIMEOUT_MS = 60_000 import type { OpencodeClient } from "@opencode-ai/sdk/v2" const log = Log.create({ service: "acp-session-manager" }) @@ -18,15 +21,18 @@ export class ACPSessionManager { } async create(cwd: string, mcpServers: McpServer[], model?: ACPSessionState["model"]): Promise { - const session = await this.sdk.session - .create( - { - title: `ACP Session ${crypto.randomUUID()}`, - directory: cwd, - }, - { throwOnError: true }, - ) - .then((x) => x.data!) + const session = await withTimeout( + this.sdk.session + .create( + { + title: `ACP Session ${crypto.randomUUID()}`, + directory: cwd, + }, + { throwOnError: true }, + ) + .then((x) => x.data!), + SDK_TIMEOUT_MS + ) const sessionId = session.id const resolvedModel = model @@ -50,15 +56,18 @@ export class ACPSessionManager { mcpServers: McpServer[], model?: ACPSessionState["model"], ): Promise { - const session = await this.sdk.session - .get( - { - sessionID: sessionId, - directory: cwd, - }, - { throwOnError: true }, - ) - .then((x) => x.data!) + const session = await withTimeout( + this.sdk.session + .get( + { + sessionID: sessionId, + directory: cwd, + }, + { throwOnError: true }, + ) + .then((x) => x.data!), + SDK_TIMEOUT_MS + ) const resolvedModel = model diff --git a/packages/opencode/src/cli/cmd/acp.ts b/packages/opencode/src/cli/cmd/acp.ts index 30e919d999a..d7fdfdd8a2b 100644 --- a/packages/opencode/src/cli/cmd/acp.ts +++ b/packages/opencode/src/cli/cmd/acp.ts @@ -43,11 +43,20 @@ export const AcpCommand = cmd({ }) const output = new ReadableStream({ start(controller) { - process.stdin.on("data", (chunk: Buffer) => { + const onData = (chunk: Buffer) => { controller.enqueue(new Uint8Array(chunk)) - }) - process.stdin.on("end", () => controller.close()) - process.stdin.on("error", (err) => controller.error(err)) + } + const onEnd = () => controller.close() + const onError = (err: Error) => controller.error(err) + + process.stdin.on("data", onData) + process.stdin.on("end", onEnd) + process.stdin.on("error", onError) + + // Store references for cleanup + ;(controller as any)._onData = onData + ;(controller as any)._onEnd = onEnd + ;(controller as any)._onError = onError }, }) @@ -61,8 +70,22 @@ export const AcpCommand = cmd({ log.info("setup connection") process.stdin.resume() await new Promise((resolve, reject) => { - process.stdin.on("end", resolve) - process.stdin.on("error", reject) + const onEnd = () => { + cleanup() + resolve(undefined) + } + const onError = (err: Error) => { + cleanup() + reject(err) + } + + const cleanup = () => { + process.stdin.removeListener("end", onEnd) + process.stdin.removeListener("error", onError) + } + + process.stdin.once("end", onEnd) + process.stdin.once("error", onError) }) }) }, diff --git a/packages/opencode/src/cli/cmd/run.ts b/packages/opencode/src/cli/cmd/run.ts index 54248f96f3d..65b8b58eb5c 100644 --- a/packages/opencode/src/cli/cmd/run.ts +++ b/packages/opencode/src/cli/cmd/run.ts @@ -11,6 +11,9 @@ import { createOpencodeClient, type OpencodeClient } from "@opencode-ai/sdk/v2" import { Server } from "../../server/server" import { Provider } from "../../provider/provider" import { Agent } from "../../agent/agent" +import { withTimeout } from "../../util/timeout" + +const SDK_TIMEOUT_MS = 60_000 const TOOL: Record = { todowrite: ["Todo", UI.Style.TEXT_WARNING_BOLD], @@ -151,7 +154,7 @@ export const RunCommand = cmd({ return false } - const events = await sdk.event.subscribe() + const events = await withTimeout(sdk.event.subscribe(), SDK_TIMEOUT_MS) let errorMsg: string | undefined const eventProcessor = (async () => { @@ -219,11 +222,11 @@ export const RunCommand = cmd({ initialValue: "once", }).catch(() => "reject") const response = (result.toString().includes("cancel") ? "reject" : result) as "once" | "always" | "reject" - await sdk.permission.respond({ + await withTimeout(sdk.permission.respond({ sessionID, permissionID: permission.id, response, - }) + }), SDK_TIMEOUT_MS) } } })() @@ -252,23 +255,23 @@ export const RunCommand = cmd({ })() if (args.command) { - await sdk.session.command({ + await withTimeout(sdk.session.command({ sessionID, agent: resolvedAgent, model: args.model, command: args.command, arguments: message, variant: args.variant, - }) + }), SDK_TIMEOUT_MS) } else { const modelParam = args.model ? Provider.parseModel(args.model) : undefined - await sdk.session.prompt({ + await withTimeout(sdk.session.prompt({ sessionID, agent: resolvedAgent, model: modelParam, variant: args.variant, parts: [...fileParts, { type: "text", text: message }], - }) + }), SDK_TIMEOUT_MS) } await eventProcessor @@ -280,7 +283,7 @@ export const RunCommand = cmd({ const sessionID = await (async () => { if (args.continue) { - const result = await sdk.session.list() + const result = await withTimeout(sdk.session.list(), SDK_TIMEOUT_MS) return result.data?.find((s) => !s.parentID)?.id } if (args.session) return args.session @@ -292,7 +295,7 @@ export const RunCommand = cmd({ : args.title : undefined - const result = await sdk.session.create( + const result = await withTimeout(sdk.session.create( title ? { title, @@ -313,7 +316,7 @@ export const RunCommand = cmd({ }, ], }, - ) + ), SDK_TIMEOUT_MS) return result.data?.id })() @@ -322,9 +325,9 @@ export const RunCommand = cmd({ process.exit(1) } - const cfgResult = await sdk.config.get() + const cfgResult = await withTimeout(sdk.config.get(), SDK_TIMEOUT_MS) if (cfgResult.data && (cfgResult.data.share === "auto" || Flag.OPENCODE_AUTO_SHARE || args.share)) { - const shareResult = await sdk.session.share({ sessionID }).catch((error) => { + const shareResult = await withTimeout(sdk.session.share({ sessionID }), SDK_TIMEOUT_MS).catch((error) => { if (error instanceof Error && error.message.includes("disabled")) { UI.println(UI.Style.TEXT_DANGER_BOLD + "! " + error.message) } @@ -355,7 +358,7 @@ export const RunCommand = cmd({ const sessionID = await (async () => { if (args.continue) { - const result = await sdk.session.list() + const result = await withTimeout(sdk.session.list(), SDK_TIMEOUT_MS) return result.data?.find((s) => !s.parentID)?.id } if (args.session) return args.session @@ -367,7 +370,7 @@ export const RunCommand = cmd({ : args.title : undefined - const result = await sdk.session.create(title ? { title } : {}) + const result = await withTimeout(sdk.session.create(title ? { title } : {}), SDK_TIMEOUT_MS) return result.data?.id })() @@ -376,9 +379,9 @@ export const RunCommand = cmd({ process.exit(1) } - const cfgResult = await sdk.config.get() + const cfgResult = await withTimeout(sdk.config.get(), SDK_TIMEOUT_MS) if (cfgResult.data && (cfgResult.data.share === "auto" || Flag.OPENCODE_AUTO_SHARE || args.share)) { - const shareResult = await sdk.session.share({ sessionID }).catch((error) => { + const shareResult = await withTimeout(sdk.session.share({ sessionID }), SDK_TIMEOUT_MS).catch((error) => { if (error instanceof Error && error.message.includes("disabled")) { UI.println(UI.Style.TEXT_DANGER_BOLD + "! " + error.message) } diff --git a/packages/opencode/src/cli/cmd/tui/thread.ts b/packages/opencode/src/cli/cmd/tui/thread.ts index 05714268545..641e553810d 100644 --- a/packages/opencode/src/cli/cmd/tui/thread.ts +++ b/packages/opencode/src/cli/cmd/tui/thread.ts @@ -95,10 +95,24 @@ export const TuiThreadCommand = cmd({ Object.entries(process.env).filter((entry): entry is [string, string] => entry[1] !== undefined), ), }) + + const client = Rpc.client(worker) + + const handleWorkerCrash = (reason: string) => { + Log.Default.error(`Worker crashed: ${reason}`) + client.rejectAll(new Error(`Worker crashed: ${reason}`)) + process.exit(1) + } + worker.onerror = (e) => { - Log.Default.error(e) + handleWorkerCrash(e.message || "unknown error") } - const client = Rpc.client(worker) + + ;(worker as any).on?.("exit", (code: number) => { + if (code !== 0) { + handleWorkerCrash(`exit code ${code}`) + } + }) process.on("uncaughtException", (e) => { Log.Default.error(e) }) @@ -109,6 +123,15 @@ export const TuiThreadCommand = cmd({ await client.call("reload", undefined) }) + // Handle graceful shutdown on SIGINT (Ctrl+C) and SIGTERM (kill) + const handleShutdown = async () => { + await client.call("shutdown", undefined) + process.exit(0) + } + + process.on("SIGINT", handleShutdown) + process.on("SIGTERM", handleShutdown) + const prompt = await iife(async () => { const piped = !process.stdin.isTTY ? await Bun.stdin.text() : undefined if (!args.prompt) return piped diff --git a/packages/opencode/src/cli/cmd/tui/worker.ts b/packages/opencode/src/cli/cmd/tui/worker.ts index e63f10ba80c..b004f5b0280 100644 --- a/packages/opencode/src/cli/cmd/tui/worker.ts +++ b/packages/opencode/src/cli/cmd/tui/worker.ts @@ -33,9 +33,10 @@ process.on("uncaughtException", (e) => { }) // Subscribe to global events and forward them via RPC -GlobalBus.on("event", (event) => { +const globalBusHandler = (event: any) => { Rpc.emit("global.event", event) -}) +} +GlobalBus.on("event", globalBusHandler) let server: Bun.Server | undefined @@ -64,6 +65,10 @@ const startEventStream = (directory: string) => { }) ;(async () => { + let consecutiveErrors = 0 + const MAX_CONSECUTIVE_ERRORS = 10 + const ERROR_BACKOFF_MS = 1000 + while (!signal.aborted) { const events = await Promise.resolve( sdk.event.subscribe( @@ -72,15 +77,37 @@ const startEventStream = (directory: string) => { signal, }, ), - ).catch(() => undefined) + ).catch((error) => { + consecutiveErrors++ + if (consecutiveErrors <= 3 || consecutiveErrors % 10 === 0) { + Log.Default.warn("event subscription failed", { + error: error instanceof Error ? error.message : error, + attempt: consecutiveErrors, + }) + } + return undefined + }) if (!events) { - await Bun.sleep(250) + if (consecutiveErrors >= MAX_CONSECUTIVE_ERRORS) { + Log.Default.error("event stream: too many consecutive errors, backing off") + await Bun.sleep(ERROR_BACKOFF_MS * Math.min(consecutiveErrors, 30)) + } else { + await Bun.sleep(250) + } continue } - - for await (const event of events.stream) { - Rpc.emit("event", event as Event) + + consecutiveErrors = 0 + + try { + for await (const event of events.stream) { + Rpc.emit("event", event as Event) + } + } catch (streamError) { + Log.Default.warn("event stream iteration error", { + error: streamError instanceof Error ? streamError.message : streamError, + }) } if (!signal.aborted) { @@ -88,7 +115,7 @@ const startEventStream = (directory: string) => { } } })().catch((error) => { - Log.Default.error("event stream error", { + Log.Default.error("event stream fatal error", { error: error instanceof Error ? error.message : error, }) }) @@ -137,6 +164,8 @@ export const rpc = { async shutdown() { Log.Default.info("worker shutting down") if (eventStream.abort) eventStream.abort.abort() + GlobalBus.off("event", globalBusHandler) + Log.Default.debug("GlobalBus listener unsubscribed") await Instance.disposeAll() if (server) server.stop(true) }, diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index 045c58ef034..2dde0f3eb65 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -419,9 +419,12 @@ export namespace MCP { ...mcp.environment, }, }) - transport.stderr?.on("data", (chunk: Buffer) => { + + // Store handler reference for cleanup + const stderrHandler = (chunk: Buffer) => { log.info(`mcp stderr: ${chunk.toString()}`, { key }) - }) + } + transport.stderr?.on("data", stderrHandler) const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT try { @@ -446,6 +449,8 @@ export namespace MCP { status: "failed" as const, error: error instanceof Error ? error.message : String(error), } + // Clean up stderr listener on connection failure + transport.stderr?.removeListener("data", stderrHandler) } } @@ -812,16 +817,28 @@ export namespace MCP { await new Promise((resolve, reject) => { // Give the process a moment to fail if it's going to const timeout = setTimeout(() => resolve(), 500) - subprocess.on("error", (error) => { + + // Store handler references for cleanup + const errorHandler = (error: Error) => { clearTimeout(timeout) + cleanup() reject(error) - }) - subprocess.on("exit", (code) => { + } + const exitHandler = (code: number | null) => { if (code !== null && code !== 0) { clearTimeout(timeout) + cleanup() reject(new Error(`Browser open failed with exit code ${code}`)) } - }) + } + + const cleanup = () => { + subprocess.removeListener("error", errorHandler) + subprocess.removeListener("exit", exitHandler) + } + + subprocess.on("error", errorHandler) + subprocess.on("exit", exitHandler) }) } catch (error) { // Browser opening failed (e.g., in remote/headless sessions like SSH, devcontainers) diff --git a/packages/opencode/src/server/mdns.ts b/packages/opencode/src/server/mdns.ts index 953269de444..af33874e671 100644 --- a/packages/opencode/src/server/mdns.ts +++ b/packages/opencode/src/server/mdns.ts @@ -6,6 +6,9 @@ const log = Log.create({ service: "mdns" }) export namespace MDNS { let bonjour: Bonjour | undefined let currentPort: number | undefined + let currentService: any = undefined + let upHandler: (() => void) | undefined + let errorHandler: ((err: Error) => void) | undefined export function publish(port: number) { if (currentPort === port) return @@ -22,14 +25,18 @@ export namespace MDNS { txt: { path: "/" }, }) - service.on("up", () => { + // Store handler references for cleanup + upHandler = () => { log.info("mDNS service published", { name, port }) - }) - - service.on("error", (err) => { + } + errorHandler = (err: Error) => { log.error("mDNS service error", { error: err }) - }) + } + + service.on("up", upHandler) + service.on("error", errorHandler) + currentService = service currentPort = port } catch (err) { log.error("mDNS publish failed", { error: err }) @@ -40,10 +47,25 @@ export namespace MDNS { } bonjour = undefined currentPort = undefined + currentService = undefined + upHandler = undefined + errorHandler = undefined } } export function unpublish() { + if (currentService && upHandler && errorHandler) { + try { + currentService.removeListener("up", upHandler) + currentService.removeListener("error", errorHandler) + } catch (err) { + log.error("mDNS listener cleanup failed", { error: err }) + } + currentService = undefined + upHandler = undefined + errorHandler = undefined + } + if (bonjour) { try { bonjour.unpublishAll() diff --git a/packages/opencode/src/util/rpc.ts b/packages/opencode/src/util/rpc.ts index ebd8be40e45..c2d0facb4c9 100644 --- a/packages/opencode/src/util/rpc.ts +++ b/packages/opencode/src/util/rpc.ts @@ -7,8 +7,18 @@ export namespace Rpc { onmessage = async (evt) => { const parsed = JSON.parse(evt.data) if (parsed.type === "rpc.request") { - const result = await rpc[parsed.method](parsed.input) - postMessage(JSON.stringify({ type: "rpc.result", result, id: parsed.id })) + try { + const result = await rpc[parsed.method](parsed.input) + postMessage(JSON.stringify({ type: "rpc.result", result, id: parsed.id })) + } catch (error) { + // Send error back to client instead of silently failing + postMessage(JSON.stringify({ + type: "rpc.error", + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined, + id: parsed.id + })) + } } } } @@ -21,18 +31,55 @@ export namespace Rpc { postMessage: (data: string) => void | null onmessage: ((this: Worker, ev: MessageEvent) => any) | null }) { - const pending = new Map void>() + type PendingEntry = { + resolve: (result: any) => void + reject: (error: Error) => void + timeoutId: NodeJS.Timeout + } + + const pending = new Map() const listeners = new Map void>>() let id = 0 + + // Configuration + const REQUEST_TIMEOUT_MS = 60000 // 60 seconds + const MAX_PENDING_REQUESTS = 1000 + + // Clean up request + const cleanupRequest = (requestId: number) => { + const entry = pending.get(requestId) + if (entry) { + clearTimeout(entry.timeoutId) + pending.delete(requestId) + } + } + target.onmessage = async (evt) => { const parsed = JSON.parse(evt.data) + + // Handle successful response if (parsed.type === "rpc.result") { - const resolve = pending.get(parsed.id) - if (resolve) { - resolve(parsed.result) - pending.delete(parsed.id) + const entry = pending.get(parsed.id) + if (entry) { + entry.resolve(parsed.result) + cleanupRequest(parsed.id) + } + } + + // Handle error response - NEW! + if (parsed.type === "rpc.error") { + const entry = pending.get(parsed.id) + if (entry) { + const error = new Error(parsed.error || "RPC call failed") + if (parsed.stack) { + error.stack = parsed.stack + } + entry.reject(error) + cleanupRequest(parsed.id) } } + + // Handle events if (parsed.type === "rpc.event") { const handlers = listeners.get(parsed.event) if (handlers) { @@ -42,14 +89,39 @@ export namespace Rpc { } } } + return { call(method: Method, input: Parameters[0]): Promise> { const requestId = id++ - return new Promise((resolve) => { - pending.set(requestId, resolve) + + // Check if we've exceeded max pending requests + if (pending.size >= MAX_PENDING_REQUESTS) { + return Promise.reject(new Error(`RPC queue full: ${pending.size} pending requests`)) + } + + return new Promise((resolve, reject) => { + // Set timeout for this request + const timeoutId = setTimeout(() => { + if (pending.has(requestId)) { + cleanupRequest(requestId) + reject(new Error(`RPC call '${String(method)}' timed out after ${REQUEST_TIMEOUT_MS}ms`)) + } + }, REQUEST_TIMEOUT_MS) + + pending.set(requestId, { resolve, reject, timeoutId }) target.postMessage(JSON.stringify({ type: "rpc.request", method, input, id: requestId })) }) }, + + // Reject all pending requests (call on worker error) + rejectAll(error: Error) { + for (const [requestId, entry] of pending) { + entry.reject(error) + clearTimeout(entry.timeoutId) + } + pending.clear() + }, + on(event: string, handler: (data: Data) => void) { let handlers = listeners.get(event) if (!handlers) {