diff --git a/src/features/background-agent/background-event-handler.ts b/src/features/background-agent/background-event-handler.ts index 3d6e18d9b..67bd77059 100644 --- a/src/features/background-agent/background-event-handler.ts +++ b/src/features/background-agent/background-event-handler.ts @@ -1,7 +1,7 @@ import { log } from "../../shared" -import { MIN_IDLE_TIME_MS } from "./constants" -import { subagentSessions } from "../claude-code-session-state" import type { BackgroundTask } from "./types" +import { cleanupTaskAfterSessionEnds } from "./session-task-cleanup" +import { handleSessionIdleBackgroundEvent } from "./session-idle-event-handler" type Event = { type: string; properties?: Record } @@ -18,6 +18,7 @@ export function handleBackgroundEvent(args: { event: Event findBySession: (sessionID: string) => BackgroundTask | undefined getAllDescendantTasks: (sessionID: string) => BackgroundTask[] + releaseConcurrencyKey?: (key: string) => void cancelTask: ( taskId: string, options: { source: string; reason: string; skipNotification: true } @@ -36,6 +37,7 @@ export function handleBackgroundEvent(args: { event, findBySession, getAllDescendantTasks, + releaseConcurrencyKey, cancelTask, tryCompleteTask, validateSessionHasOutput, @@ -78,6 +80,19 @@ export function handleBackgroundEvent(args: { } if (event.type === "session.idle") { + if (!props || !isRecord(props)) return + handleSessionIdleBackgroundEvent({ + properties: props, + findBySession, + idleDeferralTimers, + validateSessionHasOutput, + checkSessionTodos, + tryCompleteTask, + emitIdleEvent, + }) + } + + if (event.type === "session.error") { if (!props || !isRecord(props)) return const sessionID = getString(props, "sessionID") if (!sessionID) return @@ -85,64 +100,26 @@ export function handleBackgroundEvent(args: { const task = findBySession(sessionID) if (!task || task.status !== "running") return - const startedAt = task.startedAt - if (!startedAt) return + const errorRaw = props["error"] + const dataRaw = isRecord(errorRaw) ? errorRaw["data"] : undefined + const message = + (isRecord(dataRaw) ? getString(dataRaw, "message") : undefined) ?? + (isRecord(errorRaw) ? getString(errorRaw, "message") : undefined) ?? + "Session error" - const elapsedMs = Date.now() - startedAt.getTime() - if (elapsedMs < MIN_IDLE_TIME_MS) { - const remainingMs = MIN_IDLE_TIME_MS - elapsedMs - if (!idleDeferralTimers.has(task.id)) { - log("[background-agent] Deferring early session.idle:", { - elapsedMs, - remainingMs, - taskId: task.id, - }) - const timer = setTimeout(() => { - idleDeferralTimers.delete(task.id) - emitIdleEvent(sessionID) - }, remainingMs) - idleDeferralTimers.set(task.id, timer) - } else { - log("[background-agent] session.idle already deferred:", { elapsedMs, taskId: task.id }) - } - return - } + task.status = "error" + task.error = message + task.completedAt = new Date() - validateSessionHasOutput(sessionID) - .then(async (hasValidOutput) => { - if (task.status !== "running") { - log("[background-agent] Task status changed during validation, skipping:", { - taskId: task.id, - status: task.status, - }) - return - } - - if (!hasValidOutput) { - log("[background-agent] Session.idle but no valid output yet, waiting:", task.id) - return - } - - const hasIncompleteTodos = await checkSessionTodos(sessionID) - - if (task.status !== "running") { - log("[background-agent] Task status changed during todo check, skipping:", { - taskId: task.id, - status: task.status, - }) - return - } - - if (hasIncompleteTodos) { - log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id) - return - } - - await tryCompleteTask(task, "session.idle event") - }) - .catch((err) => { - log("[background-agent] Error in session.idle handler:", err) - }) + cleanupTaskAfterSessionEnds({ + task, + tasks, + idleDeferralTimers, + completionTimers, + cleanupPendingByParent, + clearNotificationsForTask, + releaseConcurrencyKey, + }) } if (event.type === "session.deleted") { @@ -176,24 +153,15 @@ export function handleBackgroundEvent(args: { }) } - const completionTimer = completionTimers.get(task.id) - if (completionTimer) { - clearTimeout(completionTimer) - completionTimers.delete(task.id) - } - - const idleTimer = idleDeferralTimers.get(task.id) - if (idleTimer) { - clearTimeout(idleTimer) - idleDeferralTimers.delete(task.id) - } - - cleanupPendingByParent(task) - tasks.delete(task.id) - clearNotificationsForTask(task.id) - if (task.sessionID) { - subagentSessions.delete(task.sessionID) - } + cleanupTaskAfterSessionEnds({ + task, + tasks, + idleDeferralTimers, + completionTimers, + cleanupPendingByParent, + clearNotificationsForTask, + releaseConcurrencyKey, + }) } } } diff --git a/src/features/background-agent/manager.test.ts b/src/features/background-agent/manager.test.ts index 83afdae4a..03d607ddd 100644 --- a/src/features/background-agent/manager.test.ts +++ b/src/features/background-agent/manager.test.ts @@ -190,6 +190,22 @@ function getPendingByParent(manager: BackgroundManager): Map return (manager as unknown as { pendingByParent: Map> }).pendingByParent } +function getQueuesByKey( + manager: BackgroundManager +): Map> { + return (manager as unknown as { + queuesByKey: Map> + }).queuesByKey +} + +async function processKeyForTest(manager: BackgroundManager, key: string): Promise { + return (manager as unknown as { processKey: (key: string) => Promise }).processKey(key) +} + +function pruneStaleTasksAndNotificationsForTest(manager: BackgroundManager): void { + ;(manager as unknown as { pruneStaleTasksAndNotifications: () => void }).pruneStaleTasksAndNotifications() +} + async function tryCompleteTaskForTest(manager: BackgroundManager, task: BackgroundTask): Promise { return (manager as unknown as { tryCompleteTask: (task: BackgroundTask, source: string) => Promise }) .tryCompleteTask(task, "test") @@ -2505,6 +2521,198 @@ describe("BackgroundManager.handleEvent - session.deleted cascade", () => { }) }) +describe("BackgroundManager.handleEvent - session.error", () => { + test("sets task to error, releases concurrency, and cleans up", async () => { + //#given + const manager = createBackgroundManager() + const concurrencyManager = getConcurrencyManager(manager) + const concurrencyKey = "test-provider/test-model" + await concurrencyManager.acquire(concurrencyKey) + + const sessionID = "ses_error_1" + const task = createMockTask({ + id: "task-session-error", + sessionID, + parentSessionID: "parent-session", + parentMessageID: "msg-1", + description: "task that errors", + agent: "explore", + status: "running", + concurrencyKey, + }) + getTaskMap(manager).set(task.id, task) + getPendingByParent(manager).set(task.parentSessionID, new Set([task.id])) + + //#when + manager.handleEvent({ + type: "session.error", + properties: { + sessionID, + error: { + name: "UnknownError", + data: { message: "Model not found: kimi-for-coding/k2p5." }, + }, + }, + }) + + //#then + expect(task.status).toBe("error") + expect(task.error).toBe("Model not found: kimi-for-coding/k2p5.") + expect(task.completedAt).toBeInstanceOf(Date) + expect(concurrencyManager.getCount(concurrencyKey)).toBe(0) + expect(getTaskMap(manager).has(task.id)).toBe(false) + expect(getPendingByParent(manager).get(task.parentSessionID)).toBeUndefined() + + manager.shutdown() + }) + + test("ignores session.error for non-running tasks", () => { + //#given + const manager = createBackgroundManager() + const sessionID = "ses_error_ignored" + const task = createMockTask({ + id: "task-non-running", + sessionID, + parentSessionID: "parent-session", + parentMessageID: "msg-1", + description: "task already done", + agent: "explore", + status: "completed", + }) + task.completedAt = new Date() + task.error = "previous" + getTaskMap(manager).set(task.id, task) + + //#when + manager.handleEvent({ + type: "session.error", + properties: { + sessionID, + error: { name: "UnknownError", message: "should not matter" }, + }, + }) + + //#then + expect(task.status).toBe("completed") + expect(task.error).toBe("previous") + expect(getTaskMap(manager).has(task.id)).toBe(true) + + manager.shutdown() + }) + + test("ignores session.error for unknown session", () => { + //#given + const manager = createBackgroundManager() + + //#when + const handler = () => + manager.handleEvent({ + type: "session.error", + properties: { + sessionID: "ses_unknown", + error: { name: "UnknownError", message: "Model not found" }, + }, + }) + + //#then + expect(handler).not.toThrow() + + manager.shutdown() + }) +}) + +describe("BackgroundManager queue processing - error tasks are skipped", () => { + test("does not start tasks with status=error", async () => { + //#given + const client = { + session: { + prompt: async () => ({}), + promptAsync: async () => ({}), + abort: async () => ({}), + }, + } + const manager = new BackgroundManager( + { client, directory: tmpdir() } as unknown as PluginInput, + { defaultConcurrency: 1 } + ) + + const key = "test-key" + const task: BackgroundTask = { + id: "task-error-queued", + parentSessionID: "parent-session", + parentMessageID: "msg-1", + description: "queued error task", + prompt: "test", + agent: "test-agent", + status: "error", + queuedAt: new Date(), + } + + const input: import("./types").LaunchInput = { + description: task.description, + prompt: task.prompt, + agent: task.agent, + parentSessionID: task.parentSessionID, + parentMessageID: task.parentMessageID, + } + + let startCalled = false + ;(manager as unknown as { startTask: (item: unknown) => Promise }).startTask = async () => { + startCalled = true + } + + getTaskMap(manager).set(task.id, task) + getQueuesByKey(manager).set(key, [{ task, input }]) + + //#when + await processKeyForTest(manager, key) + + //#then + expect(startCalled).toBe(false) + expect(getQueuesByKey(manager).get(key)?.length ?? 0).toBe(0) + + manager.shutdown() + }) +}) + +describe("BackgroundManager.pruneStaleTasksAndNotifications - removes pruned tasks from queuesByKey", () => { + test("removes stale pending task from queue", () => { + //#given + const manager = createBackgroundManager() + const queuedAt = new Date(Date.now() - 31 * 60 * 1000) + const task: BackgroundTask = { + id: "task-stale-pending", + parentSessionID: "parent-session", + parentMessageID: "msg-1", + description: "stale pending", + prompt: "test", + agent: "test-agent", + status: "pending", + queuedAt, + } + const key = task.agent + + const input: import("./types").LaunchInput = { + description: task.description, + prompt: task.prompt, + agent: task.agent, + parentSessionID: task.parentSessionID, + parentMessageID: task.parentMessageID, + } + + getTaskMap(manager).set(task.id, task) + getQueuesByKey(manager).set(key, [{ task, input }]) + + //#when + pruneStaleTasksAndNotificationsForTest(manager) + + //#then + expect(getQueuesByKey(manager).get(key)).toBeUndefined() + + manager.shutdown() + }) +}) + describe("BackgroundManager.completionTimers - Memory Leak Fix", () => { function getCompletionTimers(manager: BackgroundManager): Map> { return (manager as unknown as { completionTimers: Map> }).completionTimers diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index dad9dca25..cbe169991 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -192,7 +192,7 @@ export class BackgroundManager { await this.concurrencyManager.acquire(key) - if (item.task.status === "cancelled") { + if (item.task.status === "cancelled" || item.task.status === "error") { this.concurrencyManager.release(key) queue.shift() continue @@ -729,6 +729,44 @@ export class BackgroundManager { }) } + if (event.type === "session.error") { + const sessionID = typeof props?.sessionID === "string" ? props.sessionID : undefined + if (!sessionID) return + + const task = this.findBySession(sessionID) + if (!task || task.status !== "running") return + + const errorMessage = props ? this.getSessionErrorMessage(props) : undefined + + task.status = "error" + task.error = errorMessage ?? "Session error" + task.completedAt = new Date() + + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } + + const completionTimer = this.completionTimers.get(task.id) + if (completionTimer) { + clearTimeout(completionTimer) + this.completionTimers.delete(task.id) + } + + const idleTimer = this.idleDeferralTimers.get(task.id) + if (idleTimer) { + clearTimeout(idleTimer) + this.idleDeferralTimers.delete(task.id) + } + + this.cleanupPendingByParent(task) + this.tasks.delete(task.id) + this.clearNotificationsForTask(task.id) + if (task.sessionID) { + subagentSessions.delete(task.sessionID) + } + } + if (event.type === "session.deleted") { const info = props?.info if (!info || typeof info.id !== "string") return @@ -1281,6 +1319,24 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea return "" } + private isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null + } + + private getSessionErrorMessage(properties: EventProperties): string | undefined { + const errorRaw = properties["error"] + if (!this.isRecord(errorRaw)) return undefined + + const dataRaw = errorRaw["data"] + if (this.isRecord(dataRaw)) { + const message = dataRaw["message"] + if (typeof message === "string") return message + } + + const message = errorRaw["message"] + return typeof message === "string" ? message : undefined + } + private hasRunningTasks(): boolean { for (const task of this.tasks.values()) { if (task.status === "running") return true @@ -1292,6 +1348,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea const now = Date.now() for (const [taskId, task] of this.tasks.entries()) { + const wasPending = task.status === "pending" const timestamp = task.status === "pending" ? task.queuedAt?.getTime() : task.startedAt?.getTime() @@ -1316,6 +1373,21 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea } // Clean up pendingByParent to prevent stale entries this.cleanupPendingByParent(task) + if (wasPending) { + const key = task.model + ? `${task.model.providerID}/${task.model.modelID}` + : task.agent + const queue = this.queuesByKey.get(key) + if (queue) { + const index = queue.findIndex((item) => item.task.id === taskId) + if (index !== -1) { + queue.splice(index, 1) + if (queue.length === 0) { + this.queuesByKey.delete(key) + } + } + } + } this.clearNotificationsForTask(taskId) this.tasks.delete(taskId) if (task.sessionID) { diff --git a/src/features/background-agent/session-idle-event-handler.ts b/src/features/background-agent/session-idle-event-handler.ts new file mode 100644 index 000000000..17fb70abd --- /dev/null +++ b/src/features/background-agent/session-idle-event-handler.ts @@ -0,0 +1,93 @@ +import { log } from "../../shared" +import { MIN_IDLE_TIME_MS } from "./constants" +import type { BackgroundTask } from "./types" + +function getString(obj: Record, key: string): string | undefined { + const value = obj[key] + return typeof value === "string" ? value : undefined +} + +export function handleSessionIdleBackgroundEvent(args: { + properties: Record + findBySession: (sessionID: string) => BackgroundTask | undefined + idleDeferralTimers: Map> + validateSessionHasOutput: (sessionID: string) => Promise + checkSessionTodos: (sessionID: string) => Promise + tryCompleteTask: (task: BackgroundTask, source: string) => Promise + emitIdleEvent: (sessionID: string) => void +}): void { + const { + properties, + findBySession, + idleDeferralTimers, + validateSessionHasOutput, + checkSessionTodos, + tryCompleteTask, + emitIdleEvent, + } = args + + const sessionID = getString(properties, "sessionID") + if (!sessionID) return + + const task = findBySession(sessionID) + if (!task || task.status !== "running") return + + const startedAt = task.startedAt + if (!startedAt) return + + const elapsedMs = Date.now() - startedAt.getTime() + if (elapsedMs < MIN_IDLE_TIME_MS) { + const remainingMs = MIN_IDLE_TIME_MS - elapsedMs + if (!idleDeferralTimers.has(task.id)) { + log("[background-agent] Deferring early session.idle:", { + elapsedMs, + remainingMs, + taskId: task.id, + }) + const timer = setTimeout(() => { + idleDeferralTimers.delete(task.id) + emitIdleEvent(sessionID) + }, remainingMs) + idleDeferralTimers.set(task.id, timer) + } else { + log("[background-agent] session.idle already deferred:", { elapsedMs, taskId: task.id }) + } + return + } + + validateSessionHasOutput(sessionID) + .then(async (hasValidOutput) => { + if (task.status !== "running") { + log("[background-agent] Task status changed during validation, skipping:", { + taskId: task.id, + status: task.status, + }) + return + } + + if (!hasValidOutput) { + log("[background-agent] Session.idle but no valid output yet, waiting:", task.id) + return + } + + const hasIncompleteTodos = await checkSessionTodos(sessionID) + + if (task.status !== "running") { + log("[background-agent] Task status changed during todo check, skipping:", { + taskId: task.id, + status: task.status, + }) + return + } + + if (hasIncompleteTodos) { + log("[background-agent] Task has incomplete todos, waiting for todo-continuation:", task.id) + return + } + + await tryCompleteTask(task, "session.idle event") + }) + .catch((err) => { + log("[background-agent] Error in session.idle handler:", err) + }) +} diff --git a/src/features/background-agent/session-task-cleanup.ts b/src/features/background-agent/session-task-cleanup.ts new file mode 100644 index 000000000..4130da4ed --- /dev/null +++ b/src/features/background-agent/session-task-cleanup.ts @@ -0,0 +1,46 @@ +import { subagentSessions } from "../claude-code-session-state" +import type { BackgroundTask } from "./types" + +export function cleanupTaskAfterSessionEnds(args: { + task: BackgroundTask + tasks: Map + idleDeferralTimers: Map> + completionTimers: Map> + cleanupPendingByParent: (task: BackgroundTask) => void + clearNotificationsForTask: (taskId: string) => void + releaseConcurrencyKey?: (key: string) => void +}): void { + const { + task, + tasks, + idleDeferralTimers, + completionTimers, + cleanupPendingByParent, + clearNotificationsForTask, + releaseConcurrencyKey, + } = args + + const completionTimer = completionTimers.get(task.id) + if (completionTimer) { + clearTimeout(completionTimer) + completionTimers.delete(task.id) + } + + const idleTimer = idleDeferralTimers.get(task.id) + if (idleTimer) { + clearTimeout(idleTimer) + idleDeferralTimers.delete(task.id) + } + + if (task.concurrencyKey && releaseConcurrencyKey) { + releaseConcurrencyKey(task.concurrencyKey) + task.concurrencyKey = undefined + } + + cleanupPendingByParent(task) + clearNotificationsForTask(task.id) + tasks.delete(task.id) + if (task.sessionID) { + subagentSessions.delete(task.sessionID) + } +} diff --git a/src/features/background-agent/stale-task-pruner.ts b/src/features/background-agent/stale-task-pruner.ts index 160f73138..0858737c3 100644 --- a/src/features/background-agent/stale-task-pruner.ts +++ b/src/features/background-agent/stale-task-pruner.ts @@ -4,12 +4,15 @@ import { TASK_TTL_MS } from "./constants" import { subagentSessions } from "../claude-code-session-state" import { pruneStaleTasksAndNotifications } from "./task-poller" -import type { BackgroundTask } from "./types" +import type { BackgroundTask, LaunchInput } from "./types" import type { ConcurrencyManager } from "./concurrency" +type QueueItem = { task: BackgroundTask; input: LaunchInput } + export function pruneStaleState(args: { tasks: Map notifications: Map + queuesByKey: Map concurrencyManager: ConcurrencyManager cleanupPendingByParent: (task: BackgroundTask) => void clearNotificationsForTask: (taskId: string) => void @@ -17,6 +20,7 @@ export function pruneStaleState(args: { const { tasks, notifications, + queuesByKey, concurrencyManager, cleanupPendingByParent, clearNotificationsForTask, @@ -26,6 +30,7 @@ export function pruneStaleState(args: { tasks, notifications, onTaskPruned: (taskId, task, errorMessage) => { + const wasPending = task.status === "pending" const now = Date.now() const timestamp = task.status === "pending" ? task.queuedAt?.getTime() @@ -47,6 +52,21 @@ export function pruneStaleState(args: { } cleanupPendingByParent(task) + if (wasPending) { + const key = task.model + ? `${task.model.providerID}/${task.model.modelID}` + : task.agent + const queue = queuesByKey.get(key) + if (queue) { + const index = queue.findIndex((item) => item.task.id === taskId) + if (index !== -1) { + queue.splice(index, 1) + if (queue.length === 0) { + queuesByKey.delete(key) + } + } + } + } clearNotificationsForTask(taskId) tasks.delete(taskId) if (task.sessionID) { diff --git a/src/features/background-agent/task-queue-processor.ts b/src/features/background-agent/task-queue-processor.ts index 64568eab4..7458de207 100644 --- a/src/features/background-agent/task-queue-processor.ts +++ b/src/features/background-agent/task-queue-processor.ts @@ -27,7 +27,7 @@ export async function processConcurrencyKeyQueue(args: { await concurrencyManager.acquire(key) - if (item.task.status === "cancelled") { + if (item.task.status === "cancelled" || item.task.status === "error") { concurrencyManager.release(key) queue.shift() continue