Merge pull request #1795 from code-yeongyu/fix/background-agent-session-error
fix: handle session.error and prevent zombie task starts in background-agent
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { log } from "../../shared"
|
||||
import { MIN_IDLE_TIME_MS } from "./constants"
|
||||
import { subagentSessions } from "../claude-code-session-state"
|
||||
import type { BackgroundTask } from "./types"
|
||||
import { cleanupTaskAfterSessionEnds } from "./session-task-cleanup"
|
||||
import { handleSessionIdleBackgroundEvent } from "./session-idle-event-handler"
|
||||
|
||||
type Event = { type: string; properties?: Record<string, unknown> }
|
||||
|
||||
@@ -18,6 +18,7 @@ export function handleBackgroundEvent(args: {
|
||||
event: Event
|
||||
findBySession: (sessionID: string) => BackgroundTask | undefined
|
||||
getAllDescendantTasks: (sessionID: string) => BackgroundTask[]
|
||||
releaseConcurrencyKey?: (key: string) => void
|
||||
cancelTask: (
|
||||
taskId: string,
|
||||
options: { source: string; reason: string; skipNotification: true }
|
||||
@@ -36,6 +37,7 @@ export function handleBackgroundEvent(args: {
|
||||
event,
|
||||
findBySession,
|
||||
getAllDescendantTasks,
|
||||
releaseConcurrencyKey,
|
||||
cancelTask,
|
||||
tryCompleteTask,
|
||||
validateSessionHasOutput,
|
||||
@@ -78,6 +80,19 @@ export function handleBackgroundEvent(args: {
|
||||
}
|
||||
|
||||
if (event.type === "session.idle") {
|
||||
if (!props || !isRecord(props)) return
|
||||
handleSessionIdleBackgroundEvent({
|
||||
properties: props,
|
||||
findBySession,
|
||||
idleDeferralTimers,
|
||||
validateSessionHasOutput,
|
||||
checkSessionTodos,
|
||||
tryCompleteTask,
|
||||
emitIdleEvent,
|
||||
})
|
||||
}
|
||||
|
||||
if (event.type === "session.error") {
|
||||
if (!props || !isRecord(props)) return
|
||||
const sessionID = getString(props, "sessionID")
|
||||
if (!sessionID) return
|
||||
@@ -85,64 +100,26 @@ export function handleBackgroundEvent(args: {
|
||||
const task = findBySession(sessionID)
|
||||
if (!task || task.status !== "running") return
|
||||
|
||||
const startedAt = task.startedAt
|
||||
if (!startedAt) return
|
||||
const errorRaw = props["error"]
|
||||
const dataRaw = isRecord(errorRaw) ? errorRaw["data"] : undefined
|
||||
const message =
|
||||
(isRecord(dataRaw) ? getString(dataRaw, "message") : undefined) ??
|
||||
(isRecord(errorRaw) ? getString(errorRaw, "message") : undefined) ??
|
||||
"Session error"
|
||||
|
||||
const elapsedMs = Date.now() - startedAt.getTime()
|
||||
if (elapsedMs < MIN_IDLE_TIME_MS) {
|
||||
const remainingMs = MIN_IDLE_TIME_MS - elapsedMs
|
||||
if (!idleDeferralTimers.has(task.id)) {
|
||||
log("[background-agent] Deferring early session.idle:", {
|
||||
elapsedMs,
|
||||
remainingMs,
|
||||
taskId: task.id,
|
||||
})
|
||||
const timer = setTimeout(() => {
|
||||
idleDeferralTimers.delete(task.id)
|
||||
emitIdleEvent(sessionID)
|
||||
}, remainingMs)
|
||||
idleDeferralTimers.set(task.id, timer)
|
||||
} else {
|
||||
log("[background-agent] session.idle already deferred:", { elapsedMs, taskId: task.id })
|
||||
}
|
||||
return
|
||||
}
|
||||
task.status = "error"
|
||||
task.error = message
|
||||
task.completedAt = new Date()
|
||||
|
||||
validateSessionHasOutput(sessionID)
|
||||
.then(async (hasValidOutput) => {
|
||||
if (task.status !== "running") {
|
||||
log("[background-agent] Task status changed during validation, skipping:", {
|
||||
taskId: task.id,
|
||||
status: task.status,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (!hasValidOutput) {
|
||||
log("[background-agent] Session.idle but no valid output yet, waiting:", task.id)
|
||||
return
|
||||
}
|
||||
|
||||
const hasIncompleteTodos = await checkSessionTodos(sessionID)
|
||||
|
||||
if (task.status !== "running") {
|
||||
log("[background-agent] Task status changed during todo check, skipping:", {
|
||||
taskId: task.id,
|
||||
status: task.status,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (hasIncompleteTodos) {
|
||||
log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id)
|
||||
return
|
||||
}
|
||||
|
||||
await tryCompleteTask(task, "session.idle event")
|
||||
})
|
||||
.catch((err) => {
|
||||
log("[background-agent] Error in session.idle handler:", err)
|
||||
})
|
||||
cleanupTaskAfterSessionEnds({
|
||||
task,
|
||||
tasks,
|
||||
idleDeferralTimers,
|
||||
completionTimers,
|
||||
cleanupPendingByParent,
|
||||
clearNotificationsForTask,
|
||||
releaseConcurrencyKey,
|
||||
})
|
||||
}
|
||||
|
||||
if (event.type === "session.deleted") {
|
||||
@@ -176,24 +153,15 @@ export function handleBackgroundEvent(args: {
|
||||
})
|
||||
}
|
||||
|
||||
const completionTimer = completionTimers.get(task.id)
|
||||
if (completionTimer) {
|
||||
clearTimeout(completionTimer)
|
||||
completionTimers.delete(task.id)
|
||||
}
|
||||
|
||||
const idleTimer = idleDeferralTimers.get(task.id)
|
||||
if (idleTimer) {
|
||||
clearTimeout(idleTimer)
|
||||
idleDeferralTimers.delete(task.id)
|
||||
}
|
||||
|
||||
cleanupPendingByParent(task)
|
||||
tasks.delete(task.id)
|
||||
clearNotificationsForTask(task.id)
|
||||
if (task.sessionID) {
|
||||
subagentSessions.delete(task.sessionID)
|
||||
}
|
||||
cleanupTaskAfterSessionEnds({
|
||||
task,
|
||||
tasks,
|
||||
idleDeferralTimers,
|
||||
completionTimers,
|
||||
cleanupPendingByParent,
|
||||
clearNotificationsForTask,
|
||||
releaseConcurrencyKey,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -190,6 +190,22 @@ function getPendingByParent(manager: BackgroundManager): Map<string, Set<string>
|
||||
return (manager as unknown as { pendingByParent: Map<string, Set<string>> }).pendingByParent
|
||||
}
|
||||
|
||||
function getQueuesByKey(
|
||||
manager: BackgroundManager
|
||||
): Map<string, Array<{ task: BackgroundTask; input: import("./types").LaunchInput }>> {
|
||||
return (manager as unknown as {
|
||||
queuesByKey: Map<string, Array<{ task: BackgroundTask; input: import("./types").LaunchInput }>>
|
||||
}).queuesByKey
|
||||
}
|
||||
|
||||
async function processKeyForTest(manager: BackgroundManager, key: string): Promise<void> {
|
||||
return (manager as unknown as { processKey: (key: string) => Promise<void> }).processKey(key)
|
||||
}
|
||||
|
||||
function pruneStaleTasksAndNotificationsForTest(manager: BackgroundManager): void {
|
||||
;(manager as unknown as { pruneStaleTasksAndNotifications: () => void }).pruneStaleTasksAndNotifications()
|
||||
}
|
||||
|
||||
async function tryCompleteTaskForTest(manager: BackgroundManager, task: BackgroundTask): Promise<boolean> {
|
||||
return (manager as unknown as { tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean> })
|
||||
.tryCompleteTask(task, "test")
|
||||
@@ -2505,6 +2521,198 @@ describe("BackgroundManager.handleEvent - session.deleted cascade", () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager.handleEvent - session.error", () => {
|
||||
test("sets task to error, releases concurrency, and cleans up", async () => {
|
||||
//#given
|
||||
const manager = createBackgroundManager()
|
||||
const concurrencyManager = getConcurrencyManager(manager)
|
||||
const concurrencyKey = "test-provider/test-model"
|
||||
await concurrencyManager.acquire(concurrencyKey)
|
||||
|
||||
const sessionID = "ses_error_1"
|
||||
const task = createMockTask({
|
||||
id: "task-session-error",
|
||||
sessionID,
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "msg-1",
|
||||
description: "task that errors",
|
||||
agent: "explore",
|
||||
status: "running",
|
||||
concurrencyKey,
|
||||
})
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
getPendingByParent(manager).set(task.parentSessionID, new Set([task.id]))
|
||||
|
||||
//#when
|
||||
manager.handleEvent({
|
||||
type: "session.error",
|
||||
properties: {
|
||||
sessionID,
|
||||
error: {
|
||||
name: "UnknownError",
|
||||
data: { message: "Model not found: kimi-for-coding/k2p5." },
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(task.status).toBe("error")
|
||||
expect(task.error).toBe("Model not found: kimi-for-coding/k2p5.")
|
||||
expect(task.completedAt).toBeInstanceOf(Date)
|
||||
expect(concurrencyManager.getCount(concurrencyKey)).toBe(0)
|
||||
expect(getTaskMap(manager).has(task.id)).toBe(false)
|
||||
expect(getPendingByParent(manager).get(task.parentSessionID)).toBeUndefined()
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
|
||||
test("ignores session.error for non-running tasks", () => {
|
||||
//#given
|
||||
const manager = createBackgroundManager()
|
||||
const sessionID = "ses_error_ignored"
|
||||
const task = createMockTask({
|
||||
id: "task-non-running",
|
||||
sessionID,
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "msg-1",
|
||||
description: "task already done",
|
||||
agent: "explore",
|
||||
status: "completed",
|
||||
})
|
||||
task.completedAt = new Date()
|
||||
task.error = "previous"
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
|
||||
//#when
|
||||
manager.handleEvent({
|
||||
type: "session.error",
|
||||
properties: {
|
||||
sessionID,
|
||||
error: { name: "UnknownError", message: "should not matter" },
|
||||
},
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(task.status).toBe("completed")
|
||||
expect(task.error).toBe("previous")
|
||||
expect(getTaskMap(manager).has(task.id)).toBe(true)
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
|
||||
test("ignores session.error for unknown session", () => {
|
||||
//#given
|
||||
const manager = createBackgroundManager()
|
||||
|
||||
//#when
|
||||
const handler = () =>
|
||||
manager.handleEvent({
|
||||
type: "session.error",
|
||||
properties: {
|
||||
sessionID: "ses_unknown",
|
||||
error: { name: "UnknownError", message: "Model not found" },
|
||||
},
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(handler).not.toThrow()
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager queue processing - error tasks are skipped", () => {
|
||||
test("does not start tasks with status=error", async () => {
|
||||
//#given
|
||||
const client = {
|
||||
session: {
|
||||
prompt: async () => ({}),
|
||||
promptAsync: async () => ({}),
|
||||
abort: async () => ({}),
|
||||
},
|
||||
}
|
||||
const manager = new BackgroundManager(
|
||||
{ client, directory: tmpdir() } as unknown as PluginInput,
|
||||
{ defaultConcurrency: 1 }
|
||||
)
|
||||
|
||||
const key = "test-key"
|
||||
const task: BackgroundTask = {
|
||||
id: "task-error-queued",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "msg-1",
|
||||
description: "queued error task",
|
||||
prompt: "test",
|
||||
agent: "test-agent",
|
||||
status: "error",
|
||||
queuedAt: new Date(),
|
||||
}
|
||||
|
||||
const input: import("./types").LaunchInput = {
|
||||
description: task.description,
|
||||
prompt: task.prompt,
|
||||
agent: task.agent,
|
||||
parentSessionID: task.parentSessionID,
|
||||
parentMessageID: task.parentMessageID,
|
||||
}
|
||||
|
||||
let startCalled = false
|
||||
;(manager as unknown as { startTask: (item: unknown) => Promise<void> }).startTask = async () => {
|
||||
startCalled = true
|
||||
}
|
||||
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
getQueuesByKey(manager).set(key, [{ task, input }])
|
||||
|
||||
//#when
|
||||
await processKeyForTest(manager, key)
|
||||
|
||||
//#then
|
||||
expect(startCalled).toBe(false)
|
||||
expect(getQueuesByKey(manager).get(key)?.length ?? 0).toBe(0)
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager.pruneStaleTasksAndNotifications - removes pruned tasks from queuesByKey", () => {
|
||||
test("removes stale pending task from queue", () => {
|
||||
//#given
|
||||
const manager = createBackgroundManager()
|
||||
const queuedAt = new Date(Date.now() - 31 * 60 * 1000)
|
||||
const task: BackgroundTask = {
|
||||
id: "task-stale-pending",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "msg-1",
|
||||
description: "stale pending",
|
||||
prompt: "test",
|
||||
agent: "test-agent",
|
||||
status: "pending",
|
||||
queuedAt,
|
||||
}
|
||||
const key = task.agent
|
||||
|
||||
const input: import("./types").LaunchInput = {
|
||||
description: task.description,
|
||||
prompt: task.prompt,
|
||||
agent: task.agent,
|
||||
parentSessionID: task.parentSessionID,
|
||||
parentMessageID: task.parentMessageID,
|
||||
}
|
||||
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
getQueuesByKey(manager).set(key, [{ task, input }])
|
||||
|
||||
//#when
|
||||
pruneStaleTasksAndNotificationsForTest(manager)
|
||||
|
||||
//#then
|
||||
expect(getQueuesByKey(manager).get(key)).toBeUndefined()
|
||||
|
||||
manager.shutdown()
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager.completionTimers - Memory Leak Fix", () => {
|
||||
function getCompletionTimers(manager: BackgroundManager): Map<string, ReturnType<typeof setTimeout>> {
|
||||
return (manager as unknown as { completionTimers: Map<string, ReturnType<typeof setTimeout>> }).completionTimers
|
||||
|
||||
@@ -192,7 +192,7 @@ export class BackgroundManager {
|
||||
|
||||
await this.concurrencyManager.acquire(key)
|
||||
|
||||
if (item.task.status === "cancelled") {
|
||||
if (item.task.status === "cancelled" || item.task.status === "error") {
|
||||
this.concurrencyManager.release(key)
|
||||
queue.shift()
|
||||
continue
|
||||
@@ -729,6 +729,44 @@ export class BackgroundManager {
|
||||
})
|
||||
}
|
||||
|
||||
if (event.type === "session.error") {
|
||||
const sessionID = typeof props?.sessionID === "string" ? props.sessionID : undefined
|
||||
if (!sessionID) return
|
||||
|
||||
const task = this.findBySession(sessionID)
|
||||
if (!task || task.status !== "running") return
|
||||
|
||||
const errorMessage = props ? this.getSessionErrorMessage(props) : undefined
|
||||
|
||||
task.status = "error"
|
||||
task.error = errorMessage ?? "Session error"
|
||||
task.completedAt = new Date()
|
||||
|
||||
if (task.concurrencyKey) {
|
||||
this.concurrencyManager.release(task.concurrencyKey)
|
||||
task.concurrencyKey = undefined
|
||||
}
|
||||
|
||||
const completionTimer = this.completionTimers.get(task.id)
|
||||
if (completionTimer) {
|
||||
clearTimeout(completionTimer)
|
||||
this.completionTimers.delete(task.id)
|
||||
}
|
||||
|
||||
const idleTimer = this.idleDeferralTimers.get(task.id)
|
||||
if (idleTimer) {
|
||||
clearTimeout(idleTimer)
|
||||
this.idleDeferralTimers.delete(task.id)
|
||||
}
|
||||
|
||||
this.cleanupPendingByParent(task)
|
||||
this.tasks.delete(task.id)
|
||||
this.clearNotificationsForTask(task.id)
|
||||
if (task.sessionID) {
|
||||
subagentSessions.delete(task.sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
if (event.type === "session.deleted") {
|
||||
const info = props?.info
|
||||
if (!info || typeof info.id !== "string") return
|
||||
@@ -1281,6 +1319,24 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
return ""
|
||||
}
|
||||
|
||||
private isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === "object" && value !== null
|
||||
}
|
||||
|
||||
private getSessionErrorMessage(properties: EventProperties): string | undefined {
|
||||
const errorRaw = properties["error"]
|
||||
if (!this.isRecord(errorRaw)) return undefined
|
||||
|
||||
const dataRaw = errorRaw["data"]
|
||||
if (this.isRecord(dataRaw)) {
|
||||
const message = dataRaw["message"]
|
||||
if (typeof message === "string") return message
|
||||
}
|
||||
|
||||
const message = errorRaw["message"]
|
||||
return typeof message === "string" ? message : undefined
|
||||
}
|
||||
|
||||
private hasRunningTasks(): boolean {
|
||||
for (const task of this.tasks.values()) {
|
||||
if (task.status === "running") return true
|
||||
@@ -1292,6 +1348,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
const now = Date.now()
|
||||
|
||||
for (const [taskId, task] of this.tasks.entries()) {
|
||||
const wasPending = task.status === "pending"
|
||||
const timestamp = task.status === "pending"
|
||||
? task.queuedAt?.getTime()
|
||||
: task.startedAt?.getTime()
|
||||
@@ -1316,6 +1373,21 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
|
||||
}
|
||||
// Clean up pendingByParent to prevent stale entries
|
||||
this.cleanupPendingByParent(task)
|
||||
if (wasPending) {
|
||||
const key = task.model
|
||||
? `${task.model.providerID}/${task.model.modelID}`
|
||||
: task.agent
|
||||
const queue = this.queuesByKey.get(key)
|
||||
if (queue) {
|
||||
const index = queue.findIndex((item) => item.task.id === taskId)
|
||||
if (index !== -1) {
|
||||
queue.splice(index, 1)
|
||||
if (queue.length === 0) {
|
||||
this.queuesByKey.delete(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.clearNotificationsForTask(taskId)
|
||||
this.tasks.delete(taskId)
|
||||
if (task.sessionID) {
|
||||
|
||||
93
src/features/background-agent/session-idle-event-handler.ts
Normal file
93
src/features/background-agent/session-idle-event-handler.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import { log } from "../../shared"
|
||||
import { MIN_IDLE_TIME_MS } from "./constants"
|
||||
import type { BackgroundTask } from "./types"
|
||||
|
||||
function getString(obj: Record<string, unknown>, key: string): string | undefined {
|
||||
const value = obj[key]
|
||||
return typeof value === "string" ? value : undefined
|
||||
}
|
||||
|
||||
export function handleSessionIdleBackgroundEvent(args: {
|
||||
properties: Record<string, unknown>
|
||||
findBySession: (sessionID: string) => BackgroundTask | undefined
|
||||
idleDeferralTimers: Map<string, ReturnType<typeof setTimeout>>
|
||||
validateSessionHasOutput: (sessionID: string) => Promise<boolean>
|
||||
checkSessionTodos: (sessionID: string) => Promise<boolean>
|
||||
tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean>
|
||||
emitIdleEvent: (sessionID: string) => void
|
||||
}): void {
|
||||
const {
|
||||
properties,
|
||||
findBySession,
|
||||
idleDeferralTimers,
|
||||
validateSessionHasOutput,
|
||||
checkSessionTodos,
|
||||
tryCompleteTask,
|
||||
emitIdleEvent,
|
||||
} = args
|
||||
|
||||
const sessionID = getString(properties, "sessionID")
|
||||
if (!sessionID) return
|
||||
|
||||
const task = findBySession(sessionID)
|
||||
if (!task || task.status !== "running") return
|
||||
|
||||
const startedAt = task.startedAt
|
||||
if (!startedAt) return
|
||||
|
||||
const elapsedMs = Date.now() - startedAt.getTime()
|
||||
if (elapsedMs < MIN_IDLE_TIME_MS) {
|
||||
const remainingMs = MIN_IDLE_TIME_MS - elapsedMs
|
||||
if (!idleDeferralTimers.has(task.id)) {
|
||||
log("[background-agent] Deferring early session.idle:", {
|
||||
elapsedMs,
|
||||
remainingMs,
|
||||
taskId: task.id,
|
||||
})
|
||||
const timer = setTimeout(() => {
|
||||
idleDeferralTimers.delete(task.id)
|
||||
emitIdleEvent(sessionID)
|
||||
}, remainingMs)
|
||||
idleDeferralTimers.set(task.id, timer)
|
||||
} else {
|
||||
log("[background-agent] session.idle already deferred:", { elapsedMs, taskId: task.id })
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
validateSessionHasOutput(sessionID)
|
||||
.then(async (hasValidOutput) => {
|
||||
if (task.status !== "running") {
|
||||
log("[background-agent] Task status changed during validation, skipping:", {
|
||||
taskId: task.id,
|
||||
status: task.status,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (!hasValidOutput) {
|
||||
log("[background-agent] Session.idle but no valid output yet, waiting:", task.id)
|
||||
return
|
||||
}
|
||||
|
||||
const hasIncompleteTodos = await checkSessionTodos(sessionID)
|
||||
|
||||
if (task.status !== "running") {
|
||||
log("[background-agent] Task status changed during todo check, skipping:", {
|
||||
taskId: task.id,
|
||||
status: task.status,
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
if (hasIncompleteTodos) {
|
||||
log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id)
|
||||
return
|
||||
}
|
||||
|
||||
await tryCompleteTask(task, "session.idle event")
|
||||
})
|
||||
.catch((err) => {
|
||||
log("[background-agent] Error in session.idle handler:", err)
|
||||
})
|
||||
}
|
||||
46
src/features/background-agent/session-task-cleanup.ts
Normal file
46
src/features/background-agent/session-task-cleanup.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
import { subagentSessions } from "../claude-code-session-state"
|
||||
import type { BackgroundTask } from "./types"
|
||||
|
||||
export function cleanupTaskAfterSessionEnds(args: {
|
||||
task: BackgroundTask
|
||||
tasks: Map<string, BackgroundTask>
|
||||
idleDeferralTimers: Map<string, ReturnType<typeof setTimeout>>
|
||||
completionTimers: Map<string, ReturnType<typeof setTimeout>>
|
||||
cleanupPendingByParent: (task: BackgroundTask) => void
|
||||
clearNotificationsForTask: (taskId: string) => void
|
||||
releaseConcurrencyKey?: (key: string) => void
|
||||
}): void {
|
||||
const {
|
||||
task,
|
||||
tasks,
|
||||
idleDeferralTimers,
|
||||
completionTimers,
|
||||
cleanupPendingByParent,
|
||||
clearNotificationsForTask,
|
||||
releaseConcurrencyKey,
|
||||
} = args
|
||||
|
||||
const completionTimer = completionTimers.get(task.id)
|
||||
if (completionTimer) {
|
||||
clearTimeout(completionTimer)
|
||||
completionTimers.delete(task.id)
|
||||
}
|
||||
|
||||
const idleTimer = idleDeferralTimers.get(task.id)
|
||||
if (idleTimer) {
|
||||
clearTimeout(idleTimer)
|
||||
idleDeferralTimers.delete(task.id)
|
||||
}
|
||||
|
||||
if (task.concurrencyKey && releaseConcurrencyKey) {
|
||||
releaseConcurrencyKey(task.concurrencyKey)
|
||||
task.concurrencyKey = undefined
|
||||
}
|
||||
|
||||
cleanupPendingByParent(task)
|
||||
clearNotificationsForTask(task.id)
|
||||
tasks.delete(task.id)
|
||||
if (task.sessionID) {
|
||||
subagentSessions.delete(task.sessionID)
|
||||
}
|
||||
}
|
||||
@@ -4,12 +4,15 @@ import { TASK_TTL_MS } from "./constants"
|
||||
import { subagentSessions } from "../claude-code-session-state"
|
||||
import { pruneStaleTasksAndNotifications } from "./task-poller"
|
||||
|
||||
import type { BackgroundTask } from "./types"
|
||||
import type { BackgroundTask, LaunchInput } from "./types"
|
||||
import type { ConcurrencyManager } from "./concurrency"
|
||||
|
||||
type QueueItem = { task: BackgroundTask; input: LaunchInput }
|
||||
|
||||
export function pruneStaleState(args: {
|
||||
tasks: Map<string, BackgroundTask>
|
||||
notifications: Map<string, BackgroundTask[]>
|
||||
queuesByKey: Map<string, QueueItem[]>
|
||||
concurrencyManager: ConcurrencyManager
|
||||
cleanupPendingByParent: (task: BackgroundTask) => void
|
||||
clearNotificationsForTask: (taskId: string) => void
|
||||
@@ -17,6 +20,7 @@ export function pruneStaleState(args: {
|
||||
const {
|
||||
tasks,
|
||||
notifications,
|
||||
queuesByKey,
|
||||
concurrencyManager,
|
||||
cleanupPendingByParent,
|
||||
clearNotificationsForTask,
|
||||
@@ -26,6 +30,7 @@ export function pruneStaleState(args: {
|
||||
tasks,
|
||||
notifications,
|
||||
onTaskPruned: (taskId, task, errorMessage) => {
|
||||
const wasPending = task.status === "pending"
|
||||
const now = Date.now()
|
||||
const timestamp = task.status === "pending"
|
||||
? task.queuedAt?.getTime()
|
||||
@@ -47,6 +52,21 @@ export function pruneStaleState(args: {
|
||||
}
|
||||
|
||||
cleanupPendingByParent(task)
|
||||
if (wasPending) {
|
||||
const key = task.model
|
||||
? `${task.model.providerID}/${task.model.modelID}`
|
||||
: task.agent
|
||||
const queue = queuesByKey.get(key)
|
||||
if (queue) {
|
||||
const index = queue.findIndex((item) => item.task.id === taskId)
|
||||
if (index !== -1) {
|
||||
queue.splice(index, 1)
|
||||
if (queue.length === 0) {
|
||||
queuesByKey.delete(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
clearNotificationsForTask(taskId)
|
||||
tasks.delete(taskId)
|
||||
if (task.sessionID) {
|
||||
|
||||
@@ -27,7 +27,7 @@ export async function processConcurrencyKeyQueue(args: {
|
||||
|
||||
await concurrencyManager.acquire(key)
|
||||
|
||||
if (item.task.status === "cancelled") {
|
||||
if (item.task.status === "cancelled" || item.task.status === "error") {
|
||||
concurrencyManager.release(key)
|
||||
queue.shift()
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user