fix(background-agent): improve task completion detection and concurrency release
- manager.ts: Release concurrency key immediately on task completion, not after retention - call-omo-agent: Add polling loop for sync agent completion detection - sisyphus-task: Add abort handling, improve poll logging for debugging
This commit is contained in:
@@ -556,6 +556,11 @@ cleanup(): void {
|
||||
}
|
||||
|
||||
private async notifyParentSession(task: BackgroundTask): Promise<void> {
|
||||
if (task.concurrencyKey) {
|
||||
this.concurrencyManager.release(task.concurrencyKey)
|
||||
task.concurrencyKey = undefined
|
||||
}
|
||||
|
||||
const duration = this.formatDuration(task.startedAt, task.completedAt)
|
||||
|
||||
log("[background-agent] notifyParentSession called for task:", task.id)
|
||||
@@ -636,13 +641,8 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
log("[background-agent] Failed to send notification:", error)
|
||||
}
|
||||
|
||||
// Cleanup after retention period
|
||||
const taskId = task.id
|
||||
setTimeout(() => {
|
||||
if (task.concurrencyKey) {
|
||||
this.concurrencyManager.release(task.concurrencyKey)
|
||||
task.concurrencyKey = undefined
|
||||
}
|
||||
this.clearNotificationsForTask(taskId)
|
||||
this.tasks.delete(taskId)
|
||||
log("[background-agent] Removed completed task from memory:", taskId)
|
||||
|
||||
@@ -191,7 +191,58 @@ async function executeSync(
|
||||
return `Error: Failed to send prompt: ${errorMessage}\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||
}
|
||||
|
||||
log(`[call_omo_agent] Prompt sent, fetching messages...`)
|
||||
log(`[call_omo_agent] Prompt sent, polling for completion...`)
|
||||
|
||||
// Poll for session completion
|
||||
const POLL_INTERVAL_MS = 500
|
||||
const MAX_POLL_TIME_MS = 5 * 60 * 1000 // 5 minutes max
|
||||
const pollStart = Date.now()
|
||||
let lastMsgCount = 0
|
||||
let stablePolls = 0
|
||||
const STABILITY_REQUIRED = 3
|
||||
|
||||
while (Date.now() - pollStart < MAX_POLL_TIME_MS) {
|
||||
// Check if aborted
|
||||
if (toolContext.abort?.aborted) {
|
||||
log(`[call_omo_agent] Aborted by user`)
|
||||
return `Task aborted.\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||
}
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS))
|
||||
|
||||
// Check session status
|
||||
const statusResult = await ctx.client.session.status()
|
||||
const allStatuses = (statusResult.data ?? {}) as Record<string, { type: string }>
|
||||
const sessionStatus = allStatuses[sessionID]
|
||||
|
||||
// If session is actively running, reset stability counter
|
||||
if (sessionStatus && sessionStatus.type !== "idle") {
|
||||
stablePolls = 0
|
||||
lastMsgCount = 0
|
||||
continue
|
||||
}
|
||||
|
||||
// Session is idle - check message stability
|
||||
const messagesCheck = await ctx.client.session.messages({ path: { id: sessionID } })
|
||||
const msgs = ((messagesCheck as { data?: unknown }).data ?? messagesCheck) as Array<unknown>
|
||||
const currentMsgCount = msgs.length
|
||||
|
||||
if (currentMsgCount > 0 && currentMsgCount === lastMsgCount) {
|
||||
stablePolls++
|
||||
if (stablePolls >= STABILITY_REQUIRED) {
|
||||
log(`[call_omo_agent] Session complete, ${currentMsgCount} messages`)
|
||||
break
|
||||
}
|
||||
} else {
|
||||
stablePolls = 0
|
||||
lastMsgCount = currentMsgCount
|
||||
}
|
||||
}
|
||||
|
||||
if (Date.now() - pollStart >= MAX_POLL_TIME_MS) {
|
||||
log(`[call_omo_agent] Timeout reached`)
|
||||
return `Error: Agent task timed out after 5 minutes.\n\n<task_metadata>\nsession_id: ${sessionID}\n</task_metadata>`
|
||||
}
|
||||
|
||||
const messagesResult = await ctx.client.session.messages({
|
||||
path: { id: sessionID },
|
||||
|
||||
@@ -471,36 +471,55 @@ System notifies on completion. Use \`background_output\` with task_id="${task.id
|
||||
const pollStart = Date.now()
|
||||
let lastMsgCount = 0
|
||||
let stablePolls = 0
|
||||
let pollCount = 0
|
||||
|
||||
log("[sisyphus_task] Starting poll loop", { sessionID, agentToUse })
|
||||
|
||||
while (Date.now() - pollStart < MAX_POLL_TIME_MS) {
|
||||
if (ctx.abort?.aborted) {
|
||||
log("[sisyphus_task] Aborted by user", { sessionID })
|
||||
if (toastManager && taskId) toastManager.removeTask(taskId)
|
||||
return `Task aborted.\n\nSession ID: ${sessionID}`
|
||||
}
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, POLL_INTERVAL_MS))
|
||||
pollCount++
|
||||
|
||||
const statusResult = await client.session.status()
|
||||
const allStatuses = (statusResult.data ?? {}) as Record<string, { type: string }>
|
||||
const sessionStatus = allStatuses[sessionID]
|
||||
|
||||
// If session is actively running, reset stability
|
||||
if (pollCount % 10 === 0) {
|
||||
log("[sisyphus_task] Poll status", {
|
||||
sessionID,
|
||||
pollCount,
|
||||
elapsed: Math.floor((Date.now() - pollStart) / 1000) + "s",
|
||||
sessionStatus: sessionStatus?.type ?? "not_in_status",
|
||||
stablePolls,
|
||||
lastMsgCount,
|
||||
})
|
||||
}
|
||||
|
||||
if (sessionStatus && sessionStatus.type !== "idle") {
|
||||
stablePolls = 0
|
||||
lastMsgCount = 0
|
||||
continue
|
||||
}
|
||||
|
||||
// Session is idle or not in status - check message stability
|
||||
const elapsed = Date.now() - pollStart
|
||||
if (elapsed < MIN_STABILITY_TIME_MS) {
|
||||
continue // Don't accept completion too early
|
||||
continue
|
||||
}
|
||||
|
||||
// Get current message count
|
||||
const messagesCheck = await client.session.messages({ path: { id: sessionID } })
|
||||
const msgs = ((messagesCheck as { data?: unknown }).data ?? messagesCheck) as Array<unknown>
|
||||
const currentMsgCount = msgs.length
|
||||
|
||||
if (currentMsgCount > 0 && currentMsgCount === lastMsgCount) {
|
||||
if (currentMsgCount === lastMsgCount) {
|
||||
stablePolls++
|
||||
if (stablePolls >= STABILITY_POLLS_REQUIRED) {
|
||||
break // Messages stable for 3 polls - task complete
|
||||
log("[sisyphus_task] Poll complete - messages stable", { sessionID, pollCount, currentMsgCount })
|
||||
break
|
||||
}
|
||||
} else {
|
||||
stablePolls = 0
|
||||
@@ -508,6 +527,10 @@ System notifies on completion. Use \`background_output\` with task_id="${task.id
|
||||
}
|
||||
}
|
||||
|
||||
if (Date.now() - pollStart >= MAX_POLL_TIME_MS) {
|
||||
log("[sisyphus_task] Poll timeout reached", { sessionID, pollCount, lastMsgCount, stablePolls })
|
||||
}
|
||||
|
||||
const messagesResult = await client.session.messages({
|
||||
path: { id: sessionID },
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user