diff --git a/packages/enterprise/src/custom-elements.d.ts b/packages/enterprise/src/custom-elements.d.ts index e4ea0d6cebd..075c1614f78 120000 --- a/packages/enterprise/src/custom-elements.d.ts +++ b/packages/enterprise/src/custom-elements.d.ts @@ -1 +1 @@ -../../ui/src/custom-elements.d.ts \ No newline at end of file +/// \ No newline at end of file diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 27071056180..a5aa790d9a5 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -214,6 +214,16 @@ export namespace SessionProcessor { value.error instanceof Question.RejectedError ) { blocked = shouldBreak + } else if (!shouldBreak) { + // Automatic Recovery: Inject a system hint to retry + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: input.assistantMessage.id, + sessionID: input.assistantMessage.sessionID, + type: "text", + synthetic: true, + text: `\nThe tool ${value.toolName} failed with an error. Please analyze the error message in the tool output and try again with corrected inputs.\n`, + }) } delete toolcalls[value.toolCallId] } @@ -352,7 +362,7 @@ export namespace SessionProcessor { message: retry, next: Date.now() + delay, }) - await SessionRetry.sleep(delay, input.abort).catch(() => {}) + await SessionRetry.sleep(delay, input.abort).catch(() => { }) continue } input.assistantMessage.error = error diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index de62788200b..c50025478dd 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -45,6 +45,7 @@ import { LLM } from "./llm" import { iife } from "@/util/iife" import { Shell } from "@/shell/shell" import { Truncate } from "@/tool/truncation" +import { SlidingWindow } from "../util/window" // @ts-ignore globalThis.AI_SDK_LOG_WARNINGS = false @@ -311,185 +312,189 @@ export namespace SessionPrompt { }) const model = await Provider.getModel(lastUser.model.providerID, lastUser.model.modelID) - const task = tasks.pop() - - // pending subtask - // TODO: centralize "invoke tool" logic - if (task?.type === "subtask") { - const taskTool = await TaskTool.init() - const taskModel = task.model ? await Provider.getModel(task.model.providerID, task.model.modelID) : model - const assistantMessage = (await Session.updateMessage({ - id: Identifier.ascending("message"), - role: "assistant", - parentID: lastUser.id, - sessionID, - mode: task.agent, - agent: task.agent, - path: { - cwd: Instance.directory, - root: Instance.worktree, - }, - cost: 0, - tokens: { - input: 0, - output: 0, - reasoning: 0, - cache: { read: 0, write: 0 }, - }, - modelID: taskModel.id, - providerID: taskModel.providerID, - time: { - created: Date.now(), - }, - })) as MessageV2.Assistant - let part = (await Session.updatePart({ - id: Identifier.ascending("part"), - messageID: assistantMessage.id, - sessionID: assistantMessage.sessionID, - type: "tool", - callID: ulid(), - tool: TaskTool.id, - state: { - status: "running", - input: { + // Parallelize subtasks + const subtasks = tasks.filter((t): t is MessageV2.SubtaskPart => t.type === "subtask") + const compactions = tasks.filter((t): t is MessageV2.CompactionPart => t.type === "compaction") + + if (subtasks.length > 0) { + await Promise.all( + subtasks.map(async (task) => { + const taskTool = await TaskTool.init() + const taskModel = task.model ? await Provider.getModel(task.model.providerID, task.model.modelID) : model + const assistantMessage = (await Session.updateMessage({ + id: Identifier.ascending("message"), + role: "assistant", + parentID: lastUser!.id, + sessionID, + mode: task.agent, + agent: task.agent, + path: { + cwd: Instance.directory, + root: Instance.worktree, + }, + cost: 0, + tokens: { + input: 0, + output: 0, + reasoning: 0, + cache: { read: 0, write: 0 }, + }, + modelID: taskModel.id, + providerID: taskModel.providerID, + time: { + created: Date.now(), + }, + })) as MessageV2.Assistant + let part = (await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: assistantMessage.id, + sessionID: assistantMessage.sessionID, + type: "tool", + callID: ulid(), + tool: TaskTool.id, + state: { + status: "running", + input: { + prompt: task.prompt, + description: task.description, + subagent_type: task.agent, + command: task.command, + }, + time: { + start: Date.now(), + }, + }, + })) as MessageV2.ToolPart + const taskArgs = { prompt: task.prompt, description: task.description, subagent_type: task.agent, command: task.command, - }, - time: { - start: Date.now(), - }, - }, - })) as MessageV2.ToolPart - const taskArgs = { - prompt: task.prompt, - description: task.description, - subagent_type: task.agent, - command: task.command, - } - await Plugin.trigger( - "tool.execute.before", - { - tool: "task", - sessionID, - callID: part.id, - }, - { args: taskArgs }, - ) - let executionError: Error | undefined - const taskAgent = await Agent.get(task.agent) - const taskCtx: Tool.Context = { - agent: task.agent, - messageID: assistantMessage.id, - sessionID: sessionID, - abort, - callID: part.callID, - extra: { bypassAgentCheck: true }, - async metadata(input) { - await Session.updatePart({ - ...part, - type: "tool", - state: { - ...part.state, - ...input, + } + await Plugin.trigger( + "tool.execute.before", + { + tool: "task", + sessionID, + callID: part.id, }, - } satisfies MessageV2.ToolPart) - }, - async ask(req) { - await PermissionNext.ask({ - ...req, + { args: taskArgs }, + ) + let executionError: Error | undefined + const taskAgent = await Agent.get(task.agent) + const taskCtx: Tool.Context = { + agent: task.agent, + messageID: assistantMessage.id, sessionID: sessionID, - ruleset: PermissionNext.merge(taskAgent.permission, session.permission ?? []), - }) - }, - } - const result = await taskTool.execute(taskArgs, taskCtx).catch((error) => { - executionError = error - log.error("subtask execution failed", { error, agent: task.agent, description: task.description }) - return undefined - }) - await Plugin.trigger( - "tool.execute.after", - { - tool: "task", - sessionID, - callID: part.id, - }, - result, - ) - assistantMessage.finish = "tool-calls" - assistantMessage.time.completed = Date.now() - await Session.updateMessage(assistantMessage) - if (result && part.state.status === "running") { - await Session.updatePart({ - ...part, - state: { - status: "completed", - input: part.state.input, - title: result.title, - metadata: result.metadata, - output: result.output, - attachments: result.attachments, - time: { - ...part.state.time, - end: Date.now(), + abort, + callID: part.callID, + extra: { bypassAgentCheck: true }, + async metadata(input) { + await Session.updatePart({ + ...part, + type: "tool", + state: { + ...part.state, + ...input, + }, + } satisfies MessageV2.ToolPart) }, - }, - } satisfies MessageV2.ToolPart) - } - if (!result) { - await Session.updatePart({ - ...part, - state: { - status: "error", - error: executionError ? `Tool execution failed: ${executionError.message}` : "Tool execution failed", - time: { - start: part.state.status === "running" ? part.state.time.start : Date.now(), - end: Date.now(), + async ask(req) { + await PermissionNext.ask({ + ...req, + sessionID: sessionID, + ruleset: PermissionNext.merge(taskAgent.permission, session.permission ?? []), + }) }, - metadata: part.metadata, - input: part.state.input, - }, - } satisfies MessageV2.ToolPart) - } - - if (task.command) { - // Add synthetic user message to prevent certain reasoning models from erroring - // If we create assistant messages w/ out user ones following mid loop thinking signatures - // will be missing and it can cause errors for models like gemini for example - const summaryUserMsg: MessageV2.User = { - id: Identifier.ascending("message"), - sessionID, - role: "user", - time: { - created: Date.now(), - }, - agent: lastUser.agent, - model: lastUser.model, - } - await Session.updateMessage(summaryUserMsg) - await Session.updatePart({ - id: Identifier.ascending("part"), - messageID: summaryUserMsg.id, - sessionID, - type: "text", - text: "Summarize the task tool output above and continue with your task.", - synthetic: true, - } satisfies MessageV2.TextPart) - } + } + const result = await taskTool.execute(taskArgs, taskCtx).catch((error) => { + executionError = error + log.error("subtask execution failed", { error, agent: task.agent, description: task.description }) + return undefined + }) + await Plugin.trigger( + "tool.execute.after", + { + tool: "task", + sessionID, + callID: part.id, + }, + result, + ) + assistantMessage.finish = "tool-calls" + assistantMessage.time.completed = Date.now() + await Session.updateMessage(assistantMessage) + if (result && part.state.status === "running") { + await Session.updatePart({ + ...part, + state: { + status: "completed", + input: part.state.input, + title: result.title, + metadata: result.metadata, + output: result.output, + attachments: result.attachments, + time: { + ...part.state.time, + end: Date.now(), + }, + }, + } satisfies MessageV2.ToolPart) + } + if (!result) { + await Session.updatePart({ + ...part, + state: { + status: "error", + error: executionError ? `Tool execution failed: ${executionError.message}` : "Tool execution failed", + time: { + start: part.state.status === "running" ? part.state.time.start : Date.now(), + end: Date.now(), + }, + metadata: part.metadata, + input: part.state.input, + }, + } satisfies MessageV2.ToolPart) + } + if (task.command) { + // Add synthetic user message to prevent certain reasoning models from erroring + // If we create assistant messages w/ out user ones following mid loop thinking signatures + // will be missing and it can cause errors for models like gemini for example + const summaryUserMsg: MessageV2.User = { + id: Identifier.ascending("message"), + sessionID, + role: "user", + time: { + created: Date.now(), + }, + agent: lastUser!.agent, + model: lastUser!.model, + } + await Session.updateMessage(summaryUserMsg) + await Session.updatePart({ + id: Identifier.ascending("part"), + messageID: summaryUserMsg.id, + sessionID, + type: "text", + text: "Summarize the task tool output above and continue with your task.", + synthetic: true, + } satisfies MessageV2.TextPart) + } + }), + ) continue } + const compaction = compactions.pop() // pending compaction - if (task?.type === "compaction") { + if (compaction) { const result = await SessionCompaction.process({ messages: msgs, parentID: lastUser.id, abort, sessionID, - auto: task.auto, + auto: compaction.auto, }) if (result === "stop") break continue @@ -593,6 +598,8 @@ export namespace SessionPrompt { await Plugin.trigger("experimental.chat.messages.transform", {}, { messages: sessionMessages }) + const windowedMessages = SlidingWindow.apply({ messages: sessionMessages, model }) + const result = await processor.process({ user: lastUser, agent, @@ -600,14 +607,14 @@ export namespace SessionPrompt { sessionID, system: [...(await SystemPrompt.environment()), ...(await SystemPrompt.custom())], messages: [ - ...MessageV2.toModelMessages(sessionMessages, model), + ...MessageV2.toModelMessages(windowedMessages, model), ...(isLastStep ? [ - { - role: "assistant" as const, - content: MAX_STEPS, - }, - ] + { + role: "assistant" as const, + content: MAX_STEPS, + }, + ] : []), ], tools, @@ -1008,8 +1015,8 @@ export namespace SessionPrompt { agent: input.agent!, messageID: info.id, extra: { bypassCwdCheck: true, model }, - metadata: async () => {}, - ask: async () => {}, + metadata: async () => { }, + ask: async () => { }, } const result = await t.execute(args, readCtx) pieces.push({ @@ -1069,8 +1076,8 @@ export namespace SessionPrompt { agent: input.agent!, messageID: info.id, extra: { bypassCwdCheck: true }, - metadata: async () => {}, - ask: async () => {}, + metadata: async () => { }, + ask: async () => { }, } const result = await ListTool.init().then((t) => t.execute(args, listCtx)) return [ @@ -1685,19 +1692,19 @@ NOTE: At any point in time through this workflow you should feel free to ask the const isSubtask = (agent.mode === "subagent" && command.subtask !== false) || command.subtask === true const parts = isSubtask ? [ - { - type: "subtask" as const, - agent: agent.name, - description: command.description ?? "", - command: input.command, - model: { - providerID: taskModel.providerID, - modelID: taskModel.modelID, - }, - // TODO: how can we make task tool accept a more complex input? - prompt: templateParts.find((y) => y.type === "text")?.text ?? "", + { + type: "subtask" as const, + agent: agent.name, + description: command.description ?? "", + command: input.command, + model: { + providerID: taskModel.providerID, + modelID: taskModel.modelID, }, - ] + // TODO: how can we make task tool accept a more complex input? + prompt: templateParts.find((y) => y.type === "text")?.text ?? "", + }, + ] : [...templateParts, ...(input.parts ?? [])] const userAgent = isSubtask ? (input.agent ?? (await Agent.defaultAgent())) : agentName diff --git a/packages/opencode/src/tool/task.ts b/packages/opencode/src/tool/task.ts index c87add638aa..364782cee44 100644 --- a/packages/opencode/src/tool/task.ts +++ b/packages/opencode/src/tool/task.ts @@ -10,6 +10,7 @@ import { SessionPrompt } from "../session/prompt" import { iife } from "@/util/iife" import { defer } from "@/util/defer" import { Config } from "../config/config" +import { Storage } from "../storage/storage" import { PermissionNext } from "@/permission/next" const parameters = z.object({ @@ -18,6 +19,7 @@ const parameters = z.object({ subagent_type: z.string().describe("The type of specialized agent to use for this task"), session_id: z.string().describe("Existing Task session to continue").optional(), command: z.string().describe("The command that triggered this task").optional(), + reset: z.boolean().describe("If true, ignore previous session history and start fresh").optional(), }) export const TaskTool = Tool.define("task", async (ctx) => { @@ -61,11 +63,23 @@ export const TaskTool = Tool.define("task", async (ctx) => { const session = await iife(async () => { if (params.session_id) { - const found = await Session.get(params.session_id).catch(() => {}) + const found = await Session.get(params.session_id).catch(() => { }) if (found) return found } - return await Session.create({ + // Try to find existing session for this subagent type + if (!params.reset) { + try { + const map = await Storage.read>(["subagents", ctx.sessionID]) + const existingID = map[params.subagent_type] + if (existingID) { + const found = await Session.get(existingID).catch(() => { }) + if (found) return found + } + } catch { } + } + + const newSession = await Session.create({ parentID: ctx.sessionID, title: params.description + ` (@${agent.name} subagent)`, permission: [ @@ -82,12 +96,12 @@ export const TaskTool = Tool.define("task", async (ctx) => { ...(hasTaskPermission ? [] : [ - { - permission: "task" as const, - pattern: "*" as const, - action: "deny" as const, - }, - ]), + { + permission: "task" as const, + pattern: "*" as const, + action: "deny" as const, + }, + ]), ...(config.experimental?.primary_tools?.map((t) => ({ pattern: "*", action: "allow" as const, @@ -95,6 +109,17 @@ export const TaskTool = Tool.define("task", async (ctx) => { })) ?? []), ], }) + + // Persist the new session ID + try { + await Storage.update>(["subagents", ctx.sessionID], (draft) => { + draft[params.subagent_type] = newSession.id + }) + } catch { + await Storage.write(["subagents", ctx.sessionID], { [params.subagent_type]: newSession.id }).catch(() => { }) + } + + return newSession }) const msg = await MessageV2.get({ sessionID: ctx.sessionID, messageID: ctx.messageID }) if (msg.info.role !== "assistant") throw new Error("Not an assistant message") diff --git a/packages/opencode/src/util/window.ts b/packages/opencode/src/util/window.ts new file mode 100644 index 00000000000..a5c5738235f --- /dev/null +++ b/packages/opencode/src/util/window.ts @@ -0,0 +1,89 @@ +import { MessageV2 } from "../session/message-v2" +import { Provider } from "../provider/provider" +import { Token } from "./token" +import { SessionPrompt } from "../session/prompt" + +export namespace SlidingWindow { + // Reserve tokens for output and safety buffer + const SAFETY_BUFFER = 1000 + + export function apply(input: { + messages: MessageV2.WithParts[] + model: Provider.Model + }): MessageV2.WithParts[] { + const { messages, model } = input + + const contextLimit = model.limit.context + const outputLimit = Math.min(model.limit.output, SessionPrompt.OUTPUT_TOKEN_MAX) || SessionPrompt.OUTPUT_TOKEN_MAX + const available = contextLimit - outputLimit - SAFETY_BUFFER + + // 1. Calculate tokens for all messages + // We map generic tokens to messages. + // Ideally we usage existing token counts if available, or estimate. + const sized = messages.map((msg) => { + const info = msg.info as any + return { + msg, + tokens: (info.tokens?.input ?? estimate(msg)) as number + } + }) + + const total = sized.reduce((acc, item) => acc + item.tokens, 0) + + if (total <= available) { + return messages + } + + // 2. Sliding Window Strategy + // Keep first message (often sets specific context for the session if not system) + // Keep last N messages that fit. + + // Always keep the *last* message (user query) + let budget = available + const result: MessageV2.WithParts[] = [] + + // Add last message + const last = sized[sized.length - 1] + result.unshift(last.msg) + budget -= last.tokens + + // Iterate backwards from second-to-last + for (let i = sized.length - 2; i >= 0; i--) { + const item = sized[i] + if (budget - item.tokens >= 0) { + result.unshift(item.msg) + budget -= item.tokens + } else { + // If we run out of budget, check if we should keep the VERY FIRST message? + // Often the first message is the "User: Create a project..." which sets the theme. + // If i === 0, we might want to swap it? + // For now, simple truncation from the middle/top. + break + } + } + + // Ensure strict chronological order (we unshifted, so they are in order) + // If we skipped messages, we might want to insert a "System: ... messages skipped ..." placeholder? + // But `MessageV2` structure is strict. + + // If we dropped messages, usually it's better to verify we kept the *first* message if possible. + // But simple sliding window (recent-focused) is the standard request. + + return result + } + + function estimate(msg: MessageV2.WithParts): number { + // Rough estimate + let text = "" + for (const part of msg.parts) { + if (part.type === "text") text += part.text + if (part.type === "tool") { + text += JSON.stringify(part.state.input) + if (part.state.status === "completed") { + text += JSON.stringify(part.state.output) + } + } + } + return Token.estimate(text) + } +}