Compare commits
1 Commits
v3.13.0
...
refactor/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78c9ad3e7f |
@@ -1572,6 +1572,189 @@ describe("BackgroundManager.trackTask", () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager.launch and resume cleanup regressions", () => {
|
||||
test("launch should register pending task under parent before background start finishes", async () => {
|
||||
//#given
|
||||
let releaseCreate: (() => void) | undefined
|
||||
const createGate = new Promise<void>((resolve) => {
|
||||
releaseCreate = resolve
|
||||
})
|
||||
const client = {
|
||||
session: {
|
||||
get: async () => ({ data: { directory: "/test/dir" } }),
|
||||
create: async () => {
|
||||
await createGate
|
||||
return { data: { id: "session-launch-pending" } }
|
||||
},
|
||||
promptAsync: async () => ({}),
|
||||
abort: async () => ({}),
|
||||
},
|
||||
}
|
||||
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput)
|
||||
|
||||
//#when
|
||||
const task = await manager.launch({
|
||||
description: "pending registration",
|
||||
prompt: "launch prompt",
|
||||
agent: "explore",
|
||||
parentSessionID: "parent-launch-pending",
|
||||
parentMessageID: "msg-launch-pending",
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(getPendingByParent(manager).get("parent-launch-pending")?.has(task.id)).toBe(true)
|
||||
expect(manager.getTask(task.id)?.status).toBe("pending")
|
||||
|
||||
releaseCreate?.()
|
||||
await flushBackgroundNotifications()
|
||||
manager.shutdown()
|
||||
})
|
||||
|
||||
test("launch should clean pending bookkeeping and format missing-agent prompt errors", async () => {
|
||||
//#given
|
||||
const abortedSessionIDs: string[] = []
|
||||
const promptAsyncCalls: string[] = []
|
||||
const client = {
|
||||
session: {
|
||||
get: async () => ({ data: { directory: "/test/dir" } }),
|
||||
create: async () => ({ data: { id: "session-launch-error" } }),
|
||||
promptAsync: async (args: { path: { id: string } }) => {
|
||||
promptAsyncCalls.push(args.path.id)
|
||||
if (args.path.id === "session-launch-error") {
|
||||
throw new Error("agent.name is undefined")
|
||||
}
|
||||
return {}
|
||||
},
|
||||
abort: async (args: { path: { id: string } }) => {
|
||||
abortedSessionIDs.push(args.path.id)
|
||||
return {}
|
||||
},
|
||||
messages: async () => ({ data: [] }),
|
||||
},
|
||||
}
|
||||
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput)
|
||||
|
||||
//#when
|
||||
const launchedTask = await manager.launch({
|
||||
description: "launch prompt error",
|
||||
prompt: "launch prompt",
|
||||
agent: "missing-agent",
|
||||
parentSessionID: "parent-launch-error",
|
||||
parentMessageID: "msg-launch-error",
|
||||
})
|
||||
await flushBackgroundNotifications()
|
||||
await new Promise((resolve) => setTimeout(resolve, 10))
|
||||
|
||||
//#then
|
||||
const storedTask = manager.getTask(launchedTask.id)
|
||||
expect(storedTask?.status).toBe("interrupt")
|
||||
expect(storedTask?.error).toBe('Agent "missing-agent" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.')
|
||||
expect(storedTask?.concurrencyKey).toBeUndefined()
|
||||
expect(storedTask?.completedAt).toBeInstanceOf(Date)
|
||||
expect(getPendingByParent(manager).get("parent-launch-error")).toBeUndefined()
|
||||
expect(abortedSessionIDs).toContain("session-launch-error")
|
||||
expect(promptAsyncCalls).toContain("parent-launch-error")
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
|
||||
test("resume should clean pending bookkeeping and preserve raw prompt errors", async () => {
|
||||
//#given
|
||||
const abortedSessionIDs: string[] = []
|
||||
const promptAsyncCalls: string[] = []
|
||||
const client = {
|
||||
session: {
|
||||
promptAsync: async (args: { path: { id: string } }) => {
|
||||
promptAsyncCalls.push(args.path.id)
|
||||
if (args.path.id === "session-resume-error") {
|
||||
throw new Error("resume prompt exploded")
|
||||
}
|
||||
return {}
|
||||
},
|
||||
abort: async (args: { path: { id: string } }) => {
|
||||
abortedSessionIDs.push(args.path.id)
|
||||
return {}
|
||||
},
|
||||
messages: async () => ({ data: [] }),
|
||||
},
|
||||
}
|
||||
const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput)
|
||||
const task: BackgroundTask = {
|
||||
id: "task-resume-error",
|
||||
sessionID: "session-resume-error",
|
||||
parentSessionID: "parent-before-resume-error",
|
||||
parentMessageID: "msg-before-resume-error",
|
||||
description: "resume prompt error",
|
||||
prompt: "resume prompt",
|
||||
agent: "explore",
|
||||
status: "completed",
|
||||
startedAt: new Date(),
|
||||
completedAt: new Date(),
|
||||
concurrencyGroup: "explore",
|
||||
}
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
|
||||
//#when
|
||||
await manager.resume({
|
||||
sessionId: "session-resume-error",
|
||||
prompt: "resume now",
|
||||
parentSessionID: "parent-resume-error",
|
||||
parentMessageID: "msg-resume-error",
|
||||
})
|
||||
await flushBackgroundNotifications()
|
||||
await new Promise((resolve) => setTimeout(resolve, 10))
|
||||
|
||||
//#then
|
||||
expect(task.status).toBe("interrupt")
|
||||
expect(task.error).toBe("resume prompt exploded")
|
||||
expect(task.concurrencyKey).toBeUndefined()
|
||||
expect(task.completedAt).toBeInstanceOf(Date)
|
||||
expect(getPendingByParent(manager).get("parent-resume-error")).toBeUndefined()
|
||||
expect(abortedSessionIDs).toContain("session-resume-error")
|
||||
expect(promptAsyncCalls).toContain("parent-resume-error")
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
|
||||
test("trackTask should move pending bookkeeping when parent session changes", async () => {
|
||||
//#given
|
||||
const manager = createBackgroundManager()
|
||||
stubNotifyParentSession(manager)
|
||||
const existingTask: BackgroundTask = {
|
||||
id: "task-parent-move",
|
||||
sessionID: "session-parent-move",
|
||||
parentSessionID: "parent-before-move",
|
||||
parentMessageID: "msg-before-move",
|
||||
description: "tracked external task",
|
||||
prompt: "",
|
||||
agent: "task",
|
||||
status: "running",
|
||||
startedAt: new Date(),
|
||||
progress: {
|
||||
toolCalls: 0,
|
||||
lastUpdate: new Date(),
|
||||
},
|
||||
}
|
||||
getTaskMap(manager).set(existingTask.id, existingTask)
|
||||
getPendingByParent(manager).set("parent-before-move", new Set([existingTask.id]))
|
||||
|
||||
//#when
|
||||
await manager.trackTask({
|
||||
taskId: existingTask.id,
|
||||
sessionID: existingTask.sessionID!,
|
||||
parentSessionID: "parent-after-move",
|
||||
description: existingTask.description,
|
||||
agent: existingTask.agent,
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(getPendingByParent(manager).get("parent-before-move")).toBeUndefined()
|
||||
expect(getPendingByParent(manager).get("parent-after-move")?.has(existingTask.id)).toBe(true)
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager.resume concurrency key", () => {
|
||||
let manager: BackgroundManager
|
||||
|
||||
|
||||
@@ -291,11 +291,7 @@ export class BackgroundManager {
|
||||
this.taskHistory.record(input.parentSessionID, { id: task.id, agent: input.agent, description: input.description, status: "pending", category: input.category })
|
||||
|
||||
// Track for batched notifications immediately (pending state)
|
||||
if (input.parentSessionID) {
|
||||
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||
pending.add(task.id)
|
||||
this.pendingByParent.set(input.parentSessionID, pending)
|
||||
}
|
||||
this.registerPendingTaskForParent(input.parentSessionID, task.id)
|
||||
|
||||
// Add to queue
|
||||
const key = this.getConcurrencyKeyFromInput(input)
|
||||
@@ -481,59 +477,21 @@ export class BackgroundManager {
|
||||
// Include model if caller provided one (e.g., from Sisyphus category configs)
|
||||
// IMPORTANT: variant must be a top-level field in the body, NOT nested inside model
|
||||
// OpenCode's PromptInput schema expects: { model: { providerID, modelID }, variant: "max" }
|
||||
const launchModel = input.model
|
||||
? { providerID: input.model.providerID, modelID: input.model.modelID }
|
||||
: undefined
|
||||
const launchVariant = input.model?.variant
|
||||
|
||||
promptWithModelSuggestionRetry(this.client, {
|
||||
path: { id: sessionID },
|
||||
body: {
|
||||
body: this.buildTaskPromptBody({
|
||||
sessionID,
|
||||
agent: input.agent,
|
||||
...(launchModel ? { model: launchModel } : {}),
|
||||
...(launchVariant ? { variant: launchVariant } : {}),
|
||||
system: input.skillContent,
|
||||
tools: (() => {
|
||||
const tools = {
|
||||
task: false,
|
||||
call_omo_agent: true,
|
||||
question: false,
|
||||
...getAgentToolRestrictions(input.agent),
|
||||
}
|
||||
setSessionTools(sessionID, tools)
|
||||
return tools
|
||||
})(),
|
||||
parts: [createInternalAgentTextPart(input.prompt)],
|
||||
},
|
||||
model: input.model,
|
||||
skillContent: input.skillContent,
|
||||
prompt: input.prompt,
|
||||
}),
|
||||
}).catch((error) => {
|
||||
log("[background-agent] promptAsync error:", error)
|
||||
const existingTask = this.findBySession(sessionID)
|
||||
if (existingTask) {
|
||||
existingTask.status = "interrupt"
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
if (errorMessage.includes("agent.name") || errorMessage.includes("undefined")) {
|
||||
existingTask.error = `Agent "${input.agent}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.`
|
||||
} else {
|
||||
existingTask.error = errorMessage
|
||||
}
|
||||
existingTask.completedAt = new Date()
|
||||
if (existingTask.concurrencyKey) {
|
||||
this.concurrencyManager.release(existingTask.concurrencyKey)
|
||||
existingTask.concurrencyKey = undefined
|
||||
}
|
||||
|
||||
removeTaskToastTracking(existingTask.id)
|
||||
|
||||
// Abort the session to prevent infinite polling hang
|
||||
this.client.session.abort({
|
||||
path: { id: sessionID },
|
||||
}).catch(() => {})
|
||||
|
||||
this.markForNotification(existingTask)
|
||||
this.enqueueNotificationForParent(existingTask.parentSessionID, () => this.notifyParentSession(existingTask)).catch(err => {
|
||||
log("[background-agent] Failed to notify on error:", err)
|
||||
})
|
||||
}
|
||||
this.handlePromptDispatchError(task, error, {
|
||||
agentName: input.agent,
|
||||
errorLogLabel: "[background-agent] promptAsync error:",
|
||||
notifyLogLabel: "[background-agent] Failed to notify on error:",
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -582,6 +540,95 @@ export class BackgroundManager {
|
||||
return input.agent
|
||||
}
|
||||
|
||||
private registerPendingTaskForParent(parentSessionID: string | undefined, taskId: string): void {
|
||||
if (!parentSessionID) {
|
||||
return
|
||||
}
|
||||
|
||||
const pending = this.pendingByParent.get(parentSessionID) ?? new Set<string>()
|
||||
pending.add(taskId)
|
||||
this.pendingByParent.set(parentSessionID, pending)
|
||||
}
|
||||
|
||||
private buildTaskPromptTools(sessionID: string, agent: string): Record<string, boolean> {
|
||||
const tools = {
|
||||
task: false,
|
||||
call_omo_agent: true,
|
||||
question: false,
|
||||
...getAgentToolRestrictions(agent),
|
||||
}
|
||||
setSessionTools(sessionID, tools)
|
||||
return tools
|
||||
}
|
||||
|
||||
private buildTaskPromptBody(options: {
|
||||
sessionID: string
|
||||
agent: string
|
||||
model?: BackgroundTask["model"]
|
||||
skillContent?: string
|
||||
prompt: string
|
||||
}): {
|
||||
agent: string
|
||||
model?: { providerID: string; modelID: string }
|
||||
variant?: string
|
||||
system?: string
|
||||
tools: Record<string, boolean>
|
||||
parts: ReturnType<typeof createInternalAgentTextPart>[]
|
||||
} {
|
||||
const model = options.model
|
||||
? { providerID: options.model.providerID, modelID: options.model.modelID }
|
||||
: undefined
|
||||
const variant = options.model?.variant
|
||||
|
||||
return {
|
||||
agent: options.agent,
|
||||
...(model ? { model } : {}),
|
||||
...(variant ? { variant } : {}),
|
||||
...(options.skillContent ? { system: options.skillContent } : {}),
|
||||
tools: this.buildTaskPromptTools(options.sessionID, options.agent),
|
||||
parts: [createInternalAgentTextPart(options.prompt)],
|
||||
}
|
||||
}
|
||||
|
||||
private handlePromptDispatchError(
|
||||
task: BackgroundTask,
|
||||
error: unknown,
|
||||
options: {
|
||||
agentName?: string
|
||||
errorLogLabel: string
|
||||
notifyLogLabel: string
|
||||
}
|
||||
): void {
|
||||
log(options.errorLogLabel, error)
|
||||
|
||||
task.status = "interrupt"
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
if (options.agentName && (errorMessage.includes("agent.name") || errorMessage.includes("undefined"))) {
|
||||
task.error = `Agent "${options.agentName}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.`
|
||||
} else {
|
||||
task.error = errorMessage
|
||||
}
|
||||
task.completedAt = new Date()
|
||||
|
||||
if (task.concurrencyKey) {
|
||||
this.concurrencyManager.release(task.concurrencyKey)
|
||||
task.concurrencyKey = undefined
|
||||
}
|
||||
|
||||
removeTaskToastTracking(task.id)
|
||||
|
||||
if (task.sessionID) {
|
||||
this.client.session.abort({
|
||||
path: { id: task.sessionID },
|
||||
}).catch(() => {})
|
||||
}
|
||||
|
||||
this.markForNotification(task)
|
||||
this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task)).catch(err => {
|
||||
log(options.notifyLogLabel, err)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Track a task created elsewhere (e.g., from task) for notification tracking.
|
||||
* This allows tasks created by other tools to receive the same toast/prompt notifications.
|
||||
@@ -618,9 +665,7 @@ export class BackgroundManager {
|
||||
|
||||
// Track for batched notifications if task is pending or running
|
||||
if (existingTask.status === "pending" || existingTask.status === "running") {
|
||||
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||
pending.add(existingTask.id)
|
||||
this.pendingByParent.set(input.parentSessionID, pending)
|
||||
this.registerPendingTaskForParent(input.parentSessionID, existingTask.id)
|
||||
} else if (!parentChanged) {
|
||||
// Only clean up if parent didn't change (already cleaned above if it did)
|
||||
this.cleanupPendingByParent(existingTask)
|
||||
@@ -662,11 +707,7 @@ export class BackgroundManager {
|
||||
this.startPolling()
|
||||
this.taskHistory.record(input.parentSessionID, { id: task.id, sessionID: input.sessionID, agent: input.agent || "task", description: input.description, status: "running", startedAt: task.startedAt })
|
||||
|
||||
if (input.parentSessionID) {
|
||||
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||
pending.add(task.id)
|
||||
this.pendingByParent.set(input.parentSessionID, pending)
|
||||
}
|
||||
this.registerPendingTaskForParent(input.parentSessionID, task.id)
|
||||
|
||||
log("[background-agent] Registered external task:", { taskId: task.id, sessionID: input.sessionID })
|
||||
|
||||
@@ -728,11 +769,7 @@ export class BackgroundManager {
|
||||
subagentSessions.add(existingTask.sessionID)
|
||||
}
|
||||
|
||||
if (input.parentSessionID) {
|
||||
const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set()
|
||||
pending.add(existingTask.id)
|
||||
this.pendingByParent.set(input.parentSessionID, pending)
|
||||
}
|
||||
this.registerPendingTaskForParent(input.parentSessionID, existingTask.id)
|
||||
|
||||
const toastManager = getTaskToastManager()
|
||||
if (toastManager) {
|
||||
@@ -756,54 +793,18 @@ export class BackgroundManager {
|
||||
// Fire-and-forget prompt via promptAsync (no response body needed)
|
||||
// Include model if task has one (preserved from original launch with category config)
|
||||
// variant must be top-level in body, not nested inside model (OpenCode PromptInput schema)
|
||||
const resumeModel = existingTask.model
|
||||
? { providerID: existingTask.model.providerID, modelID: existingTask.model.modelID }
|
||||
: undefined
|
||||
const resumeVariant = existingTask.model?.variant
|
||||
|
||||
this.client.session.promptAsync({
|
||||
path: { id: existingTask.sessionID },
|
||||
body: {
|
||||
body: this.buildTaskPromptBody({
|
||||
sessionID: existingTask.sessionID,
|
||||
agent: existingTask.agent,
|
||||
...(resumeModel ? { model: resumeModel } : {}),
|
||||
...(resumeVariant ? { variant: resumeVariant } : {}),
|
||||
tools: (() => {
|
||||
const tools = {
|
||||
task: false,
|
||||
call_omo_agent: true,
|
||||
question: false,
|
||||
...getAgentToolRestrictions(existingTask.agent),
|
||||
}
|
||||
setSessionTools(existingTask.sessionID!, tools)
|
||||
return tools
|
||||
})(),
|
||||
parts: [createInternalAgentTextPart(input.prompt)],
|
||||
},
|
||||
model: existingTask.model,
|
||||
prompt: input.prompt,
|
||||
}),
|
||||
}).catch((error) => {
|
||||
log("[background-agent] resume prompt error:", error)
|
||||
existingTask.status = "interrupt"
|
||||
const errorMessage = error instanceof Error ? error.message : String(error)
|
||||
existingTask.error = errorMessage
|
||||
existingTask.completedAt = new Date()
|
||||
|
||||
// Release concurrency on error to prevent slot leaks
|
||||
if (existingTask.concurrencyKey) {
|
||||
this.concurrencyManager.release(existingTask.concurrencyKey)
|
||||
existingTask.concurrencyKey = undefined
|
||||
}
|
||||
|
||||
removeTaskToastTracking(existingTask.id)
|
||||
|
||||
// Abort the session to prevent infinite polling hang
|
||||
if (existingTask.sessionID) {
|
||||
this.client.session.abort({
|
||||
path: { id: existingTask.sessionID },
|
||||
}).catch(() => {})
|
||||
}
|
||||
|
||||
this.markForNotification(existingTask)
|
||||
this.enqueueNotificationForParent(existingTask.parentSessionID, () => this.notifyParentSession(existingTask)).catch(err => {
|
||||
log("[background-agent] Failed to notify on resume error:", err)
|
||||
this.handlePromptDispatchError(existingTask, error, {
|
||||
errorLogLabel: "[background-agent] resume prompt error:",
|
||||
notifyLogLabel: "[background-agent] Failed to notify on resume error:",
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user