diff --git a/assets/oh-my-opencode.schema.json b/assets/oh-my-opencode.schema.json index 0420a9bea..ee2b5e367 100644 --- a/assets/oh-my-opencode.schema.json +++ b/assets/oh-my-opencode.schema.json @@ -3678,6 +3678,16 @@ "minimum": 0 } }, + "maxDepth": { + "type": "integer", + "minimum": 1, + "maximum": 9007199254740991 + }, + "maxDescendants": { + "type": "integer", + "minimum": 1, + "maximum": 9007199254740991 + }, "staleTimeoutMs": { "type": "number", "minimum": 60000 diff --git a/src/config/schema/background-task.test.ts b/src/config/schema/background-task.test.ts index 2ca225864..9bd6c74de 100644 --- a/src/config/schema/background-task.test.ts +++ b/src/config/schema/background-task.test.ts @@ -3,6 +3,54 @@ import { ZodError } from "zod/v4" import { BackgroundTaskConfigSchema } from "./background-task" describe("BackgroundTaskConfigSchema", () => { + describe("maxDepth", () => { + describe("#given valid maxDepth (3)", () => { + test("#when parsed #then returns correct value", () => { + const result = BackgroundTaskConfigSchema.parse({ maxDepth: 3 }) + + expect(result.maxDepth).toBe(3) + }) + }) + + describe("#given maxDepth below minimum (0)", () => { + test("#when parsed #then throws ZodError", () => { + let thrownError: unknown + + try { + BackgroundTaskConfigSchema.parse({ maxDepth: 0 }) + } catch (error) { + thrownError = error + } + + expect(thrownError).toBeInstanceOf(ZodError) + }) + }) + }) + + describe("maxDescendants", () => { + describe("#given valid maxDescendants (50)", () => { + test("#when parsed #then returns correct value", () => { + const result = BackgroundTaskConfigSchema.parse({ maxDescendants: 50 }) + + expect(result.maxDescendants).toBe(50) + }) + }) + + describe("#given maxDescendants below minimum (0)", () => { + test("#when parsed #then throws ZodError", () => { + let thrownError: unknown + + try { + BackgroundTaskConfigSchema.parse({ maxDescendants: 0 }) + } catch (error) { + thrownError = error + } + + expect(thrownError).toBeInstanceOf(ZodError) + }) + }) + }) + describe("syncPollTimeoutMs", () => { describe("#given valid syncPollTimeoutMs (120000)", () => { test("#when parsed #then returns correct value", () => { diff --git a/src/config/schema/background-task.ts b/src/config/schema/background-task.ts index b955de6b5..f53a67f6c 100644 --- a/src/config/schema/background-task.ts +++ b/src/config/schema/background-task.ts @@ -4,6 +4,8 @@ export const BackgroundTaskConfigSchema = z.object({ defaultConcurrency: z.number().min(1).optional(), providerConcurrency: z.record(z.string(), z.number().min(0)).optional(), modelConcurrency: z.record(z.string(), z.number().min(0)).optional(), + maxDepth: z.number().int().min(1).optional(), + maxDescendants: z.number().int().min(1).optional(), /** Stale timeout in milliseconds - interrupt tasks with no activity for this duration (default: 180000 = 3 minutes, minimum: 60000 = 1 minute) */ staleTimeoutMs: z.number().min(60000).optional(), /** Timeout for tasks that never received any progress update, falling back to startedAt (default: 600000 = 10 minutes, minimum: 60000 = 1 minute) */ diff --git a/src/features/background-agent/manager.test.ts b/src/features/background-agent/manager.test.ts index d05b22c41..d4a1244d6 100644 --- a/src/features/background-agent/manager.test.ts +++ b/src/features/background-agent/manager.test.ts @@ -1746,6 +1746,32 @@ describe("BackgroundManager - Non-blocking Queue Integration", () => { } } + function createMockClientWithSessionChain( + sessions: Record, + options?: { sessionLookupError?: Error } + ) { + return { + session: { + create: async (_args?: any) => ({ data: { id: `ses_${crypto.randomUUID()}` } }), + get: async ({ path }: { path: { id: string } }) => { + if (options?.sessionLookupError) { + throw options.sessionLookupError + } + + return { + data: sessions[path.id] ?? { directory: "/test/dir" }, + } + }, + prompt: async () => ({}), + promptAsync: async () => ({}), + messages: async () => ({ data: [] }), + todo: async () => ({ data: [] }), + status: async () => ({ data: {} }), + abort: async () => ({}), + }, + } + } + beforeEach(() => { // given mockClient = createMockClient() @@ -1940,6 +1966,151 @@ describe("BackgroundManager - Non-blocking Queue Integration", () => { expect(updatedTask.startedAt.getTime()).toBeGreaterThanOrEqual(queuedAt.getTime()) } }) + + test("should track rootSessionID and spawnDepth from the parent chain", async () => { + // given + manager.shutdown() + manager = new BackgroundManager( + { + client: createMockClientWithSessionChain({ + "session-depth-2": { directory: "/test/dir", parentID: "session-depth-1" }, + "session-depth-1": { directory: "/test/dir", parentID: "session-root" }, + "session-root": { directory: "/test/dir" }, + }), + directory: tmpdir(), + } as unknown as PluginInput, + { maxDepth: 3 }, + ) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "session-depth-2", + parentMessageID: "parent-message", + } + + // when + const task = await manager.launch(input) + + // then + expect(task.rootSessionID).toBe("session-root") + expect(task.spawnDepth).toBe(3) + }) + + test("should block launches that exceed maxDepth", async () => { + // given + manager.shutdown() + manager = new BackgroundManager( + { + client: createMockClientWithSessionChain({ + "session-depth-3": { directory: "/test/dir", parentID: "session-depth-2" }, + "session-depth-2": { directory: "/test/dir", parentID: "session-depth-1" }, + "session-depth-1": { directory: "/test/dir", parentID: "session-root" }, + "session-root": { directory: "/test/dir" }, + }), + directory: tmpdir(), + } as unknown as PluginInput, + { maxDepth: 3 }, + ) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "session-depth-3", + parentMessageID: "parent-message", + } + + // when + const result = manager.launch(input) + + // then + await expect(result).rejects.toThrow("background_task.maxDepth=3") + }) + + test("should block launches when maxDescendants is reached", async () => { + // given + manager.shutdown() + manager = new BackgroundManager( + { + client: createMockClientWithSessionChain({ + "session-root": { directory: "/test/dir" }, + }), + directory: tmpdir(), + } as unknown as PluginInput, + { maxDescendants: 1 }, + ) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "session-root", + parentMessageID: "parent-message", + } + + await manager.launch(input) + + // when + const result = manager.launch(input) + + // then + await expect(result).rejects.toThrow("background_task.maxDescendants=1") + }) + + test("should consume descendant quota for reserved sync spawns", async () => { + // given + manager.shutdown() + manager = new BackgroundManager( + { + client: createMockClientWithSessionChain({ + "session-root": { directory: "/test/dir" }, + }), + directory: tmpdir(), + } as unknown as PluginInput, + { maxDescendants: 1 }, + ) + + await manager.reserveSubagentSpawn("session-root") + + // when + const result = manager.assertCanSpawn("session-root") + + // then + await expect(result).rejects.toThrow("background_task.maxDescendants=1") + }) + + test("should fail closed when session lineage lookup fails", async () => { + // given + manager.shutdown() + manager = new BackgroundManager( + { + client: createMockClientWithSessionChain( + { + "session-root": { directory: "/test/dir" }, + }, + { sessionLookupError: new Error("session lookup failed") } + ), + directory: tmpdir(), + } as unknown as PluginInput, + { maxDescendants: 1 }, + ) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "session-root", + parentMessageID: "parent-message", + } + + // when + const result = manager.launch(input) + + // then + await expect(result).rejects.toThrow("background_task.maxDescendants cannot be enforced safely") + }) }) describe("pending task can be cancelled", () => { diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index 7b58076a6..8d810fd54 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -51,6 +51,14 @@ import { join } from "node:path" import { pruneStaleTasksAndNotifications } from "./task-poller" import { checkAndInterruptStaleTasks } from "./task-poller" import { removeTaskToastTracking } from "./remove-task-toast-tracking" +import { + createSubagentDepthLimitError, + createSubagentDescendantLimitError, + getMaxRootSessionSpawnBudget, + getMaxSubagentDepth, + resolveSubagentSpawnContext, + type SubagentSpawnContext, +} from "./subagent-spawn-limits" type OpencodeClient = PluginInput["client"] @@ -115,6 +123,7 @@ export class BackgroundManager { private completionTimers: Map> = new Map() private idleDeferralTimers: Map> = new Map() private notificationQueueByParent: Map> = new Map() + private rootDescendantCounts: Map private enableParentSessionNotifications: boolean readonly taskHistory = new TaskHistory() @@ -139,10 +148,77 @@ export class BackgroundManager { this.tmuxEnabled = options?.tmuxConfig?.enabled ?? false this.onSubagentSessionCreated = options?.onSubagentSessionCreated this.onShutdown = options?.onShutdown + this.rootDescendantCounts = new Map() this.enableParentSessionNotifications = options?.enableParentSessionNotifications ?? true this.registerProcessCleanup() } + async assertCanSpawn(parentSessionID: string): Promise { + const spawnContext = await resolveSubagentSpawnContext(this.client, parentSessionID) + const maxDepth = getMaxSubagentDepth(this.config) + if (spawnContext.childDepth > maxDepth) { + throw createSubagentDepthLimitError({ + childDepth: spawnContext.childDepth, + maxDepth, + parentSessionID, + rootSessionID: spawnContext.rootSessionID, + }) + } + + const maxRootSessionSpawnBudget = getMaxRootSessionSpawnBudget(this.config) + const descendantCount = this.rootDescendantCounts.get(spawnContext.rootSessionID) ?? 0 + if (descendantCount >= maxRootSessionSpawnBudget) { + throw createSubagentDescendantLimitError({ + rootSessionID: spawnContext.rootSessionID, + descendantCount, + maxDescendants: maxRootSessionSpawnBudget, + }) + } + + return spawnContext + } + + async reserveSubagentSpawn(parentSessionID: string): Promise<{ + spawnContext: SubagentSpawnContext + descendantCount: number + commit: () => number + rollback: () => void + }> { + const spawnContext = await this.assertCanSpawn(parentSessionID) + const descendantCount = this.registerRootDescendant(spawnContext.rootSessionID) + let settled = false + + return { + spawnContext, + descendantCount, + commit: () => { + settled = true + return descendantCount + }, + rollback: () => { + if (settled) return + settled = true + this.unregisterRootDescendant(spawnContext.rootSessionID) + }, + } + } + + private registerRootDescendant(rootSessionID: string): number { + const nextCount = (this.rootDescendantCounts.get(rootSessionID) ?? 0) + 1 + this.rootDescendantCounts.set(rootSessionID, nextCount) + return nextCount + } + + private unregisterRootDescendant(rootSessionID: string): void { + const currentCount = this.rootDescendantCounts.get(rootSessionID) ?? 0 + if (currentCount <= 1) { + this.rootDescendantCounts.delete(rootSessionID) + return + } + + this.rootDescendantCounts.set(rootSessionID, currentCount - 1) + } + async launch(input: LaunchInput): Promise { log("[background-agent] launch() called with:", { agent: input.agent, @@ -155,61 +231,79 @@ export class BackgroundManager { throw new Error("Agent parameter is required") } - // Create task immediately with status="pending" - const task: BackgroundTask = { - id: `bg_${crypto.randomUUID().slice(0, 8)}`, - status: "pending", - queuedAt: new Date(), - // Do NOT set startedAt - will be set when running - // Do NOT set sessionID - will be set when running - description: input.description, - prompt: input.prompt, - agent: input.agent, - parentSessionID: input.parentSessionID, - parentMessageID: input.parentMessageID, - parentModel: input.parentModel, - parentAgent: input.parentAgent, - parentTools: input.parentTools, - model: input.model, - fallbackChain: input.fallbackChain, - attemptCount: 0, - category: input.category, - } + const spawnReservation = await this.reserveSubagentSpawn(input.parentSessionID) - this.tasks.set(task.id, task) - this.taskHistory.record(input.parentSessionID, { id: task.id, agent: input.agent, description: input.description, status: "pending", category: input.category }) - - // Track for batched notifications immediately (pending state) - if (input.parentSessionID) { - const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() - pending.add(task.id) - this.pendingByParent.set(input.parentSessionID, pending) - } - - // Add to queue - const key = this.getConcurrencyKeyFromInput(input) - const queue = this.queuesByKey.get(key) ?? [] - queue.push({ task, input }) - this.queuesByKey.set(key, queue) - - log("[background-agent] Task queued:", { taskId: task.id, key, queueLength: queue.length }) - - const toastManager = getTaskToastManager() - if (toastManager) { - toastManager.addTask({ - id: task.id, - description: input.description, - agent: input.agent, - isBackground: true, - status: "queued", - skills: input.skills, + try { + log("[background-agent] spawn guard passed", { + parentSessionID: input.parentSessionID, + rootSessionID: spawnReservation.spawnContext.rootSessionID, + childDepth: spawnReservation.spawnContext.childDepth, + descendantCount: spawnReservation.descendantCount, }) + + // Create task immediately with status="pending" + const task: BackgroundTask = { + id: `bg_${crypto.randomUUID().slice(0, 8)}`, + status: "pending", + queuedAt: new Date(), + rootSessionID: spawnReservation.spawnContext.rootSessionID, + // Do NOT set startedAt - will be set when running + // Do NOT set sessionID - will be set when running + description: input.description, + prompt: input.prompt, + agent: input.agent, + spawnDepth: spawnReservation.spawnContext.childDepth, + parentSessionID: input.parentSessionID, + parentMessageID: input.parentMessageID, + parentModel: input.parentModel, + parentAgent: input.parentAgent, + parentTools: input.parentTools, + model: input.model, + fallbackChain: input.fallbackChain, + attemptCount: 0, + category: input.category, + } + + this.tasks.set(task.id, task) + this.taskHistory.record(input.parentSessionID, { id: task.id, agent: input.agent, description: input.description, status: "pending", category: input.category }) + + // Track for batched notifications immediately (pending state) + if (input.parentSessionID) { + const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() + pending.add(task.id) + this.pendingByParent.set(input.parentSessionID, pending) + } + + // Add to queue + const key = this.getConcurrencyKeyFromInput(input) + const queue = this.queuesByKey.get(key) ?? [] + queue.push({ task, input }) + this.queuesByKey.set(key, queue) + + log("[background-agent] Task queued:", { taskId: task.id, key, queueLength: queue.length }) + + const toastManager = getTaskToastManager() + if (toastManager) { + toastManager.addTask({ + id: task.id, + description: input.description, + agent: input.agent, + isBackground: true, + status: "queued", + skills: input.skills, + }) + } + + spawnReservation.commit() + + // Trigger processing (fire-and-forget) + this.processKey(key) + + return { ...task } + } catch (error) { + spawnReservation.rollback() + throw error } - - // Trigger processing (fire-and-forget) - this.processKey(key) - - return task } private async processKey(key: string): Promise { @@ -865,6 +959,7 @@ export class BackgroundManager { } } + this.rootDescendantCounts.delete(sessionID) SessionCategoryRegistry.remove(sessionID) } @@ -1411,14 +1506,6 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea } } - private formatDuration(start: Date, end?: Date): string { - return formatDuration(start, end) - } - - private isAbortedSessionError(error: unknown): boolean { - return isAbortedSessionError(error) - } - private hasRunningTasks(): boolean { for (const task of this.tasks.values()) { if (task.status === "running") return true @@ -1618,6 +1705,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea this.pendingNotifications.clear() this.pendingByParent.clear() this.notificationQueueByParent.clear() + this.rootDescendantCounts.clear() this.queuesByKey.clear() this.processingKeys.clear() this.unregisterProcessCleanup() diff --git a/src/features/background-agent/subagent-spawn-limits.ts b/src/features/background-agent/subagent-spawn-limits.ts new file mode 100644 index 000000000..c33ad7b21 --- /dev/null +++ b/src/features/background-agent/subagent-spawn-limits.ts @@ -0,0 +1,87 @@ +import type { BackgroundTaskConfig } from "../../config/schema" +import type { OpencodeClient } from "./constants" + +export const DEFAULT_MAX_SUBAGENT_DEPTH = 3 +export const DEFAULT_MAX_ROOT_SESSION_SPAWN_BUDGET = 50 + +export interface SubagentSpawnContext { + rootSessionID: string + parentDepth: number + childDepth: number +} + +export function getMaxSubagentDepth(config?: BackgroundTaskConfig): number { + return config?.maxDepth ?? DEFAULT_MAX_SUBAGENT_DEPTH +} + +export function getMaxRootSessionSpawnBudget(config?: BackgroundTaskConfig): number { + return config?.maxDescendants ?? DEFAULT_MAX_ROOT_SESSION_SPAWN_BUDGET +} + +export async function resolveSubagentSpawnContext( + client: OpencodeClient, + parentSessionID: string +): Promise { + const visitedSessionIDs = new Set() + let rootSessionID = parentSessionID + let currentSessionID = parentSessionID + let parentDepth = 0 + + while (true) { + if (visitedSessionIDs.has(currentSessionID)) { + throw new Error(`Detected a session parent cycle while resolving ${parentSessionID}`) + } + + visitedSessionIDs.add(currentSessionID) + + let nextParentSessionID: string | undefined + try { + const session = await client.session.get({ + path: { id: currentSessionID }, + }) + nextParentSessionID = session.data?.parentID + } catch (error) { + const reason = error instanceof Error ? error.message : String(error) + throw new Error( + `Subagent spawn blocked: failed to resolve session lineage for ${parentSessionID}, so background_task.maxDescendants cannot be enforced safely. ${reason}` + ) + } + + if (!nextParentSessionID) { + rootSessionID = currentSessionID + break + } + + currentSessionID = nextParentSessionID + parentDepth += 1 + } + + return { + rootSessionID, + parentDepth, + childDepth: parentDepth + 1, + } +} + +export function createSubagentDepthLimitError(input: { + childDepth: number + maxDepth: number + parentSessionID: string + rootSessionID: string +}): Error { + const { childDepth, maxDepth, parentSessionID, rootSessionID } = input + return new Error( + `Subagent spawn blocked: child depth ${childDepth} exceeds background_task.maxDepth=${maxDepth}. Parent session: ${parentSessionID}. Root session: ${rootSessionID}. Continue in an existing subagent session instead of spawning another.` + ) +} + +export function createSubagentDescendantLimitError(input: { + rootSessionID: string + descendantCount: number + maxDescendants: number +}): Error { + const { rootSessionID, descendantCount, maxDescendants } = input + return new Error( + `Subagent spawn blocked: root session ${rootSessionID} already has ${descendantCount} descendants, which meets background_task.maxDescendants=${maxDescendants}. Reuse an existing session instead of spawning another.` + ) +} diff --git a/src/features/background-agent/types.ts b/src/features/background-agent/types.ts index d1af31f43..73ae8a000 100644 --- a/src/features/background-agent/types.ts +++ b/src/features/background-agent/types.ts @@ -20,11 +20,13 @@ export interface TaskProgress { export interface BackgroundTask { id: string sessionID?: string + rootSessionID?: string parentSessionID: string parentMessageID: string description: string prompt: string agent: string + spawnDepth?: number status: BackgroundTaskStatus queuedAt?: Date startedAt?: Date diff --git a/src/tools/call-omo-agent/constants.ts b/src/tools/call-omo-agent/constants.ts index a17eea6dd..028b0d602 100644 --- a/src/tools/call-omo-agent/constants.ts +++ b/src/tools/call-omo-agent/constants.ts @@ -12,4 +12,4 @@ export const CALL_OMO_AGENT_DESCRIPTION = `Spawn explore/librarian agent. run_in Available: {agents} -Pass \`session_id=\` to continue previous agent with full context. Prompts MUST be in English. Use \`background_output\` for async results.` +Pass \`session_id=\` to continue previous agent with full context. Nested subagent depth is tracked automatically and blocked past the configured limit. Prompts MUST be in English. Use \`background_output\` for async results.` diff --git a/src/tools/call-omo-agent/sync-executor.test.ts b/src/tools/call-omo-agent/sync-executor.test.ts index f639ecd82..513588676 100644 --- a/src/tools/call-omo-agent/sync-executor.test.ts +++ b/src/tools/call-omo-agent/sync-executor.test.ts @@ -249,4 +249,52 @@ describe("executeSync", () => { expect(deps.waitForCompletion).not.toHaveBeenCalled() expect(deps.processMessages).not.toHaveBeenCalled() }) + + test("commits reserved descendant quota after creating a new sync session", async () => { + //#given + const { executeSync } = require("./sync-executor") + + const deps = { + createOrGetSession: mock(async () => ({ sessionID: "ses-test-789", isNew: true })), + waitForCompletion: mock(async () => {}), + processMessages: mock(async () => "agent response"), + setSessionFallbackChain: mock(() => {}), + } + + const spawnReservation = { + commit: mock(() => 1), + rollback: mock(() => {}), + } + + const args = { + subagent_type: "explore", + description: "test task", + prompt: "find something", + } + + const toolContext = { + sessionID: "parent-session", + messageID: "msg-4", + agent: "sisyphus", + abort: new AbortController().signal, + metadata: mock(async () => {}), + } + + const ctx = { + client: { + session: { + promptAsync: mock(async () => ({ data: {} })), + }, + }, + } + + //#when + await executeSync(args, toolContext, ctx as any, deps, undefined, spawnReservation) + + //#then + expect(spawnReservation.commit).toHaveBeenCalledTimes(1) + expect(spawnReservation.rollback).toHaveBeenCalledTimes(0) + }) }) + +export {} diff --git a/src/tools/call-omo-agent/sync-executor.ts b/src/tools/call-omo-agent/sync-executor.ts index 971a9e410..72e34d5aa 100644 --- a/src/tools/call-omo-agent/sync-executor.ts +++ b/src/tools/call-omo-agent/sync-executor.ts @@ -19,6 +19,11 @@ type ExecuteSyncDeps = { setSessionFallbackChain: typeof setSessionFallbackChain } +type SpawnReservation = { + commit: () => number + rollback: () => void +} + const defaultDeps: ExecuteSyncDeps = { createOrGetSession, waitForCompletion, @@ -33,54 +38,67 @@ export async function executeSync( messageID: string agent: string abort: AbortSignal - metadata?: (input: { title?: string; metadata?: Record }) => void + metadata?: (input: { title?: string; metadata?: Record }) => void | Promise }, ctx: PluginInput, deps: ExecuteSyncDeps = defaultDeps, fallbackChain?: FallbackEntry[], + spawnReservation?: SpawnReservation, ): Promise { - const { sessionID } = await deps.createOrGetSession(args, toolContext, ctx) - - if (fallbackChain && fallbackChain.length > 0) { - deps.setSessionFallbackChain(sessionID, fallbackChain) - } - - await toolContext.metadata?.({ - title: args.description, - metadata: { sessionId: sessionID }, - }) - - log(`[call_omo_agent] Sending prompt to session ${sessionID}`) - log(`[call_omo_agent] Prompt text:`, args.prompt.substring(0, 100)) + let sessionID: string | undefined try { - await (ctx.client.session as unknown as SessionWithPromptAsync).promptAsync({ - path: { id: sessionID }, - body: { - agent: args.subagent_type, - tools: { - ...getAgentToolRestrictions(args.subagent_type), - task: false, - question: false, - }, - parts: [{ type: "text", text: args.prompt }], - }, - }) - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error) - log(`[call_omo_agent] Prompt error:`, errorMessage) - if (errorMessage.includes("agent.name") || errorMessage.includes("undefined")) { - return `Error: Agent "${args.subagent_type}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.\n\n\nsession_id: ${sessionID}\n` + const session = await deps.createOrGetSession(args, toolContext, ctx) + sessionID = session.sessionID + + if (session.isNew) { + spawnReservation?.commit() } - return `Error: Failed to send prompt: ${errorMessage}\n\n\nsession_id: ${sessionID}\n` + + if (fallbackChain && fallbackChain.length > 0) { + deps.setSessionFallbackChain(sessionID, fallbackChain) + } + + await toolContext.metadata?.({ + title: args.description, + metadata: { sessionId: sessionID }, + }) + + log(`[call_omo_agent] Sending prompt to session ${sessionID}`) + log(`[call_omo_agent] Prompt text:`, args.prompt.substring(0, 100)) + + try { + await (ctx.client.session as unknown as SessionWithPromptAsync).promptAsync({ + path: { id: sessionID }, + body: { + agent: args.subagent_type, + tools: { + ...getAgentToolRestrictions(args.subagent_type), + task: false, + question: false, + }, + parts: [{ type: "text", text: args.prompt }], + }, + }) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + log(`[call_omo_agent] Prompt error:`, errorMessage) + if (errorMessage.includes("agent.name") || errorMessage.includes("undefined")) { + return `Error: Agent "${args.subagent_type}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.\n\n\nsession_id: ${sessionID}\n` + } + return `Error: Failed to send prompt: ${errorMessage}\n\n\nsession_id: ${sessionID}\n` + } + + await deps.waitForCompletion(sessionID, toolContext, ctx) + + const responseText = await deps.processMessages(sessionID, ctx) + + const output = + responseText + "\n\n" + ["", `session_id: ${sessionID}`, ""].join("\n") + + return output + } catch (error) { + spawnReservation?.rollback() + throw error } - - await deps.waitForCompletion(sessionID, toolContext, ctx) - - const responseText = await deps.processMessages(sessionID, ctx) - - const output = - responseText + "\n\n" + ["", `session_id: ${sessionID}`, ""].join("\n") - - return output } diff --git a/src/tools/call-omo-agent/tools.test.ts b/src/tools/call-omo-agent/tools.test.ts index 4efbe9657..bbb215870 100644 --- a/src/tools/call-omo-agent/tools.test.ts +++ b/src/tools/call-omo-agent/tools.test.ts @@ -1,15 +1,24 @@ -import { describe, test, expect, mock } from "bun:test" -import type { PluginInput } from "@opencode-ai/plugin" -import type { BackgroundManager } from "../../features/background-agent" -import { createCallOmoAgent } from "./tools" +const { beforeEach, describe, test, expect, mock } = require("bun:test") +const { createCallOmoAgent } = require("./tools") describe("createCallOmoAgent", () => { + const assertCanSpawnMock = mock(() => Promise.resolve(undefined)) + const reserveCommitMock = mock(() => 1) + const reserveRollbackMock = mock(() => {}) + const reserveSubagentSpawnMock = mock(() => Promise.resolve({ + spawnContext: { rootSessionID: "root-session", parentDepth: 0, childDepth: 1 }, + descendantCount: 1, + commit: reserveCommitMock, + rollback: reserveRollbackMock, + })) const mockCtx = { client: {}, directory: "/test", - } as unknown as PluginInput + } const mockBackgroundManager = { + assertCanSpawn: assertCanSpawnMock, + reserveSubagentSpawn: reserveSubagentSpawnMock, launch: mock(() => Promise.resolve({ id: "test-task-id", sessionID: null, @@ -17,7 +26,14 @@ describe("createCallOmoAgent", () => { agent: "test-agent", status: "pending", })), - } as unknown as BackgroundManager + } + + beforeEach(() => { + assertCanSpawnMock.mockClear() + reserveSubagentSpawnMock.mockClear() + reserveCommitMock.mockClear() + reserveRollbackMock.mockClear() + }) test("should reject agent in disabled_agents list", async () => { //#given @@ -102,7 +118,7 @@ describe("createCallOmoAgent", () => { test("uses agent override fallback_models when launching background subagent", async () => { //#given - const launch = mock(() => Promise.resolve({ + const launch = mock((_input: { fallbackChain?: Array<{ providers: string[]; model: string; variant?: string }> }) => Promise.resolve({ id: "task-fallback", sessionID: "sub-session", description: "Test task", @@ -112,7 +128,7 @@ describe("createCallOmoAgent", () => { const managerWithLaunch = { launch, getTask: mock(() => undefined), - } as unknown as BackgroundManager + } const toolDef = createCallOmoAgent( mockCtx, managerWithLaunch, @@ -137,10 +153,38 @@ describe("createCallOmoAgent", () => { ) //#then - const launchArgs = launch.mock.calls[0]?.[0] + const firstLaunchCall = launch.mock.calls[0] + if (firstLaunchCall === undefined) { + throw new Error("Expected launch to be called") + } + + const [launchArgs] = firstLaunchCall expect(launchArgs.fallbackChain).toEqual([ { providers: ["quotio"], model: "kimi-k2.5", variant: undefined }, { providers: ["openai"], model: "gpt-5.2", variant: "high" }, ]) }) + + test("should return a tool error when sync spawn depth validation fails", async () => { + //#given + reserveSubagentSpawnMock.mockRejectedValueOnce(new Error("Subagent spawn blocked: child depth 4 exceeds background_task.maxDepth=3.")) + const toolDef = createCallOmoAgent(mockCtx, mockBackgroundManager, []) + const executeFunc = toolDef.execute as Function + + //#when + const result = await executeFunc( + { + description: "Test", + prompt: "Test prompt", + subagent_type: "explore", + run_in_background: false, + }, + { sessionID: "test", messageID: "msg", agent: "test", abort: new AbortController().signal }, + ) + + //#then + expect(result).toContain("background_task.maxDepth=3") + }) }) + +export {} diff --git a/src/tools/call-omo-agent/tools.ts b/src/tools/call-omo-agent/tools.ts index 1338282f1..14219dc27 100644 --- a/src/tools/call-omo-agent/tools.ts +++ b/src/tools/call-omo-agent/tools.ts @@ -95,6 +95,17 @@ export function createCallOmoAgent( return await executeBackground(args, toolCtx, backgroundManager, ctx.client, fallbackChain) } + if (!args.session_id) { + let spawnReservation: Awaited> | undefined + try { + spawnReservation = await backgroundManager.reserveSubagentSpawn(toolCtx.sessionID) + return await executeSync(args, toolCtx, ctx, undefined, fallbackChain, spawnReservation) + } catch (error) { + spawnReservation?.rollback() + return `Error: ${error instanceof Error ? error.message : String(error)}` + } + } + return await executeSync(args, toolCtx, ctx, undefined, fallbackChain) }, }) diff --git a/src/tools/delegate-task/sync-task.test.ts b/src/tools/delegate-task/sync-task.test.ts index d2fac4d6d..483a826b9 100644 --- a/src/tools/delegate-task/sync-task.test.ts +++ b/src/tools/delegate-task/sync-task.test.ts @@ -110,6 +110,65 @@ describe("executeSyncTask - cleanup on error paths", () => { expect(deleteCalls[0]).toBe("ses_test_12345678") }) + test("rolls back reserved descendant quota when sync session creation fails", async () => { + const mockClient = { + session: { + create: async () => ({ data: { id: "ses_test_12345678" } }), + }, + } + + const { executeSyncTask } = require("./sync-task") + + const commit = mock(() => 1) + const rollback = mock(() => {}) + const reserveSubagentSpawn = mock(async () => ({ + spawnContext: { rootSessionID: "parent-session", parentDepth: 0, childDepth: 1 }, + descendantCount: 1, + commit, + rollback, + })) + + const deps = { + createSyncSession: async () => ({ ok: false as const, error: "Failed to create session" }), + sendSyncPrompt: async () => null, + pollSyncSession: async () => null, + fetchSyncResult: async () => ({ ok: true as const, textContent: "Result" }), + } + + const mockCtx = { + sessionID: "parent-session", + callID: "call-123", + metadata: () => {}, + } + + const mockExecutorCtx = { + manager: { reserveSubagentSpawn }, + client: mockClient, + directory: "/tmp", + onSyncSessionCreated: null, + } + + const args = { + prompt: "test prompt", + description: "test task", + category: "test", + load_skills: [], + run_in_background: false, + command: null, + } + + //#when + const result = await executeSyncTask(args, mockCtx, mockExecutorCtx, { + sessionID: "parent-session", + }, "test-agent", undefined, undefined, undefined, undefined, deps) + + //#then + expect(result).toBe("Failed to create session") + expect(reserveSubagentSpawn).toHaveBeenCalledWith("parent-session") + expect(commit).toHaveBeenCalledTimes(0) + expect(rollback).toHaveBeenCalledTimes(1) + }) + test("cleans up toast and subagentSessions when pollSyncSession returns error", async () => { const mockClient = { session: { @@ -182,7 +241,18 @@ describe("executeSyncTask - cleanup on error paths", () => { metadata: () => {}, } + const commit = mock(() => 1) + const rollback = mock(() => {}) + const mockExecutorCtx = { + manager: { + reserveSubagentSpawn: mock(async () => ({ + spawnContext: { rootSessionID: "parent-session", parentDepth: 0, childDepth: 1 }, + descendantCount: 1, + commit, + rollback, + })), + }, client: mockClient, directory: "/tmp", onSyncSessionCreated: null, @@ -204,9 +274,14 @@ describe("executeSyncTask - cleanup on error paths", () => { //#then - should complete and cleanup resources expect(result).toContain("Task completed") + expect(mockExecutorCtx.manager.reserveSubagentSpawn).toHaveBeenCalledWith("parent-session") + expect(commit).toHaveBeenCalledTimes(1) + expect(rollback).toHaveBeenCalledTimes(0) expect(removeTaskCalls.length).toBe(1) expect(removeTaskCalls[0]).toBe("sync_ses_test") expect(deleteCalls.length).toBe(1) expect(deleteCalls[0]).toBe("ses_test_12345678") }) }) + +export {} diff --git a/src/tools/delegate-task/sync-task.ts b/src/tools/delegate-task/sync-task.ts index 115d2c57d..fa5fad4c0 100644 --- a/src/tools/delegate-task/sync-task.ts +++ b/src/tools/delegate-task/sync-task.ts @@ -23,12 +23,28 @@ export async function executeSyncTask( fallbackChain?: import("../../shared/model-requirements").FallbackEntry[], deps: SyncTaskDeps = syncTaskDeps ): Promise { - const { client, directory, onSyncSessionCreated, syncPollTimeoutMs } = executorCtx + const { manager, client, directory, onSyncSessionCreated, syncPollTimeoutMs } = executorCtx const toastManager = getTaskToastManager() let taskId: string | undefined let syncSessionID: string | undefined + let spawnReservation: + | Awaited> + | undefined try { + if (typeof manager?.reserveSubagentSpawn === "function") { + spawnReservation = await manager.reserveSubagentSpawn(parentContext.sessionID) + } + + const spawnContext = spawnReservation?.spawnContext + ?? (typeof manager?.assertCanSpawn === "function" + ? await manager.assertCanSpawn(parentContext.sessionID) + : { + rootSessionID: parentContext.sessionID, + parentDepth: 0, + childDepth: 1, + }) + const createSessionResult = await deps.createSyncSession(client, { parentSessionID: parentContext.sessionID, agentToUse, @@ -37,10 +53,12 @@ export async function executeSyncTask( }) if (!createSessionResult.ok) { + spawnReservation?.rollback() return createSessionResult.error } const sessionID = createSessionResult.sessionID + spawnReservation?.commit() syncSessionID = sessionID subagentSessions.add(sessionID) syncSubagentSessions.add(sessionID) @@ -90,6 +108,7 @@ export async function executeSyncTask( run_in_background: args.run_in_background, sessionId: sessionID, sync: true, + spawnDepth: spawnContext.childDepth, command: args.command, model: categoryModel ? { providerID: categoryModel.providerID, modelID: categoryModel.modelID } : undefined, }, @@ -147,6 +166,7 @@ session_id: ${sessionID} } } } catch (error) { + spawnReservation?.rollback() return formatDetailedError(error, { operation: "Execute task", args,