Merge pull request #2449 from code-yeongyu/fix/issue-2330-v2
fix(background-agent): cap recursive subagent spawning
This commit is contained in:
@@ -3678,6 +3678,16 @@
|
||||
"minimum": 0
|
||||
}
|
||||
},
|
||||
"maxDepth": {
|
||||
"type": "integer",
|
||||
"minimum": 1,
|
||||
"maximum": 9007199254740991
|
||||
},
|
||||
"maxDescendants": {
|
||||
"type": "integer",
|
||||
"minimum": 1,
|
||||
"maximum": 9007199254740991
|
||||
},
|
||||
"staleTimeoutMs": {
|
||||
"type": "number",
|
||||
"minimum": 60000
|
||||
|
||||
@@ -3,6 +3,54 @@ import { ZodError } from "zod/v4"
|
||||
import { BackgroundTaskConfigSchema } from "./background-task"
|
||||
|
||||
describe("BackgroundTaskConfigSchema", () => {
|
||||
describe("maxDepth", () => {
|
||||
describe("#given valid maxDepth (3)", () => {
|
||||
test("#when parsed #then returns correct value", () => {
|
||||
const result = BackgroundTaskConfigSchema.parse({ maxDepth: 3 })
|
||||
|
||||
expect(result.maxDepth).toBe(3)
|
||||
})
|
||||
})
|
||||
|
||||
describe("#given maxDepth below minimum (0)", () => {
|
||||
test("#when parsed #then throws ZodError", () => {
|
||||
let thrownError: unknown
|
||||
|
||||
try {
|
||||
BackgroundTaskConfigSchema.parse({ maxDepth: 0 })
|
||||
} catch (error) {
|
||||
thrownError = error
|
||||
}
|
||||
|
||||
expect(thrownError).toBeInstanceOf(ZodError)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("maxDescendants", () => {
|
||||
describe("#given valid maxDescendants (50)", () => {
|
||||
test("#when parsed #then returns correct value", () => {
|
||||
const result = BackgroundTaskConfigSchema.parse({ maxDescendants: 50 })
|
||||
|
||||
expect(result.maxDescendants).toBe(50)
|
||||
})
|
||||
})
|
||||
|
||||
describe("#given maxDescendants below minimum (0)", () => {
|
||||
test("#when parsed #then throws ZodError", () => {
|
||||
let thrownError: unknown
|
||||
|
||||
try {
|
||||
BackgroundTaskConfigSchema.parse({ maxDescendants: 0 })
|
||||
} catch (error) {
|
||||
thrownError = error
|
||||
}
|
||||
|
||||
expect(thrownError).toBeInstanceOf(ZodError)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("syncPollTimeoutMs", () => {
|
||||
describe("#given valid syncPollTimeoutMs (120000)", () => {
|
||||
test("#when parsed #then returns correct value", () => {
|
||||
|
||||
@@ -4,6 +4,8 @@ export const BackgroundTaskConfigSchema = z.object({
|
||||
defaultConcurrency: z.number().min(1).optional(),
|
||||
providerConcurrency: z.record(z.string(), z.number().min(0)).optional(),
|
||||
modelConcurrency: z.record(z.string(), z.number().min(0)).optional(),
|
||||
maxDepth: z.number().int().min(1).optional(),
|
||||
maxDescendants: z.number().int().min(1).optional(),
|
||||
/** Stale timeout in milliseconds - interrupt tasks with no activity for this duration (default: 180000 = 3 minutes, minimum: 60000 = 1 minute) */
|
||||
staleTimeoutMs: z.number().min(60000).optional(),
|
||||
/** Timeout for tasks that never received any progress update, falling back to startedAt (default: 600000 = 10 minutes, minimum: 60000 = 1 minute) */
|
||||
|
||||
@@ -1746,6 +1746,32 @@ describe("BackgroundManager - Non-blocking Queue Integration", () => {
|
||||
}
|
||||
}
|
||||
|
||||
function createMockClientWithSessionChain(
|
||||
sessions: Record<string, { directory: string; parentID?: string }>,
|
||||
options?: { sessionLookupError?: Error }
|
||||
) {
|
||||
return {
|
||||
session: {
|
||||
create: async (_args?: any) => ({ data: { id: `ses_${crypto.randomUUID()}` } }),
|
||||
get: async ({ path }: { path: { id: string } }) => {
|
||||
if (options?.sessionLookupError) {
|
||||
throw options.sessionLookupError
|
||||
}
|
||||
|
||||
return {
|
||||
data: sessions[path.id] ?? { directory: "/test/dir" },
|
||||
}
|
||||
},
|
||||
prompt: async () => ({}),
|
||||
promptAsync: async () => ({}),
|
||||
messages: async () => ({ data: [] }),
|
||||
todo: async () => ({ data: [] }),
|
||||
status: async () => ({ data: {} }),
|
||||
abort: async () => ({}),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
// given
|
||||
mockClient = createMockClient()
|
||||
@@ -1940,6 +1966,151 @@ describe("BackgroundManager - Non-blocking Queue Integration", () => {
|
||||
expect(updatedTask.startedAt.getTime()).toBeGreaterThanOrEqual(queuedAt.getTime())
|
||||
}
|
||||
})
|
||||
|
||||
test("should track rootSessionID and spawnDepth from the parent chain", async () => {
|
||||
// given
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager(
|
||||
{
|
||||
client: createMockClientWithSessionChain({
|
||||
"session-depth-2": { directory: "/test/dir", parentID: "session-depth-1" },
|
||||
"session-depth-1": { directory: "/test/dir", parentID: "session-root" },
|
||||
"session-root": { directory: "/test/dir" },
|
||||
}),
|
||||
directory: tmpdir(),
|
||||
} as unknown as PluginInput,
|
||||
{ maxDepth: 3 },
|
||||
)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "session-depth-2",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// when
|
||||
const task = await manager.launch(input)
|
||||
|
||||
// then
|
||||
expect(task.rootSessionID).toBe("session-root")
|
||||
expect(task.spawnDepth).toBe(3)
|
||||
})
|
||||
|
||||
test("should block launches that exceed maxDepth", async () => {
|
||||
// given
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager(
|
||||
{
|
||||
client: createMockClientWithSessionChain({
|
||||
"session-depth-3": { directory: "/test/dir", parentID: "session-depth-2" },
|
||||
"session-depth-2": { directory: "/test/dir", parentID: "session-depth-1" },
|
||||
"session-depth-1": { directory: "/test/dir", parentID: "session-root" },
|
||||
"session-root": { directory: "/test/dir" },
|
||||
}),
|
||||
directory: tmpdir(),
|
||||
} as unknown as PluginInput,
|
||||
{ maxDepth: 3 },
|
||||
)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "session-depth-3",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// when
|
||||
const result = manager.launch(input)
|
||||
|
||||
// then
|
||||
await expect(result).rejects.toThrow("background_task.maxDepth=3")
|
||||
})
|
||||
|
||||
test("should block launches when maxDescendants is reached", async () => {
|
||||
// given
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager(
|
||||
{
|
||||
client: createMockClientWithSessionChain({
|
||||
"session-root": { directory: "/test/dir" },
|
||||
}),
|
||||
directory: tmpdir(),
|
||||
} as unknown as PluginInput,
|
||||
{ maxDescendants: 1 },
|
||||
)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "session-root",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
await manager.launch(input)
|
||||
|
||||
// when
|
||||
const result = manager.launch(input)
|
||||
|
||||
// then
|
||||
await expect(result).rejects.toThrow("background_task.maxDescendants=1")
|
||||
})
|
||||
|
||||
test("should consume descendant quota for reserved sync spawns", async () => {
|
||||
// given
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager(
|
||||
{
|
||||
client: createMockClientWithSessionChain({
|
||||
"session-root": { directory: "/test/dir" },
|
||||
}),
|
||||
directory: tmpdir(),
|
||||
} as unknown as PluginInput,
|
||||
{ maxDescendants: 1 },
|
||||
)
|
||||
|
||||
await manager.reserveSubagentSpawn("session-root")
|
||||
|
||||
// when
|
||||
const result = manager.assertCanSpawn("session-root")
|
||||
|
||||
// then
|
||||
await expect(result).rejects.toThrow("background_task.maxDescendants=1")
|
||||
})
|
||||
|
||||
test("should fail closed when session lineage lookup fails", async () => {
|
||||
// given
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager(
|
||||
{
|
||||
client: createMockClientWithSessionChain(
|
||||
{
|
||||
"session-root": { directory: "/test/dir" },
|
||||
},
|
||||
{ sessionLookupError: new Error("session lookup failed") }
|
||||
),
|
||||
directory: tmpdir(),
|
||||
} as unknown as PluginInput,
|
||||
{ maxDescendants: 1 },
|
||||
)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "session-root",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// when
|
||||
const result = manager.launch(input)
|
||||
|
||||
// then
|
||||
await expect(result).rejects.toThrow("background_task.maxDescendants cannot be enforced safely")
|
||||
})
|
||||
})
|
||||
|
||||
describe("pending task can be cancelled", () => {
|
||||
|
||||
@@ -51,6 +51,14 @@ import { join } from "node:path"
|
||||
import { pruneStaleTasksAndNotifications } from "./task-poller"
|
||||
import { checkAndInterruptStaleTasks } from "./task-poller"
|
||||
import { removeTaskToastTracking } from "./remove-task-toast-tracking"
|
||||
import {
|
||||
createSubagentDepthLimitError,
|
||||
createSubagentDescendantLimitError,
|
||||
getMaxRootSessionSpawnBudget,
|
||||
getMaxSubagentDepth,
|
||||
resolveSubagentSpawnContext,
|
||||
type SubagentSpawnContext,
|
||||
} from "./subagent-spawn-limits"
|
||||
|
||||
type OpencodeClient = PluginInput["client"]
|
||||
|
||||
@@ -115,6 +123,7 @@ export class BackgroundManager {
|
||||
private completionTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
||||
private idleDeferralTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
||||
private notificationQueueByParent: Map<string, Promise<void>> = new Map()
|
||||
private rootDescendantCounts: Map<string, number>
|
||||
private enableParentSessionNotifications: boolean
|
||||
readonly taskHistory = new TaskHistory()
|
||||
|
||||
@@ -139,10 +148,77 @@ export class BackgroundManager {
|
||||
this.tmuxEnabled = options?.tmuxConfig?.enabled ?? false
|
||||
this.onSubagentSessionCreated = options?.onSubagentSessionCreated
|
||||
this.onShutdown = options?.onShutdown
|
||||
this.rootDescendantCounts = new Map()
|
||||
this.enableParentSessionNotifications = options?.enableParentSessionNotifications ?? true
|
||||
this.registerProcessCleanup()
|
||||
}
|
||||
|
||||
async assertCanSpawn(parentSessionID: string): Promise<SubagentSpawnContext> {
|
||||
const spawnContext = await resolveSubagentSpawnContext(this.client, parentSessionID)
|
||||
const maxDepth = getMaxSubagentDepth(this.config)
|
||||
if (spawnContext.childDepth > maxDepth) {
|
||||
throw createSubagentDepthLimitError({
|
||||
childDepth: spawnContext.childDepth,
|
||||
maxDepth,
|
||||
parentSessionID,
|
||||
rootSessionID: spawnContext.rootSessionID,
|
||||
})
|
||||
}
|
||||
|
||||
const maxRootSessionSpawnBudget = getMaxRootSessionSpawnBudget(this.config)
|
||||
const descendantCount = this.rootDescendantCounts.get(spawnContext.rootSessionID) ?? 0
|
||||
if (descendantCount >= maxRootSessionSpawnBudget) {
|
||||
throw createSubagentDescendantLimitError({
|
||||
rootSessionID: spawnContext.rootSessionID,
|
||||
descendantCount,
|
||||
maxDescendants: maxRootSessionSpawnBudget,
|
||||
})
|
||||
}
|
||||
|
||||
return spawnContext
|
||||
}
|
||||
|
||||
async reserveSubagentSpawn(parentSessionID: string): Promise<{
|
||||
spawnContext: SubagentSpawnContext
|
||||
descendantCount: number
|
||||
commit: () => number
|
||||
rollback: () => void
|
||||
}> {
|
||||
const spawnContext = await this.assertCanSpawn(parentSessionID)
|
||||
const descendantCount = this.registerRootDescendant(spawnContext.rootSessionID)
|
||||
let settled = false
|
||||
|
||||
return {
|
||||
spawnContext,
|
||||
descendantCount,
|
||||
commit: () => {
|
||||
settled = true
|
||||
return descendantCount
|
||||
},
|
||||
rollback: () => {
|
||||
if (settled) return
|
||||
settled = true
|
||||
this.unregisterRootDescendant(spawnContext.rootSessionID)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
private registerRootDescendant(rootSessionID: string): number {
|
||||
const nextCount = (this.rootDescendantCounts.get(rootSessionID) ?? 0) + 1
|
||||
this.rootDescendantCounts.set(rootSessionID, nextCount)
|
||||
return nextCount
|
||||
}
|
||||
|
||||
private unregisterRootDescendant(rootSessionID: string): void {
|
||||
const currentCount = this.rootDescendantCounts.get(rootSessionID) ?? 0
|
||||
if (currentCount <= 1) {
|
||||
this.rootDescendantCounts.delete(rootSessionID)
|
||||
return
|
||||
}
|
||||
|
||||
this.rootDescendantCounts.set(rootSessionID, currentCount - 1)
|
||||
}
|
||||
|
||||
async launch(input: LaunchInput): Promise<BackgroundTask> {
|
||||
log("[background-agent] launch() called with:", {
|
||||
agent: input.agent,
|
||||
@@ -155,61 +231,79 @@ export class BackgroundManager {
|
||||
throw new Error("Agent parameter is required")
|
||||
}
|
||||
|
||||
// Create task immediately with status="pending"
|
||||
const task: BackgroundTask = {
|
||||
id: `bg_${crypto.randomUUID().slice(0, 8)}`,
|
||||
status: "pending",
|
||||
queuedAt: new Date(),
|
||||
// Do NOT set startedAt - will be set when running
|
||||
// Do NOT set sessionID - will be set when running
|
||||
description: input.description,
|
||||
prompt: input.prompt,
|
||||
agent: input.agent,
|
||||
parentSessionID: input.parentSessionID,
|
||||
parentMessageID: input.parentMessageID,
|
||||
parentModel: input.parentModel,
|
||||
parentAgent: input.parentAgent,
|
||||
parentTools: input.parentTools,
|
||||
model: input.model,
|
||||
fallbackChain: input.fallbackChain,
|
||||
attemptCount: 0,
|
||||
category: input.category,
|
||||
}
|
||||
const spawnReservation = await this.reserveSubagentSpawn(input.parentSessionID)
|
||||
|
||||
this.tasks.set(task.id, task)
|
||||
this.taskHistory.record(input.parentSessionID, { id: task.id, agent: input.agent, description: input.description, status: "pending", category: input.category })
|
||||
|
||||
// Track for batched notifications immediately (pending state)
|
||||
if (input.parentSessionID) {
|
||||
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||
pending.add(task.id)
|
||||
this.pendingByParent.set(input.parentSessionID, pending)
|
||||
}
|
||||
|
||||
// Add to queue
|
||||
const key = this.getConcurrencyKeyFromInput(input)
|
||||
const queue = this.queuesByKey.get(key) ?? []
|
||||
queue.push({ task, input })
|
||||
this.queuesByKey.set(key, queue)
|
||||
|
||||
log("[background-agent] Task queued:", { taskId: task.id, key, queueLength: queue.length })
|
||||
|
||||
const toastManager = getTaskToastManager()
|
||||
if (toastManager) {
|
||||
toastManager.addTask({
|
||||
id: task.id,
|
||||
description: input.description,
|
||||
agent: input.agent,
|
||||
isBackground: true,
|
||||
status: "queued",
|
||||
skills: input.skills,
|
||||
try {
|
||||
log("[background-agent] spawn guard passed", {
|
||||
parentSessionID: input.parentSessionID,
|
||||
rootSessionID: spawnReservation.spawnContext.rootSessionID,
|
||||
childDepth: spawnReservation.spawnContext.childDepth,
|
||||
descendantCount: spawnReservation.descendantCount,
|
||||
})
|
||||
|
||||
// Create task immediately with status="pending"
|
||||
const task: BackgroundTask = {
|
||||
id: `bg_${crypto.randomUUID().slice(0, 8)}`,
|
||||
status: "pending",
|
||||
queuedAt: new Date(),
|
||||
rootSessionID: spawnReservation.spawnContext.rootSessionID,
|
||||
// Do NOT set startedAt - will be set when running
|
||||
// Do NOT set sessionID - will be set when running
|
||||
description: input.description,
|
||||
prompt: input.prompt,
|
||||
agent: input.agent,
|
||||
spawnDepth: spawnReservation.spawnContext.childDepth,
|
||||
parentSessionID: input.parentSessionID,
|
||||
parentMessageID: input.parentMessageID,
|
||||
parentModel: input.parentModel,
|
||||
parentAgent: input.parentAgent,
|
||||
parentTools: input.parentTools,
|
||||
model: input.model,
|
||||
fallbackChain: input.fallbackChain,
|
||||
attemptCount: 0,
|
||||
category: input.category,
|
||||
}
|
||||
|
||||
this.tasks.set(task.id, task)
|
||||
this.taskHistory.record(input.parentSessionID, { id: task.id, agent: input.agent, description: input.description, status: "pending", category: input.category })
|
||||
|
||||
// Track for batched notifications immediately (pending state)
|
||||
if (input.parentSessionID) {
|
||||
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||
pending.add(task.id)
|
||||
this.pendingByParent.set(input.parentSessionID, pending)
|
||||
}
|
||||
|
||||
// Add to queue
|
||||
const key = this.getConcurrencyKeyFromInput(input)
|
||||
const queue = this.queuesByKey.get(key) ?? []
|
||||
queue.push({ task, input })
|
||||
this.queuesByKey.set(key, queue)
|
||||
|
||||
log("[background-agent] Task queued:", { taskId: task.id, key, queueLength: queue.length })
|
||||
|
||||
const toastManager = getTaskToastManager()
|
||||
if (toastManager) {
|
||||
toastManager.addTask({
|
||||
id: task.id,
|
||||
description: input.description,
|
||||
agent: input.agent,
|
||||
isBackground: true,
|
||||
status: "queued",
|
||||
skills: input.skills,
|
||||
})
|
||||
}
|
||||
|
||||
spawnReservation.commit()
|
||||
|
||||
// Trigger processing (fire-and-forget)
|
||||
this.processKey(key)
|
||||
|
||||
return { ...task }
|
||||
} catch (error) {
|
||||
spawnReservation.rollback()
|
||||
throw error
|
||||
}
|
||||
|
||||
// Trigger processing (fire-and-forget)
|
||||
this.processKey(key)
|
||||
|
||||
return task
|
||||
}
|
||||
|
||||
private async processKey(key: string): Promise<void> {
|
||||
@@ -865,6 +959,7 @@ export class BackgroundManager {
|
||||
}
|
||||
}
|
||||
|
||||
this.rootDescendantCounts.delete(sessionID)
|
||||
SessionCategoryRegistry.remove(sessionID)
|
||||
}
|
||||
|
||||
@@ -1411,14 +1506,6 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
}
|
||||
}
|
||||
|
||||
private formatDuration(start: Date, end?: Date): string {
|
||||
return formatDuration(start, end)
|
||||
}
|
||||
|
||||
private isAbortedSessionError(error: unknown): boolean {
|
||||
return isAbortedSessionError(error)
|
||||
}
|
||||
|
||||
private hasRunningTasks(): boolean {
|
||||
for (const task of this.tasks.values()) {
|
||||
if (task.status === "running") return true
|
||||
@@ -1618,6 +1705,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
this.pendingNotifications.clear()
|
||||
this.pendingByParent.clear()
|
||||
this.notificationQueueByParent.clear()
|
||||
this.rootDescendantCounts.clear()
|
||||
this.queuesByKey.clear()
|
||||
this.processingKeys.clear()
|
||||
this.unregisterProcessCleanup()
|
||||
|
||||
87
src/features/background-agent/subagent-spawn-limits.ts
Normal file
87
src/features/background-agent/subagent-spawn-limits.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
import type { BackgroundTaskConfig } from "../../config/schema"
|
||||
import type { OpencodeClient } from "./constants"
|
||||
|
||||
export const DEFAULT_MAX_SUBAGENT_DEPTH = 3
|
||||
export const DEFAULT_MAX_ROOT_SESSION_SPAWN_BUDGET = 50
|
||||
|
||||
export interface SubagentSpawnContext {
|
||||
rootSessionID: string
|
||||
parentDepth: number
|
||||
childDepth: number
|
||||
}
|
||||
|
||||
export function getMaxSubagentDepth(config?: BackgroundTaskConfig): number {
|
||||
return config?.maxDepth ?? DEFAULT_MAX_SUBAGENT_DEPTH
|
||||
}
|
||||
|
||||
export function getMaxRootSessionSpawnBudget(config?: BackgroundTaskConfig): number {
|
||||
return config?.maxDescendants ?? DEFAULT_MAX_ROOT_SESSION_SPAWN_BUDGET
|
||||
}
|
||||
|
||||
export async function resolveSubagentSpawnContext(
|
||||
client: OpencodeClient,
|
||||
parentSessionID: string
|
||||
): Promise<SubagentSpawnContext> {
|
||||
const visitedSessionIDs = new Set<string>()
|
||||
let rootSessionID = parentSessionID
|
||||
let currentSessionID = parentSessionID
|
||||
let parentDepth = 0
|
||||
|
||||
while (true) {
|
||||
if (visitedSessionIDs.has(currentSessionID)) {
|
||||
throw new Error(`Detected a session parent cycle while resolving ${parentSessionID}`)
|
||||
}
|
||||
|
||||
visitedSessionIDs.add(currentSessionID)
|
||||
|
||||
let nextParentSessionID: string | undefined
|
||||
try {
|
||||
const session = await client.session.get({
|
||||
path: { id: currentSessionID },
|
||||
})
|
||||
nextParentSessionID = session.data?.parentID
|
||||
} catch (error) {
|
||||
const reason = error instanceof Error ? error.message : String(error)
|
||||
throw new Error(
|
||||
`Subagent spawn blocked: failed to resolve session lineage for ${parentSessionID}, so background_task.maxDescendants cannot be enforced safely. ${reason}`
|
||||
)
|
||||
}
|
||||
|
||||
if (!nextParentSessionID) {
|
||||
rootSessionID = currentSessionID
|
||||
break
|
||||
}
|
||||
|
||||
currentSessionID = nextParentSessionID
|
||||
parentDepth += 1
|
||||
}
|
||||
|
||||
return {
|
||||
rootSessionID,
|
||||
parentDepth,
|
||||
childDepth: parentDepth + 1,
|
||||
}
|
||||
}
|
||||
|
||||
export function createSubagentDepthLimitError(input: {
|
||||
childDepth: number
|
||||
maxDepth: number
|
||||
parentSessionID: string
|
||||
rootSessionID: string
|
||||
}): Error {
|
||||
const { childDepth, maxDepth, parentSessionID, rootSessionID } = input
|
||||
return new Error(
|
||||
`Subagent spawn blocked: child depth ${childDepth} exceeds background_task.maxDepth=${maxDepth}. Parent session: ${parentSessionID}. Root session: ${rootSessionID}. Continue in an existing subagent session instead of spawning another.`
|
||||
)
|
||||
}
|
||||
|
||||
export function createSubagentDescendantLimitError(input: {
|
||||
rootSessionID: string
|
||||
descendantCount: number
|
||||
maxDescendants: number
|
||||
}): Error {
|
||||
const { rootSessionID, descendantCount, maxDescendants } = input
|
||||
return new Error(
|
||||
`Subagent spawn blocked: root session ${rootSessionID} already has ${descendantCount} descendants, which meets background_task.maxDescendants=${maxDescendants}. Reuse an existing session instead of spawning another.`
|
||||
)
|
||||
}
|
||||
@@ -20,11 +20,13 @@ export interface TaskProgress {
|
||||
export interface BackgroundTask {
|
||||
id: string
|
||||
sessionID?: string
|
||||
rootSessionID?: string
|
||||
parentSessionID: string
|
||||
parentMessageID: string
|
||||
description: string
|
||||
prompt: string
|
||||
agent: string
|
||||
spawnDepth?: number
|
||||
status: BackgroundTaskStatus
|
||||
queuedAt?: Date
|
||||
startedAt?: Date
|
||||
|
||||
@@ -12,4 +12,4 @@ export const CALL_OMO_AGENT_DESCRIPTION = `Spawn explore/librarian agent. run_in
|
||||
|
||||
Available: {agents}
|
||||
|
||||
Pass \`session_id=<id>\` to continue previous agent with full context. Prompts MUST be in English. Use \`background_output\` for async results.`
|
||||
Pass \`session_id=<id>\` to continue previous agent with full context. Nested subagent depth is tracked automatically and blocked past the configured limit. Prompts MUST be in English. Use \`background_output\` for async results.`
|
||||
|
||||
@@ -249,4 +249,52 @@ describe("executeSync", () => {
|
||||
expect(deps.waitForCompletion).not.toHaveBeenCalled()
|
||||
expect(deps.processMessages).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
test("commits reserved descendant quota after creating a new sync session", async () => {
|
||||
//#given
|
||||
const { executeSync } = require("./sync-executor")
|
||||
|
||||
const deps = {
|
||||
createOrGetSession: mock(async () => ({ sessionID: "ses-test-789", isNew: true })),
|
||||
waitForCompletion: mock(async () => {}),
|
||||
processMessages: mock(async () => "agent response"),
|
||||
setSessionFallbackChain: mock(() => {}),
|
||||
}
|
||||
|
||||
const spawnReservation = {
|
||||
commit: mock(() => 1),
|
||||
rollback: mock(() => {}),
|
||||
}
|
||||
|
||||
const args = {
|
||||
subagent_type: "explore",
|
||||
description: "test task",
|
||||
prompt: "find something",
|
||||
}
|
||||
|
||||
const toolContext = {
|
||||
sessionID: "parent-session",
|
||||
messageID: "msg-4",
|
||||
agent: "sisyphus",
|
||||
abort: new AbortController().signal,
|
||||
metadata: mock(async () => {}),
|
||||
}
|
||||
|
||||
const ctx = {
|
||||
client: {
|
||||
session: {
|
||||
promptAsync: mock(async () => ({ data: {} })),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
//#when
|
||||
await executeSync(args, toolContext, ctx as any, deps, undefined, spawnReservation)
|
||||
|
||||
//#then
|
||||
expect(spawnReservation.commit).toHaveBeenCalledTimes(1)
|
||||
expect(spawnReservation.rollback).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
})
|
||||
|
||||
export {}
|
||||
|
||||
@@ -19,6 +19,11 @@ type ExecuteSyncDeps = {
|
||||
setSessionFallbackChain: typeof setSessionFallbackChain
|
||||
}
|
||||
|
||||
type SpawnReservation = {
|
||||
commit: () => number
|
||||
rollback: () => void
|
||||
}
|
||||
|
||||
const defaultDeps: ExecuteSyncDeps = {
|
||||
createOrGetSession,
|
||||
waitForCompletion,
|
||||
@@ -33,54 +38,67 @@ export async function executeSync(
|
||||
messageID: string
|
||||
agent: string
|
||||
abort: AbortSignal
|
||||
metadata?: (input: { title?: string; metadata?: Record<string, unknown> }) => void
|
||||
metadata?: (input: { title?: string; metadata?: Record<string, unknown> }) => void | Promise<void>
|
||||
},
|
||||
ctx: PluginInput,
|
||||
deps: ExecuteSyncDeps = defaultDeps,
|
||||
fallbackChain?: FallbackEntry[],
|
||||
spawnReservation?: SpawnReservation,
|
||||
): Promise<string> {
|
||||
const { sessionID } = await deps.createOrGetSession(args, toolContext, ctx)
|
||||
|
||||
if (fallbackChain && fallbackChain.length > 0) {
|
||||
deps.setSessionFallbackChain(sessionID, fallbackChain)
|
||||
}
|
||||
|
||||
await toolContext.metadata?.({
|
||||
title: args.description,
|
||||
metadata: { sessionId: sessionID },
|
||||
})
|
||||
|
||||
log(`[call_omo_agent] Sending prompt to session ${sessionID}`)
|
||||
log(`[call_omo_agent] Prompt text:`, args.prompt.substring(0, 100))
|
||||
let sessionID: string | undefined
|
||||
|
||||
try {
|
||||
await (ctx.client.session as unknown as SessionWithPromptAsync).promptAsync({
|
||||
path: { id: sessionID },
|
||||
body: {
|
||||
agent: args.subagent_type,
|
||||
tools: {
|
||||
...getAgentToolRestrictions(args.subagent_type),
|
||||
task: false,
|
||||
question: false,
|
||||
},
|
||||
parts: [{ type: "text", text: args.prompt }],
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
log(`[call_omo_agent] Prompt error:`, errorMessage)
|
||||
if (errorMessage.includes("agent.name") || errorMessage.includes("undefined")) {
|
||||
return `Error: Agent "${args.subagent_type}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||
const session = await deps.createOrGetSession(args, toolContext, ctx)
|
||||
sessionID = session.sessionID
|
||||
|
||||
if (session.isNew) {
|
||||
spawnReservation?.commit()
|
||||
}
|
||||
return `Error: Failed to send prompt: ${errorMessage}\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||
|
||||
if (fallbackChain && fallbackChain.length > 0) {
|
||||
deps.setSessionFallbackChain(sessionID, fallbackChain)
|
||||
}
|
||||
|
||||
await toolContext.metadata?.({
|
||||
title: args.description,
|
||||
metadata: { sessionId: sessionID },
|
||||
})
|
||||
|
||||
log(`[call_omo_agent] Sending prompt to session ${sessionID}`)
|
||||
log(`[call_omo_agent] Prompt text:`, args.prompt.substring(0, 100))
|
||||
|
||||
try {
|
||||
await (ctx.client.session as unknown as SessionWithPromptAsync).promptAsync({
|
||||
path: { id: sessionID },
|
||||
body: {
|
||||
agent: args.subagent_type,
|
||||
tools: {
|
||||
...getAgentToolRestrictions(args.subagent_type),
|
||||
task: false,
|
||||
question: false,
|
||||
},
|
||||
parts: [{ type: "text", text: args.prompt }],
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
log(`[call_omo_agent] Prompt error:`, errorMessage)
|
||||
if (errorMessage.includes("agent.name") || errorMessage.includes("undefined")) {
|
||||
return `Error: Agent "${args.subagent_type}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||
}
|
||||
return `Error: Failed to send prompt: ${errorMessage}\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||
}
|
||||
|
||||
await deps.waitForCompletion(sessionID, toolContext, ctx)
|
||||
|
||||
const responseText = await deps.processMessages(sessionID, ctx)
|
||||
|
||||
const output =
|
||||
responseText + "\n\n" + ["<task_metadata>", `session_id: ${sessionID}`, "</task_metadata>"].join("\n")
|
||||
|
||||
return output
|
||||
} catch (error) {
|
||||
spawnReservation?.rollback()
|
||||
throw error
|
||||
}
|
||||
|
||||
await deps.waitForCompletion(sessionID, toolContext, ctx)
|
||||
|
||||
const responseText = await deps.processMessages(sessionID, ctx)
|
||||
|
||||
const output =
|
||||
responseText + "\n\n" + ["<task_metadata>", `session_id: ${sessionID}`, "</task_metadata>"].join("\n")
|
||||
|
||||
return output
|
||||
}
|
||||
|
||||
@@ -1,15 +1,24 @@
|
||||
import { describe, test, expect, mock } from "bun:test"
|
||||
import type { PluginInput } from "@opencode-ai/plugin"
|
||||
import type { BackgroundManager } from "../../features/background-agent"
|
||||
import { createCallOmoAgent } from "./tools"
|
||||
const { beforeEach, describe, test, expect, mock } = require("bun:test")
|
||||
const { createCallOmoAgent } = require("./tools")
|
||||
|
||||
describe("createCallOmoAgent", () => {
|
||||
const assertCanSpawnMock = mock(() => Promise.resolve(undefined))
|
||||
const reserveCommitMock = mock(() => 1)
|
||||
const reserveRollbackMock = mock(() => {})
|
||||
const reserveSubagentSpawnMock = mock(() => Promise.resolve({
|
||||
spawnContext: { rootSessionID: "root-session", parentDepth: 0, childDepth: 1 },
|
||||
descendantCount: 1,
|
||||
commit: reserveCommitMock,
|
||||
rollback: reserveRollbackMock,
|
||||
}))
|
||||
const mockCtx = {
|
||||
client: {},
|
||||
directory: "/test",
|
||||
} as unknown as PluginInput
|
||||
}
|
||||
|
||||
const mockBackgroundManager = {
|
||||
assertCanSpawn: assertCanSpawnMock,
|
||||
reserveSubagentSpawn: reserveSubagentSpawnMock,
|
||||
launch: mock(() => Promise.resolve({
|
||||
id: "test-task-id",
|
||||
sessionID: null,
|
||||
@@ -17,7 +26,14 @@ describe("createCallOmoAgent", () => {
|
||||
agent: "test-agent",
|
||||
status: "pending",
|
||||
})),
|
||||
} as unknown as BackgroundManager
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
assertCanSpawnMock.mockClear()
|
||||
reserveSubagentSpawnMock.mockClear()
|
||||
reserveCommitMock.mockClear()
|
||||
reserveRollbackMock.mockClear()
|
||||
})
|
||||
|
||||
test("should reject agent in disabled_agents list", async () => {
|
||||
//#given
|
||||
@@ -102,7 +118,7 @@ describe("createCallOmoAgent", () => {
|
||||
|
||||
test("uses agent override fallback_models when launching background subagent", async () => {
|
||||
//#given
|
||||
const launch = mock(() => Promise.resolve({
|
||||
const launch = mock((_input: { fallbackChain?: Array<{ providers: string[]; model: string; variant?: string }> }) => Promise.resolve({
|
||||
id: "task-fallback",
|
||||
sessionID: "sub-session",
|
||||
description: "Test task",
|
||||
@@ -112,7 +128,7 @@ describe("createCallOmoAgent", () => {
|
||||
const managerWithLaunch = {
|
||||
launch,
|
||||
getTask: mock(() => undefined),
|
||||
} as unknown as BackgroundManager
|
||||
}
|
||||
const toolDef = createCallOmoAgent(
|
||||
mockCtx,
|
||||
managerWithLaunch,
|
||||
@@ -137,10 +153,38 @@ describe("createCallOmoAgent", () => {
|
||||
)
|
||||
|
||||
//#then
|
||||
const launchArgs = launch.mock.calls[0]?.[0]
|
||||
const firstLaunchCall = launch.mock.calls[0]
|
||||
if (firstLaunchCall === undefined) {
|
||||
throw new Error("Expected launch to be called")
|
||||
}
|
||||
|
||||
const [launchArgs] = firstLaunchCall
|
||||
expect(launchArgs.fallbackChain).toEqual([
|
||||
{ providers: ["quotio"], model: "kimi-k2.5", variant: undefined },
|
||||
{ providers: ["openai"], model: "gpt-5.2", variant: "high" },
|
||||
])
|
||||
})
|
||||
|
||||
test("should return a tool error when sync spawn depth validation fails", async () => {
|
||||
//#given
|
||||
reserveSubagentSpawnMock.mockRejectedValueOnce(new Error("Subagent spawn blocked: child depth 4 exceeds background_task.maxDepth=3."))
|
||||
const toolDef = createCallOmoAgent(mockCtx, mockBackgroundManager, [])
|
||||
const executeFunc = toolDef.execute as Function
|
||||
|
||||
//#when
|
||||
const result = await executeFunc(
|
||||
{
|
||||
description: "Test",
|
||||
prompt: "Test prompt",
|
||||
subagent_type: "explore",
|
||||
run_in_background: false,
|
||||
},
|
||||
{ sessionID: "test", messageID: "msg", agent: "test", abort: new AbortController().signal },
|
||||
)
|
||||
|
||||
//#then
|
||||
expect(result).toContain("background_task.maxDepth=3")
|
||||
})
|
||||
})
|
||||
|
||||
export {}
|
||||
|
||||
@@ -95,6 +95,17 @@ export function createCallOmoAgent(
|
||||
return await executeBackground(args, toolCtx, backgroundManager, ctx.client, fallbackChain)
|
||||
}
|
||||
|
||||
if (!args.session_id) {
|
||||
let spawnReservation: Awaited<ReturnType<BackgroundManager["reserveSubagentSpawn"]>> | undefined
|
||||
try {
|
||||
spawnReservation = await backgroundManager.reserveSubagentSpawn(toolCtx.sessionID)
|
||||
return await executeSync(args, toolCtx, ctx, undefined, fallbackChain, spawnReservation)
|
||||
} catch (error) {
|
||||
spawnReservation?.rollback()
|
||||
return `Error: ${error instanceof Error ? error.message : String(error)}`
|
||||
}
|
||||
}
|
||||
|
||||
return await executeSync(args, toolCtx, ctx, undefined, fallbackChain)
|
||||
},
|
||||
})
|
||||
|
||||
@@ -110,6 +110,65 @@ describe("executeSyncTask - cleanup on error paths", () => {
|
||||
expect(deleteCalls[0]).toBe("ses_test_12345678")
|
||||
})
|
||||
|
||||
test("rolls back reserved descendant quota when sync session creation fails", async () => {
|
||||
const mockClient = {
|
||||
session: {
|
||||
create: async () => ({ data: { id: "ses_test_12345678" } }),
|
||||
},
|
||||
}
|
||||
|
||||
const { executeSyncTask } = require("./sync-task")
|
||||
|
||||
const commit = mock(() => 1)
|
||||
const rollback = mock(() => {})
|
||||
const reserveSubagentSpawn = mock(async () => ({
|
||||
spawnContext: { rootSessionID: "parent-session", parentDepth: 0, childDepth: 1 },
|
||||
descendantCount: 1,
|
||||
commit,
|
||||
rollback,
|
||||
}))
|
||||
|
||||
const deps = {
|
||||
createSyncSession: async () => ({ ok: false as const, error: "Failed to create session" }),
|
||||
sendSyncPrompt: async () => null,
|
||||
pollSyncSession: async () => null,
|
||||
fetchSyncResult: async () => ({ ok: true as const, textContent: "Result" }),
|
||||
}
|
||||
|
||||
const mockCtx = {
|
||||
sessionID: "parent-session",
|
||||
callID: "call-123",
|
||||
metadata: () => {},
|
||||
}
|
||||
|
||||
const mockExecutorCtx = {
|
||||
manager: { reserveSubagentSpawn },
|
||||
client: mockClient,
|
||||
directory: "/tmp",
|
||||
onSyncSessionCreated: null,
|
||||
}
|
||||
|
||||
const args = {
|
||||
prompt: "test prompt",
|
||||
description: "test task",
|
||||
category: "test",
|
||||
load_skills: [],
|
||||
run_in_background: false,
|
||||
command: null,
|
||||
}
|
||||
|
||||
//#when
|
||||
const result = await executeSyncTask(args, mockCtx, mockExecutorCtx, {
|
||||
sessionID: "parent-session",
|
||||
}, "test-agent", undefined, undefined, undefined, undefined, deps)
|
||||
|
||||
//#then
|
||||
expect(result).toBe("Failed to create session")
|
||||
expect(reserveSubagentSpawn).toHaveBeenCalledWith("parent-session")
|
||||
expect(commit).toHaveBeenCalledTimes(0)
|
||||
expect(rollback).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
test("cleans up toast and subagentSessions when pollSyncSession returns error", async () => {
|
||||
const mockClient = {
|
||||
session: {
|
||||
@@ -182,7 +241,18 @@ describe("executeSyncTask - cleanup on error paths", () => {
|
||||
metadata: () => {},
|
||||
}
|
||||
|
||||
const commit = mock(() => 1)
|
||||
const rollback = mock(() => {})
|
||||
|
||||
const mockExecutorCtx = {
|
||||
manager: {
|
||||
reserveSubagentSpawn: mock(async () => ({
|
||||
spawnContext: { rootSessionID: "parent-session", parentDepth: 0, childDepth: 1 },
|
||||
descendantCount: 1,
|
||||
commit,
|
||||
rollback,
|
||||
})),
|
||||
},
|
||||
client: mockClient,
|
||||
directory: "/tmp",
|
||||
onSyncSessionCreated: null,
|
||||
@@ -204,9 +274,14 @@ describe("executeSyncTask - cleanup on error paths", () => {
|
||||
|
||||
//#then - should complete and cleanup resources
|
||||
expect(result).toContain("Task completed")
|
||||
expect(mockExecutorCtx.manager.reserveSubagentSpawn).toHaveBeenCalledWith("parent-session")
|
||||
expect(commit).toHaveBeenCalledTimes(1)
|
||||
expect(rollback).toHaveBeenCalledTimes(0)
|
||||
expect(removeTaskCalls.length).toBe(1)
|
||||
expect(removeTaskCalls[0]).toBe("sync_ses_test")
|
||||
expect(deleteCalls.length).toBe(1)
|
||||
expect(deleteCalls[0]).toBe("ses_test_12345678")
|
||||
})
|
||||
})
|
||||
|
||||
export {}
|
||||
|
||||
@@ -23,12 +23,28 @@ export async function executeSyncTask(
|
||||
fallbackChain?: import("../../shared/model-requirements").FallbackEntry[],
|
||||
deps: SyncTaskDeps = syncTaskDeps
|
||||
): Promise<string> {
|
||||
const { client, directory, onSyncSessionCreated, syncPollTimeoutMs } = executorCtx
|
||||
const { manager, client, directory, onSyncSessionCreated, syncPollTimeoutMs } = executorCtx
|
||||
const toastManager = getTaskToastManager()
|
||||
let taskId: string | undefined
|
||||
let syncSessionID: string | undefined
|
||||
let spawnReservation:
|
||||
| Awaited<ReturnType<ExecutorContext["manager"]["reserveSubagentSpawn"]>>
|
||||
| undefined
|
||||
|
||||
try {
|
||||
if (typeof manager?.reserveSubagentSpawn === "function") {
|
||||
spawnReservation = await manager.reserveSubagentSpawn(parentContext.sessionID)
|
||||
}
|
||||
|
||||
const spawnContext = spawnReservation?.spawnContext
|
||||
?? (typeof manager?.assertCanSpawn === "function"
|
||||
? await manager.assertCanSpawn(parentContext.sessionID)
|
||||
: {
|
||||
rootSessionID: parentContext.sessionID,
|
||||
parentDepth: 0,
|
||||
childDepth: 1,
|
||||
})
|
||||
|
||||
const createSessionResult = await deps.createSyncSession(client, {
|
||||
parentSessionID: parentContext.sessionID,
|
||||
agentToUse,
|
||||
@@ -37,10 +53,12 @@ export async function executeSyncTask(
|
||||
})
|
||||
|
||||
if (!createSessionResult.ok) {
|
||||
spawnReservation?.rollback()
|
||||
return createSessionResult.error
|
||||
}
|
||||
|
||||
const sessionID = createSessionResult.sessionID
|
||||
spawnReservation?.commit()
|
||||
syncSessionID = sessionID
|
||||
subagentSessions.add(sessionID)
|
||||
syncSubagentSessions.add(sessionID)
|
||||
@@ -90,6 +108,7 @@ export async function executeSyncTask(
|
||||
run_in_background: args.run_in_background,
|
||||
sessionId: sessionID,
|
||||
sync: true,
|
||||
spawnDepth: spawnContext.childDepth,
|
||||
command: args.command,
|
||||
model: categoryModel ? { providerID: categoryModel.providerID, modelID: categoryModel.modelID } : undefined,
|
||||
},
|
||||
@@ -147,6 +166,7 @@ session_id: ${sessionID}
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
spawnReservation?.rollback()
|
||||
return formatDetailedError(error, {
|
||||
operation: "Execute task",
|
||||
args,
|
||||
|
||||
Reference in New Issue
Block a user