fix(background-agent): release interrupted task slots during startup cleanup
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -1361,6 +1361,73 @@ describe("BackgroundManager.tryCompleteTask", () => {
|
||||
}
|
||||
})
|
||||
|
||||
test("should release task concurrencyKey when startTask throws after assigning it", async () => {
|
||||
// given
|
||||
const concurrencyKey = "anthropic/claude-opus-4-6"
|
||||
const concurrencyManager = getConcurrencyManager(manager)
|
||||
|
||||
const task = createMockTask({
|
||||
id: "task-process-key-concurrency",
|
||||
sessionID: "session-process-key-concurrency",
|
||||
parentSessionID: "parent-process-key-concurrency",
|
||||
status: "pending",
|
||||
agent: "explore",
|
||||
})
|
||||
const input = {
|
||||
description: task.description,
|
||||
prompt: task.prompt,
|
||||
agent: task.agent,
|
||||
parentSessionID: task.parentSessionID,
|
||||
parentMessageID: task.parentMessageID,
|
||||
model: { providerID: "anthropic", modelID: "claude-opus-4-6" },
|
||||
}
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
getQueuesByKey(manager).set(concurrencyKey, [{ task, input }])
|
||||
|
||||
;(manager as unknown as { startTask: (item: { task: BackgroundTask; input: typeof input }) => Promise<void> }).startTask = async (item) => {
|
||||
item.task.concurrencyKey = concurrencyKey
|
||||
throw new Error("startTask failed after assigning concurrencyKey")
|
||||
}
|
||||
|
||||
// when
|
||||
await processKeyForTest(manager, concurrencyKey)
|
||||
|
||||
// then
|
||||
expect(concurrencyManager.getCount(concurrencyKey)).toBe(0)
|
||||
expect(task.concurrencyKey).toBeUndefined()
|
||||
})
|
||||
|
||||
test("should release queue slot when queued task is already interrupt", async () => {
|
||||
// given
|
||||
const concurrencyKey = "anthropic/claude-opus-4-6"
|
||||
const concurrencyManager = getConcurrencyManager(manager)
|
||||
|
||||
const task = createMockTask({
|
||||
id: "task-process-key-interrupt",
|
||||
sessionID: "session-process-key-interrupt",
|
||||
parentSessionID: "parent-process-key-interrupt",
|
||||
status: "interrupt",
|
||||
agent: "explore",
|
||||
})
|
||||
const input = {
|
||||
description: task.description,
|
||||
prompt: task.prompt,
|
||||
agent: task.agent,
|
||||
parentSessionID: task.parentSessionID,
|
||||
parentMessageID: task.parentMessageID,
|
||||
model: { providerID: "anthropic", modelID: "claude-opus-4-6" },
|
||||
}
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
getQueuesByKey(manager).set(concurrencyKey, [{ task, input }])
|
||||
|
||||
// when
|
||||
await processKeyForTest(manager, concurrencyKey)
|
||||
|
||||
// then
|
||||
expect(concurrencyManager.getCount(concurrencyKey)).toBe(0)
|
||||
expect(getQueuesByKey(manager).get(concurrencyKey)).toEqual([])
|
||||
})
|
||||
|
||||
test("should avoid overlapping promptAsync calls when tasks complete concurrently", async () => {
|
||||
// given
|
||||
type PromptAsyncBody = Record<string, unknown> & { noReply?: boolean }
|
||||
@@ -3216,7 +3283,7 @@ describe("BackgroundManager.handleEvent - session.error", () => {
|
||||
concurrencyKey,
|
||||
fallbackChain: [
|
||||
{ providers: ["anthropic"], model: "claude-opus-4-6", variant: "max" },
|
||||
{ providers: ["anthropic"], model: "claude-opus-4-5" },
|
||||
{ providers: ["anthropic"], model: "claude-opus-4-5", variant: "max" },
|
||||
],
|
||||
})
|
||||
|
||||
@@ -3298,21 +3365,23 @@ describe("BackgroundManager.handleEvent - session.error", () => {
|
||||
})
|
||||
|
||||
//#when
|
||||
const messageInfo = {
|
||||
id: "msg_errored",
|
||||
sessionID,
|
||||
role: "assistant",
|
||||
error: {
|
||||
name: "UnknownError",
|
||||
data: {
|
||||
message:
|
||||
"Bad Gateway: {\"error\":{\"message\":\"unknown provider for model claude-opus-4-6-thinking\"}}",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
manager.handleEvent({
|
||||
type: "message.updated",
|
||||
properties: {
|
||||
info: {
|
||||
id: "msg_errored",
|
||||
sessionID,
|
||||
role: "assistant",
|
||||
error: {
|
||||
name: "UnknownError",
|
||||
data: {
|
||||
message:
|
||||
"Bad Gateway: {\"error\":{\"message\":\"unknown provider for model claude-opus-4-6-thinking\"}}",
|
||||
},
|
||||
},
|
||||
},
|
||||
info: messageInfo,
|
||||
},
|
||||
})
|
||||
|
||||
|
||||
@@ -223,7 +223,7 @@ export class BackgroundManager {
|
||||
|
||||
await this.concurrencyManager.acquire(key)
|
||||
|
||||
if (item.task.status === "cancelled" || item.task.status === "error") {
|
||||
if (item.task.status === "cancelled" || item.task.status === "error" || item.task.status === "interrupt") {
|
||||
this.concurrencyManager.release(key)
|
||||
queue.shift()
|
||||
continue
|
||||
@@ -233,9 +233,10 @@ export class BackgroundManager {
|
||||
await this.startTask(item)
|
||||
} catch (error) {
|
||||
log("[background-agent] Error starting task:", error)
|
||||
// Release concurrency slot if startTask failed and didn't release it itself
|
||||
// This prevents slot leaks when errors occur after acquire but before task.concurrencyKey is set
|
||||
if (!item.task.concurrencyKey) {
|
||||
if (item.task.concurrencyKey) {
|
||||
this.concurrencyManager.release(item.task.concurrencyKey)
|
||||
item.task.concurrencyKey = undefined
|
||||
} else {
|
||||
this.concurrencyManager.release(key)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user