fix(background-agent): serialize parent notifications (#1582)
This commit is contained in:
@@ -1123,6 +1123,99 @@ describe("BackgroundManager.tryCompleteTask", () => {
|
||||
expect(task.status).toBe("completed")
|
||||
expect(getPendingByParent(manager).get(task.parentSessionID)).toBeUndefined()
|
||||
})
|
||||
|
||||
test("should avoid overlapping promptAsync calls when tasks complete concurrently", async () => {
|
||||
// given
|
||||
type PromptAsyncBody = Record<string, unknown> & { noReply?: boolean }
|
||||
|
||||
let resolveMessages: ((value: { data: unknown[] }) => void) | undefined
|
||||
const messagesBarrier = new Promise<{ data: unknown[] }>((resolve) => {
|
||||
resolveMessages = resolve
|
||||
})
|
||||
|
||||
const promptBodies: PromptAsyncBody[] = []
|
||||
let promptInFlight = false
|
||||
let rejectedCount = 0
|
||||
let promptCallCount = 0
|
||||
|
||||
let releaseFirstPrompt: (() => void) | undefined
|
||||
let resolveFirstStarted: (() => void) | undefined
|
||||
const firstStarted = new Promise<void>((resolve) => {
|
||||
resolveFirstStarted = resolve
|
||||
})
|
||||
|
||||
const client = {
|
||||
session: {
|
||||
prompt: async () => ({}),
|
||||
abort: async () => ({}),
|
||||
messages: async () => messagesBarrier,
|
||||
promptAsync: async (args: { path: { id: string }; body: PromptAsyncBody }) => {
|
||||
promptBodies.push(args.body)
|
||||
|
||||
if (!promptInFlight) {
|
||||
promptCallCount += 1
|
||||
if (promptCallCount === 1) {
|
||||
promptInFlight = true
|
||||
resolveFirstStarted?.()
|
||||
return await new Promise((resolve) => {
|
||||
releaseFirstPrompt = () => {
|
||||
promptInFlight = false
|
||||
resolve({})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return {}
|
||||
}
|
||||
|
||||
rejectedCount += 1
|
||||
throw new Error("BUSY")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput)
|
||||
|
||||
const parentSessionID = "parent-session"
|
||||
const taskA = createMockTask({
|
||||
id: "task-a",
|
||||
sessionID: "session-a",
|
||||
parentSessionID,
|
||||
})
|
||||
const taskB = createMockTask({
|
||||
id: "task-b",
|
||||
sessionID: "session-b",
|
||||
parentSessionID,
|
||||
})
|
||||
|
||||
getTaskMap(manager).set(taskA.id, taskA)
|
||||
getTaskMap(manager).set(taskB.id, taskB)
|
||||
getPendingByParent(manager).set(parentSessionID, new Set([taskA.id, taskB.id]))
|
||||
|
||||
// when
|
||||
const completionA = tryCompleteTaskForTest(manager, taskA)
|
||||
const completionB = tryCompleteTaskForTest(manager, taskB)
|
||||
resolveMessages?.({ data: [] })
|
||||
|
||||
await firstStarted
|
||||
|
||||
// Give the second completion a chance to attempt promptAsync while the first is in-flight.
|
||||
// In the buggy implementation, this triggers an overlap and increments rejectedCount.
|
||||
for (let i = 0; i < 20; i++) {
|
||||
await Promise.resolve()
|
||||
if (rejectedCount > 0) break
|
||||
if (promptBodies.length >= 2) break
|
||||
}
|
||||
|
||||
releaseFirstPrompt?.()
|
||||
await Promise.all([completionA, completionB])
|
||||
|
||||
// then
|
||||
expect(rejectedCount).toBe(0)
|
||||
expect(promptBodies.length).toBe(2)
|
||||
expect(promptBodies.some((b) => b.noReply === false)).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager.trackTask", () => {
|
||||
|
||||
@@ -89,6 +89,7 @@ export class BackgroundManager {
|
||||
private processingKeys: Set<string> = new Set()
|
||||
private completionTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
||||
private idleDeferralTimers: Map<string, ReturnType<typeof setTimeout>> = new Map()
|
||||
private notificationQueueByParent: Map<string, Promise<void>> = new Map()
|
||||
|
||||
constructor(
|
||||
ctx: PluginInput,
|
||||
@@ -358,7 +359,7 @@ export class BackgroundManager {
|
||||
|
||||
this.markForNotification(existingTask)
|
||||
this.cleanupPendingByParent(existingTask)
|
||||
this.notifyParentSession(existingTask).catch(err => {
|
||||
this.enqueueNotificationForParent(existingTask.parentSessionID, () => this.notifyParentSession(existingTask)).catch(err => {
|
||||
log("[background-agent] Failed to notify on error:", err)
|
||||
})
|
||||
}
|
||||
@@ -615,7 +616,7 @@ export class BackgroundManager {
|
||||
|
||||
this.markForNotification(existingTask)
|
||||
this.cleanupPendingByParent(existingTask)
|
||||
this.notifyParentSession(existingTask).catch(err => {
|
||||
this.enqueueNotificationForParent(existingTask.parentSessionID, () => this.notifyParentSession(existingTask)).catch(err => {
|
||||
log("[background-agent] Failed to notify on resume error:", err)
|
||||
})
|
||||
})
|
||||
@@ -949,7 +950,7 @@ export class BackgroundManager {
|
||||
this.markForNotification(task)
|
||||
|
||||
try {
|
||||
await this.notifyParentSession(task)
|
||||
await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task))
|
||||
log(`[background-agent] Task cancelled via ${source}:`, task.id)
|
||||
} catch (err) {
|
||||
log("[background-agent] Error in notifyParentSession for cancelled task:", { taskId: task.id, error: err })
|
||||
@@ -1084,7 +1085,7 @@ export class BackgroundManager {
|
||||
}
|
||||
|
||||
try {
|
||||
await this.notifyParentSession(task)
|
||||
await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task))
|
||||
log(`[background-agent] Task completed via ${source}:`, task.id)
|
||||
} catch (err) {
|
||||
log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err })
|
||||
@@ -1114,16 +1115,19 @@ export class BackgroundManager {
|
||||
|
||||
// Update pending tracking and check if all tasks complete
|
||||
const pendingSet = this.pendingByParent.get(task.parentSessionID)
|
||||
let allComplete = false
|
||||
let remainingCount = 0
|
||||
if (pendingSet) {
|
||||
pendingSet.delete(task.id)
|
||||
if (pendingSet.size === 0) {
|
||||
remainingCount = pendingSet.size
|
||||
allComplete = remainingCount === 0
|
||||
if (allComplete) {
|
||||
this.pendingByParent.delete(task.parentSessionID)
|
||||
}
|
||||
} else {
|
||||
allComplete = true
|
||||
}
|
||||
|
||||
const allComplete = !pendingSet || pendingSet.size === 0
|
||||
const remainingCount = pendingSet?.size ?? 0
|
||||
|
||||
const statusText = task.status === "completed" ? "COMPLETED" : "CANCELLED"
|
||||
const errorInfo = task.error ? `\n**Error:** ${task.error}` : ""
|
||||
|
||||
@@ -1378,7 +1382,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
log(`[background-agent] Task ${task.id} interrupted: stale timeout`)
|
||||
|
||||
try {
|
||||
await this.notifyParentSession(task)
|
||||
await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task))
|
||||
} catch (err) {
|
||||
log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err })
|
||||
}
|
||||
@@ -1572,12 +1576,37 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
this.tasks.clear()
|
||||
this.notifications.clear()
|
||||
this.pendingByParent.clear()
|
||||
this.notificationQueueByParent.clear()
|
||||
this.queuesByKey.clear()
|
||||
this.processingKeys.clear()
|
||||
this.unregisterProcessCleanup()
|
||||
log("[background-agent] Shutdown complete")
|
||||
|
||||
}
|
||||
|
||||
private enqueueNotificationForParent(
|
||||
parentSessionID: string | undefined,
|
||||
operation: () => Promise<void>
|
||||
): Promise<void> {
|
||||
if (!parentSessionID) {
|
||||
return operation()
|
||||
}
|
||||
|
||||
const previous = this.notificationQueueByParent.get(parentSessionID) ?? Promise.resolve()
|
||||
const current = previous
|
||||
.catch(() => {})
|
||||
.then(operation)
|
||||
|
||||
this.notificationQueueByParent.set(parentSessionID, current)
|
||||
|
||||
void current.finally(() => {
|
||||
if (this.notificationQueueByParent.get(parentSessionID) === current) {
|
||||
this.notificationQueueByParent.delete(parentSessionID)
|
||||
}
|
||||
}).catch(() => {})
|
||||
|
||||
return current
|
||||
}
|
||||
}
|
||||
|
||||
function registerProcessSignal(
|
||||
|
||||
Reference in New Issue
Block a user