From 3e9a0ef9aa97d5bd6a0aaa6ffd6697cd820efdbc Mon Sep 17 00:00:00 2001 From: justsisyphus Date: Sun, 1 Feb 2026 16:51:11 +0900 Subject: [PATCH] fix(background-agent): abort session on task completion to prevent zombie attach processes --- src/features/background-agent/manager.test.ts | 35 + src/features/background-agent/manager.ts | 982 +++++++++++++++--- 2 files changed, 867 insertions(+), 150 deletions(-) diff --git a/src/features/background-agent/manager.test.ts b/src/features/background-agent/manager.test.ts index fda2a3940..e797486fc 100644 --- a/src/features/background-agent/manager.test.ts +++ b/src/features/background-agent/manager.test.ts @@ -982,6 +982,41 @@ describe("BackgroundManager.tryCompleteTask", () => { expect(task.status).toBe("completed") expect(concurrencyManager.getCount(concurrencyKey)).toBe(0) }) + + test("should abort session on completion", async () => { + // #given + const abortedSessionIDs: string[] = [] + const client = { + session: { + prompt: async () => ({}), + abort: async (args: { path: { id: string } }) => { + abortedSessionIDs.push(args.path.id) + return {} + }, + }, + } + manager.shutdown() + manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput) + stubNotifyParentSession(manager) + + const task: BackgroundTask = { + id: "task-1", + sessionID: "session-1", + parentSessionID: "session-parent", + parentMessageID: "msg-1", + description: "test task", + prompt: "test", + agent: "explore", + status: "running", + startedAt: new Date(), + } + + // #when + await tryCompleteTaskForTest(manager, task) + + // #then + expect(abortedSessionIDs).toEqual(["session-1"]) + }) }) describe("BackgroundManager.trackTask", () => { diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index 6204e87ee..e537cd28e 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -1,41 +1,76 @@ + import type { PluginInput } from "@opencode-ai/plugin" -import type { BackgroundTask, LaunchInput, ResumeInput } from "./types" -import type { BackgroundTaskConfig, TmuxConfig } from "../../config/schema" -import { - TASK_TTL_MS, - MIN_STABILITY_TIME_MS, - DEFAULT_STALE_TIMEOUT_MS, - MIN_RUNTIME_BEFORE_STALE_MS, - MIN_IDLE_TIME_MS, - POLLING_INTERVAL_MS, - type ProcessCleanupEvent, - type OpencodeClient, - type MessagePartInfo, - type BackgroundEvent, -} from "./constants" -import { TaskStateManager } from "./state" -import { createTask, startTask, resumeTask, type SpawnerContext } from "./spawner" -import { - checkSessionTodos, - validateSessionHasOutput, - tryCompleteTask, - notifyParentSession, - type ResultHandlerContext, -} from "./result-handler" -import { log } from "../../shared" +import type { + BackgroundTask, + LaunchInput, + ResumeInput, +} from "./types" +import { log, getAgentToolRestrictions, promptWithModelSuggestionRetry } from "../../shared" import { ConcurrencyManager } from "./concurrency" +import type { BackgroundTaskConfig, TmuxConfig } from "../../config/schema" +import { isInsideTmux } from "../../shared/tmux" + import { subagentSessions } from "../claude-code-session-state" import { getTaskToastManager } from "../task-toast-manager" +import { findNearestMessageWithFields, MESSAGE_STORAGE } from "../hook-message-injector" +import { existsSync, readdirSync } from "node:fs" +import { join } from "node:path" -export { type SubagentSessionCreatedEvent, type OnSubagentSessionCreated } from "./constants" +const TASK_TTL_MS = 30 * 60 * 1000 +const MIN_STABILITY_TIME_MS = 10 * 1000 // Must run at least 10s before stability detection kicks in +const DEFAULT_STALE_TIMEOUT_MS = 180_000 // 3 minutes +const MIN_RUNTIME_BEFORE_STALE_MS = 30_000 // 30 seconds -type ProcessCleanupHandler = () => void +type ProcessCleanupEvent = NodeJS.Signals | "beforeExit" | "exit" + +type OpencodeClient = PluginInput["client"] + + +interface MessagePartInfo { + sessionID?: string + type?: string + tool?: string +} + +interface EventProperties { + sessionID?: string + info?: { id?: string } + [key: string]: unknown +} + +interface Event { + type: string + properties?: EventProperties +} + +interface Todo { + content: string + status: string + priority: string + id: string +} + +interface QueueItem { + task: BackgroundTask + input: LaunchInput +} + +export interface SubagentSessionCreatedEvent { + sessionID: string + parentID: string + title: string +} + +export type OnSubagentSessionCreated = (event: SubagentSessionCreatedEvent) => Promise export class BackgroundManager { private static cleanupManagers = new Set() private static cleanupRegistered = false - private static cleanupHandlers = new Map() + private static cleanupHandlers = new Map void>() + private tasks: Map + private notifications: Map + private pendingByParent: Map> // Track pending tasks per parent for batching private client: OpencodeClient private directory: string private pollingInterval?: ReturnType @@ -43,20 +78,25 @@ export class BackgroundManager { private shutdownTriggered = false private config?: BackgroundTaskConfig private tmuxEnabled: boolean - private onSubagentSessionCreated?: (event: { sessionID: string; parentID: string; title: string }) => Promise + private onSubagentSessionCreated?: OnSubagentSessionCreated private onShutdown?: () => void - private state: TaskStateManager + + private queuesByKey: Map = new Map() + private processingKeys: Set = new Set() + private completionTimers: Map> = new Map() constructor( ctx: PluginInput, config?: BackgroundTaskConfig, options?: { tmuxConfig?: TmuxConfig - onSubagentSessionCreated?: (event: { sessionID: string; parentID: string; title: string }) => Promise + onSubagentSessionCreated?: OnSubagentSessionCreated onShutdown?: () => void } ) { - this.state = new TaskStateManager() + this.tasks = new Map() + this.notifications = new Map() + this.pendingByParent = new Map() this.client = ctx.client this.directory = ctx.directory this.concurrencyManager = new ConcurrencyManager(config) @@ -67,48 +107,6 @@ export class BackgroundManager { this.registerProcessCleanup() } - private getSpawnerContext(): SpawnerContext { - return { - client: this.client, - directory: this.directory, - concurrencyManager: this.concurrencyManager, - tmuxEnabled: this.tmuxEnabled, - onSubagentSessionCreated: this.onSubagentSessionCreated, - onTaskError: (task, error) => this.handleTaskError(task, error), - } - } - - private getResultHandlerContext(): ResultHandlerContext { - return { - client: this.client, - concurrencyManager: this.concurrencyManager, - state: this.state, - } - } - - private handleTaskError(task: BackgroundTask, error: Error): void { - const existingTask = this.state.findBySession(task.sessionID ?? "") - if (existingTask) { - existingTask.status = "error" - const errorMessage = error.message - if (errorMessage.includes("agent.name") || errorMessage.includes("undefined")) { - existingTask.error = `Agent "${task.agent}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.` - } else { - existingTask.error = errorMessage - } - existingTask.completedAt = new Date() - if (existingTask.concurrencyKey) { - this.concurrencyManager.release(existingTask.concurrencyKey) - existingTask.concurrencyKey = undefined - } - - this.state.markForNotification(existingTask) - notifyParentSession(existingTask, this.getResultHandlerContext()).catch(err => { - log("[background-agent] Failed to notify on error:", err) - }) - } - } - async launch(input: LaunchInput): Promise { log("[background-agent] launch() called with:", { agent: input.agent, @@ -121,17 +119,39 @@ export class BackgroundManager { throw new Error("Agent parameter is required") } - const task = createTask(input) - this.state.addTask(task) - - if (input.parentSessionID) { - this.state.trackPendingTask(input.parentSessionID, task.id) + // Create task immediately with status="pending" + const task: BackgroundTask = { + id: `bg_${crypto.randomUUID().slice(0, 8)}`, + status: "pending", + queuedAt: new Date(), + // Do NOT set startedAt - will be set when running + // Do NOT set sessionID - will be set when running + description: input.description, + prompt: input.prompt, + agent: input.agent, + parentSessionID: input.parentSessionID, + parentMessageID: input.parentMessageID, + parentModel: input.parentModel, + parentAgent: input.parentAgent, + model: input.model, } - const key = this.state.getConcurrencyKeyFromInput(input) - this.state.addToQueue(key, { task, input }) + this.tasks.set(task.id, task) - log("[background-agent] Task queued:", { taskId: task.id, key, queueLength: this.state.getQueue(key)?.length ?? 0 }) + // Track for batched notifications immediately (pending state) + if (input.parentSessionID) { + const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() + pending.add(task.id) + this.pendingByParent.set(input.parentSessionID, pending) + } + + // Add to queue + const key = this.getConcurrencyKeyFromInput(input) + const queue = this.queuesByKey.get(key) ?? [] + queue.push({ task, input }) + this.queuesByKey.set(key, queue) + + log("[background-agent] Task queued:", { taskId: task.id, key, queueLength: queue.length }) const toastManager = getTaskToastManager() if (toastManager) { @@ -145,20 +165,21 @@ export class BackgroundManager { }) } + // Trigger processing (fire-and-forget) this.processKey(key) return task } private async processKey(key: string): Promise { - if (this.state.processingKeys.has(key)) { + if (this.processingKeys.has(key)) { return } - this.state.processingKeys.add(key) + this.processingKeys.add(key) try { - const queue = this.state.getQueue(key) + const queue = this.queuesByKey.get(key) while (queue && queue.length > 0) { const item = queue[0] @@ -171,8 +192,7 @@ export class BackgroundManager { } try { - await startTask(item, this.getSpawnerContext()) - this.startPolling() + await this.startTask(item) } catch (error) { log("[background-agent] Error starting task:", error) } @@ -180,26 +200,204 @@ export class BackgroundManager { queue.shift() } } finally { - this.state.processingKeys.delete(key) + this.processingKeys.delete(key) } } + private async startTask(item: QueueItem): Promise { + const { task, input } = item + + log("[background-agent] Starting task:", { + taskId: task.id, + agent: input.agent, + model: input.model, + }) + + const concurrencyKey = this.getConcurrencyKeyFromInput(input) + + const parentSession = await this.client.session.get({ + path: { id: input.parentSessionID }, + }).catch((err) => { + log(`[background-agent] Failed to get parent session: ${err}`) + return null + }) + const parentDirectory = parentSession?.data?.directory ?? this.directory + log(`[background-agent] Parent dir: ${parentSession?.data?.directory}, using: ${parentDirectory}`) + + const createResult = await this.client.session.create({ + body: { + parentID: input.parentSessionID, + title: `Background: ${input.description}`, + permission: [ + { permission: "question", action: "deny" as const, pattern: "*" }, + ], + } as any, + query: { + directory: parentDirectory, + }, + }).catch((error) => { + this.concurrencyManager.release(concurrencyKey) + throw error + }) + + if (createResult.error) { + this.concurrencyManager.release(concurrencyKey) + throw new Error(`Failed to create background session: ${createResult.error}`) + } + + const sessionID = createResult.data.id + subagentSessions.add(sessionID) + + log("[background-agent] tmux callback check", { + hasCallback: !!this.onSubagentSessionCreated, + tmuxEnabled: this.tmuxEnabled, + isInsideTmux: isInsideTmux(), + sessionID, + parentID: input.parentSessionID, + }) + + if (this.onSubagentSessionCreated && this.tmuxEnabled && isInsideTmux()) { + log("[background-agent] Invoking tmux callback NOW", { sessionID }) + await this.onSubagentSessionCreated({ + sessionID, + parentID: input.parentSessionID, + title: input.description, + }).catch((err) => { + log("[background-agent] Failed to spawn tmux pane:", err) + }) + log("[background-agent] tmux callback completed, waiting 200ms") + await new Promise(r => setTimeout(r, 200)) + } else { + log("[background-agent] SKIP tmux callback - conditions not met") + } + + // Update task to running state + task.status = "running" + task.startedAt = new Date() + task.sessionID = sessionID + task.progress = { + toolCalls: 0, + lastUpdate: new Date(), + } + task.concurrencyKey = concurrencyKey + task.concurrencyGroup = concurrencyKey + + this.startPolling() + + log("[background-agent] Launching task:", { taskId: task.id, sessionID, agent: input.agent }) + + const toastManager = getTaskToastManager() + if (toastManager) { + toastManager.updateTask(task.id, "running") + } + + log("[background-agent] Calling prompt (fire-and-forget) for launch with:", { + sessionID, + agent: input.agent, + model: input.model, + hasSkillContent: !!input.skillContent, + promptLength: input.prompt.length, + }) + + // Use prompt() instead of promptAsync() to properly initialize agent loop (fire-and-forget) + // Include model if caller provided one (e.g., from Sisyphus category configs) + // IMPORTANT: variant must be a top-level field in the body, NOT nested inside model + // OpenCode's PromptInput schema expects: { model: { providerID, modelID }, variant: "max" } + const launchModel = input.model + ? { providerID: input.model.providerID, modelID: input.model.modelID } + : undefined + const launchVariant = input.model?.variant + + promptWithModelSuggestionRetry(this.client, { + path: { id: sessionID }, + body: { + agent: input.agent, + ...(launchModel ? { model: launchModel } : {}), + ...(launchVariant ? { variant: launchVariant } : {}), + system: input.skillContent, + tools: { + ...getAgentToolRestrictions(input.agent), + task: false, + delegate_task: false, + call_omo_agent: true, + question: false, + }, + parts: [{ type: "text", text: input.prompt }], + }, + }).catch((error) => { + log("[background-agent] promptAsync error:", error) + const existingTask = this.findBySession(sessionID) + if (existingTask) { + existingTask.status = "error" + const errorMessage = error instanceof Error ? error.message : String(error) + if (errorMessage.includes("agent.name") || errorMessage.includes("undefined")) { + existingTask.error = `Agent "${input.agent}" not found. Make sure the agent is registered in your opencode.json or provided by a plugin.` + } else { + existingTask.error = errorMessage + } + existingTask.completedAt = new Date() + if (existingTask.concurrencyKey) { + this.concurrencyManager.release(existingTask.concurrencyKey) + existingTask.concurrencyKey = undefined + } + + this.markForNotification(existingTask) + this.notifyParentSession(existingTask).catch(err => { + log("[background-agent] Failed to notify on error:", err) + }) + } + }) + } + getTask(id: string): BackgroundTask | undefined { - return this.state.getTask(id) + return this.tasks.get(id) } getTasksByParentSession(sessionID: string): BackgroundTask[] { - return this.state.getTasksByParentSession(sessionID) + const result: BackgroundTask[] = [] + for (const task of this.tasks.values()) { + if (task.parentSessionID === sessionID) { + result.push(task) + } + } + return result } getAllDescendantTasks(sessionID: string): BackgroundTask[] { - return this.state.getAllDescendantTasks(sessionID) + const result: BackgroundTask[] = [] + const directChildren = this.getTasksByParentSession(sessionID) + + for (const child of directChildren) { + result.push(child) + if (child.sessionID) { + const descendants = this.getAllDescendantTasks(child.sessionID) + result.push(...descendants) + } + } + + return result } findBySession(sessionID: string): BackgroundTask | undefined { - return this.state.findBySession(sessionID) + for (const task of this.tasks.values()) { + if (task.sessionID === sessionID) { + return task + } + } + return undefined } + private getConcurrencyKeyFromInput(input: LaunchInput): string { + if (input.model) { + return `${input.model.providerID}/${input.model.modelID}` + } + return input.agent + } + + /** + * Track a task created elsewhere (e.g., from delegate_task) for notification tracking. + * This allows tasks created by other tools to receive the same toast/prompt notifications. + */ async trackTask(input: { taskId: string sessionID: string @@ -209,11 +407,13 @@ export class BackgroundManager { parentAgent?: string concurrencyKey?: string }): Promise { - const existingTask = this.state.getTask(input.taskId) + const existingTask = this.tasks.get(input.taskId) if (existingTask) { + // P2 fix: Clean up old parent's pending set BEFORE changing parent + // Otherwise cleanupPendingByParent would use the new parent ID const parentChanged = input.parentSessionID !== existingTask.parentSessionID if (parentChanged) { - this.state.cleanupPendingByParent(existingTask) + this.cleanupPendingByParent(existingTask) // Clean from OLD parent existingTask.parentSessionID = input.parentSessionID } if (input.parentAgent !== undefined) { @@ -228,10 +428,14 @@ export class BackgroundManager { } this.startPolling() + // Track for batched notifications if task is pending or running if (existingTask.status === "pending" || existingTask.status === "running") { - this.state.trackPendingTask(input.parentSessionID, existingTask.id) + const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() + pending.add(existingTask.id) + this.pendingByParent.set(input.parentSessionID, pending) } else if (!parentChanged) { - this.state.cleanupPendingByParent(existingTask) + // Only clean up if parent didn't change (already cleaned above if it did) + this.cleanupPendingByParent(existingTask) } log("[background-agent] External task already registered:", { taskId: existingTask.id, sessionID: existingTask.sessionID, status: existingTask.status }) @@ -241,6 +445,7 @@ export class BackgroundManager { const concurrencyGroup = input.concurrencyKey ?? input.agent ?? "delegate_task" + // Acquire concurrency slot if a key is provided if (input.concurrencyKey) { await this.concurrencyManager.acquire(input.concurrencyKey) } @@ -264,12 +469,14 @@ export class BackgroundManager { concurrencyGroup, } - this.state.addTask(task) + this.tasks.set(task.id, task) subagentSessions.add(input.sessionID) this.startPolling() if (input.parentSessionID) { - this.state.trackPendingTask(input.parentSessionID, task.id) + const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() + pending.add(task.id) + this.pendingByParent.set(input.parentSessionID, pending) } log("[background-agent] Registered external task:", { taskId: task.id, sessionID: input.sessionID }) @@ -278,16 +485,45 @@ export class BackgroundManager { } async resume(input: ResumeInput): Promise { - const existingTask = this.state.findBySession(input.sessionId) + const existingTask = this.findBySession(input.sessionId) if (!existingTask) { throw new Error(`Task not found for session: ${input.sessionId}`) } - await resumeTask(existingTask, input, { - client: this.client, - concurrencyManager: this.concurrencyManager, - onTaskError: (task, error) => this.handleTaskError(task, error), - }) + if (!existingTask.sessionID) { + throw new Error(`Task has no sessionID: ${existingTask.id}`) + } + + if (existingTask.status === "running") { + log("[background-agent] Resume skipped - task already running:", { + taskId: existingTask.id, + sessionID: existingTask.sessionID, + }) + return existingTask + } + + // Re-acquire concurrency using the persisted concurrency group + const concurrencyKey = existingTask.concurrencyGroup ?? existingTask.agent + await this.concurrencyManager.acquire(concurrencyKey) + existingTask.concurrencyKey = concurrencyKey + existingTask.concurrencyGroup = concurrencyKey + + + existingTask.status = "running" + existingTask.completedAt = undefined + existingTask.error = undefined + existingTask.parentSessionID = input.parentSessionID + existingTask.parentMessageID = input.parentMessageID + existingTask.parentModel = input.parentModel + existingTask.parentAgent = input.parentAgent + // Reset startedAt on resume to prevent immediate completion + // The MIN_IDLE_TIME_MS check uses startedAt, so resumed tasks need fresh timing + existingTask.startedAt = new Date() + + existingTask.progress = { + toolCalls: existingTask.progress?.toolCalls ?? 0, + lastUpdate: new Date(), + } this.startPolling() if (existingTask.sessionID) { @@ -295,13 +531,92 @@ export class BackgroundManager { } if (input.parentSessionID) { - this.state.trackPendingTask(input.parentSessionID, existingTask.id) + const pending = this.pendingByParent.get(input.parentSessionID) ?? new Set() + pending.add(existingTask.id) + this.pendingByParent.set(input.parentSessionID, pending) } + const toastManager = getTaskToastManager() + if (toastManager) { + toastManager.addTask({ + id: existingTask.id, + description: existingTask.description, + agent: existingTask.agent, + isBackground: true, + }) + } + + log("[background-agent] Resuming task:", { taskId: existingTask.id, sessionID: existingTask.sessionID }) + + log("[background-agent] Resuming task - calling prompt (fire-and-forget) with:", { + sessionID: existingTask.sessionID, + agent: existingTask.agent, + model: existingTask.model, + promptLength: input.prompt.length, + }) + + // Use prompt() instead of promptAsync() to properly initialize agent loop + // Include model if task has one (preserved from original launch with category config) + // variant must be top-level in body, not nested inside model (OpenCode PromptInput schema) + const resumeModel = existingTask.model + ? { providerID: existingTask.model.providerID, modelID: existingTask.model.modelID } + : undefined + const resumeVariant = existingTask.model?.variant + + this.client.session.prompt({ + path: { id: existingTask.sessionID }, + body: { + agent: existingTask.agent, + ...(resumeModel ? { model: resumeModel } : {}), + ...(resumeVariant ? { variant: resumeVariant } : {}), + tools: { + ...getAgentToolRestrictions(existingTask.agent), + task: false, + delegate_task: false, + call_omo_agent: true, + question: false, + }, + parts: [{ type: "text", text: input.prompt }], + }, + }).catch((error) => { + log("[background-agent] resume prompt error:", error) + existingTask.status = "error" + const errorMessage = error instanceof Error ? error.message : String(error) + existingTask.error = errorMessage + existingTask.completedAt = new Date() + + // Release concurrency on error to prevent slot leaks + if (existingTask.concurrencyKey) { + this.concurrencyManager.release(existingTask.concurrencyKey) + existingTask.concurrencyKey = undefined + } + this.markForNotification(existingTask) + this.notifyParentSession(existingTask).catch(err => { + log("[background-agent] Failed to notify on resume error:", err) + }) + }) + return existingTask } - handleEvent(event: BackgroundEvent): void { + private async checkSessionTodos(sessionID: string): Promise { + try { + const response = await this.client.session.todo({ + path: { id: sessionID }, + }) + const todos = (response.data ?? response) as Todo[] + if (!todos || todos.length === 0) return false + + const incomplete = todos.filter( + (t) => t.status !== "completed" && t.status !== "cancelled" + ) + return incomplete.length > 0 + } catch { + return false + } + } + + handleEvent(event: Event): void { const props = event.properties if (event.type === "message.part.updated") { @@ -310,7 +625,7 @@ export class BackgroundManager { const sessionID = partInfo?.sessionID if (!sessionID) return - const task = this.state.findBySession(sessionID) + const task = this.findBySession(sessionID) if (!task) return if (partInfo?.type === "tool" || partInfo?.tool) { @@ -330,19 +645,23 @@ export class BackgroundManager { const sessionID = props?.sessionID as string | undefined if (!sessionID) return - const task = this.state.findBySession(sessionID) + const task = this.findBySession(sessionID) if (!task || task.status !== "running") return const startedAt = task.startedAt if (!startedAt) return + // Edge guard: Require minimum elapsed time (5 seconds) before accepting idle const elapsedMs = Date.now() - startedAt.getTime() + const MIN_IDLE_TIME_MS = 5000 if (elapsedMs < MIN_IDLE_TIME_MS) { log("[background-agent] Ignoring early session.idle, elapsed:", { elapsedMs, taskId: task.id }) return } - validateSessionHasOutput(this.client, sessionID).then(async (hasValidOutput) => { + // Edge guard: Verify session has actual assistant output before completing + this.validateSessionHasOutput(sessionID).then(async (hasValidOutput) => { + // Re-check status after async operation (could have been completed by polling) if (task.status !== "running") { log("[background-agent] Task status changed during validation, skipping:", { taskId: task.id, status: task.status }) return @@ -353,8 +672,9 @@ export class BackgroundManager { return } - const hasIncompleteTodos = await checkSessionTodos(this.client, sessionID) + const hasIncompleteTodos = await this.checkSessionTodos(sessionID) + // Re-check status after async operation again if (task.status !== "running") { log("[background-agent] Task status changed during todo check, skipping:", { taskId: task.id, status: task.status }) return @@ -365,7 +685,7 @@ export class BackgroundManager { return } - await tryCompleteTask(task, "session.idle event", this.getResultHandlerContext()) + await this.tryCompleteTask(task, "session.idle event") }).catch(err => { log("[background-agent] Error in session.idle handler:", err) }) @@ -376,7 +696,7 @@ export class BackgroundManager { if (!info || typeof info.id !== "string") return const sessionID = info.id - const task = this.state.findBySession(sessionID) + const task = this.findBySession(sessionID) if (!task) return if (task.status === "running") { @@ -385,40 +705,156 @@ export class BackgroundManager { task.error = "Session deleted" } - if (task.concurrencyKey) { - this.concurrencyManager.release(task.concurrencyKey) - task.concurrencyKey = undefined + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } + const existingTimer = this.completionTimers.get(task.id) + if (existingTimer) { + clearTimeout(existingTimer) + this.completionTimers.delete(task.id) } - this.state.clearCompletionTimer(task.id) - this.state.cleanupPendingByParent(task) - this.state.removeTask(task.id) - this.state.clearNotificationsForTask(task.id) + this.cleanupPendingByParent(task) + this.tasks.delete(task.id) + this.clearNotificationsForTask(task.id) subagentSessions.delete(sessionID) } } markForNotification(task: BackgroundTask): void { - this.state.markForNotification(task) + const queue = this.notifications.get(task.parentSessionID) ?? [] + queue.push(task) + this.notifications.set(task.parentSessionID, queue) } getPendingNotifications(sessionID: string): BackgroundTask[] { - return this.state.getPendingNotifications(sessionID) + return this.notifications.get(sessionID) ?? [] } clearNotifications(sessionID: string): void { - this.state.clearNotifications(sessionID) + this.notifications.delete(sessionID) } + /** + * Validates that a session has actual assistant/tool output before marking complete. + * Prevents premature completion when session.idle fires before agent responds. + */ + private async validateSessionHasOutput(sessionID: string): Promise { + try { + const response = await this.client.session.messages({ + path: { id: sessionID }, + }) + + const messages = response.data ?? [] + + // Check for at least one assistant or tool message + const hasAssistantOrToolMessage = messages.some( + (m: { info?: { role?: string } }) => + m.info?.role === "assistant" || m.info?.role === "tool" + ) + + if (!hasAssistantOrToolMessage) { + log("[background-agent] No assistant/tool messages found in session:", sessionID) + return false + } + + // Additionally check that at least one message has content (not just empty) + // OpenCode API uses different part types than Anthropic's API: + // - "reasoning" with .text property (thinking/reasoning content) + // - "tool" with .state.output property (tool call results) + // - "text" with .text property (final text output) + // - "step-start"/"step-finish" (metadata, no content) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const hasContent = messages.some((m: any) => { + if (m.info?.role !== "assistant" && m.info?.role !== "tool") return false + const parts = m.parts ?? [] + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return parts.some((p: any) => + // Text content (final output) + (p.type === "text" && p.text && p.text.trim().length > 0) || + // Reasoning content (thinking blocks) + (p.type === "reasoning" && p.text && p.text.trim().length > 0) || + // Tool calls (indicates work was done) + p.type === "tool" || + // Tool results (output from executed tools) - important for tool-only tasks + (p.type === "tool_result" && p.content && + (typeof p.content === "string" ? p.content.trim().length > 0 : p.content.length > 0)) + ) + }) + + if (!hasContent) { + log("[background-agent] Messages exist but no content found in session:", sessionID) + return false + } + + return true + } catch (error) { + log("[background-agent] Error validating session output:", error) + // On error, allow completion to proceed (don't block indefinitely) + return true + } + } + + private clearNotificationsForTask(taskId: string): void { + for (const [sessionID, tasks] of this.notifications.entries()) { + const filtered = tasks.filter((t) => t.id !== taskId) + if (filtered.length === 0) { + this.notifications.delete(sessionID) + } else { + this.notifications.set(sessionID, filtered) + } + } + } + + /** + * Remove task from pending tracking for its parent session. + * Cleans up the parent entry if no pending tasks remain. + */ + private cleanupPendingByParent(task: BackgroundTask): void { + if (!task.parentSessionID) return + const pending = this.pendingByParent.get(task.parentSessionID) + if (pending) { + pending.delete(task.id) + if (pending.size === 0) { + this.pendingByParent.delete(task.parentSessionID) + } + } + } + + /** + * Cancels a pending task by removing it from queue and marking as cancelled. + * Does NOT abort session (no session exists yet) or release concurrency slot (wasn't acquired). + */ cancelPendingTask(taskId: string): boolean { - return this.state.cancelPendingTask(taskId) - } + const task = this.tasks.get(taskId) + if (!task || task.status !== "pending") { + return false + } - getRunningTasks(): BackgroundTask[] { - return this.state.getRunningTasks() - } + // Find and remove from queue + const key = task.model + ? `${task.model.providerID}/${task.model.modelID}` + : task.agent + const queue = this.queuesByKey.get(key) + if (queue) { + const index = queue.findIndex(item => item.task.id === taskId) + if (index !== -1) { + queue.splice(index, 1) + if (queue.length === 0) { + this.queuesByKey.delete(key) + } + } + } - getCompletedTasks(): BackgroundTask[] { - return this.state.getCompletedTasks() + // Mark as cancelled + task.status = "cancelled" + task.completedAt = new Date() + + // Clean up pendingByParent + this.cleanupPendingByParent(task) + + log("[background-agent] Cancelled pending task:", { taskId, key }) + return true } private startPolling(): void { @@ -426,7 +862,7 @@ export class BackgroundManager { this.pollingInterval = setInterval(() => { this.pollRunningTasks() - }, POLLING_INTERVAL_MS) + }, 2000) this.pollingInterval.unref() } @@ -479,10 +915,211 @@ export class BackgroundManager { BackgroundManager.cleanupRegistered = false } + + /** + * Get all running tasks (for compaction hook) + */ + getRunningTasks(): BackgroundTask[] { + return Array.from(this.tasks.values()).filter(t => t.status === "running") + } + + /** + * Get all completed tasks still in memory (for compaction hook) + */ + getCompletedTasks(): BackgroundTask[] { + return Array.from(this.tasks.values()).filter(t => t.status !== "running") + } + + /** + * Safely complete a task with race condition protection. + * Returns true if task was successfully completed, false if already completed by another path. + */ + private async tryCompleteTask(task: BackgroundTask, source: string): Promise { + // Guard: Check if task is still running (could have been completed by another path) + if (task.status !== "running") { + log("[background-agent] Task already completed, skipping:", { taskId: task.id, status: task.status, source }) + return false + } + + // Atomically mark as completed to prevent race conditions + task.status = "completed" + task.completedAt = new Date() + + // Release concurrency BEFORE any async operations to prevent slot leaks + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } + + this.markForNotification(task) + + if (task.sessionID) { + this.client.session.abort({ + path: { id: task.sessionID }, + }).catch(() => {}) + } + + try { + await this.notifyParentSession(task) + log(`[background-agent] Task completed via ${source}:`, task.id) + } catch (err) { + log("[background-agent] Error in notifyParentSession:", { taskId: task.id, error: err }) + // Concurrency already released, notification failed but task is complete + } + + return true + } + + private async notifyParentSession(task: BackgroundTask): Promise { + // Note: Callers must release concurrency before calling this method + // to ensure slots are freed even if notification fails + + const duration = this.formatDuration(task.startedAt ?? new Date(), task.completedAt) + + log("[background-agent] notifyParentSession called for task:", task.id) + + // Show toast notification + const toastManager = getTaskToastManager() + if (toastManager) { + toastManager.showCompletionToast({ + id: task.id, + description: task.description, + duration, + }) + } + + // Update pending tracking and check if all tasks complete + const pendingSet = this.pendingByParent.get(task.parentSessionID) + if (pendingSet) { + pendingSet.delete(task.id) + if (pendingSet.size === 0) { + this.pendingByParent.delete(task.parentSessionID) + } + } + + const allComplete = !pendingSet || pendingSet.size === 0 + const remainingCount = pendingSet?.size ?? 0 + + const statusText = task.status === "completed" ? "COMPLETED" : "CANCELLED" + const errorInfo = task.error ? `\n**Error:** ${task.error}` : "" + + let notification: string + if (allComplete) { + const completedTasks = Array.from(this.tasks.values()) + .filter(t => t.parentSessionID === task.parentSessionID && t.status !== "running" && t.status !== "pending") + .map(t => `- \`${t.id}\`: ${t.description}`) + .join("\n") + + notification = ` +[ALL BACKGROUND TASKS COMPLETE] + +**Completed:** +${completedTasks || `- \`${task.id}\`: ${task.description}`} + +Use \`background_output(task_id="")\` to retrieve each result. +` + } else { + // Individual completion - silent notification + notification = ` +[BACKGROUND TASK ${statusText}] +**ID:** \`${task.id}\` +**Description:** ${task.description} +**Duration:** ${duration}${errorInfo} + +**${remainingCount} task${remainingCount === 1 ? "" : "s"} still in progress.** You WILL be notified when ALL complete. +Do NOT poll - continue productive work. + +Use \`background_output(task_id="${task.id}")\` to retrieve this result when ready. +` + } + + let agent: string | undefined = task.parentAgent + let model: { providerID: string; modelID: string } | undefined + + try { + const messagesResp = await this.client.session.messages({ path: { id: task.parentSessionID } }) + const messages = (messagesResp.data ?? []) as Array<{ + info?: { agent?: string; model?: { providerID: string; modelID: string }; modelID?: string; providerID?: string } + }> + for (let i = messages.length - 1; i >= 0; i--) { + const info = messages[i].info + if (info?.agent || info?.model || (info?.modelID && info?.providerID)) { + agent = info.agent ?? task.parentAgent + model = info.model ?? (info.providerID && info.modelID ? { providerID: info.providerID, modelID: info.modelID } : undefined) + break + } + } + } catch { + const messageDir = getMessageDir(task.parentSessionID) + const currentMessage = messageDir ? findNearestMessageWithFields(messageDir) : null + agent = currentMessage?.agent ?? task.parentAgent + model = currentMessage?.model?.providerID && currentMessage?.model?.modelID + ? { providerID: currentMessage.model.providerID, modelID: currentMessage.model.modelID } + : undefined + } + + log("[background-agent] notifyParentSession context:", { + taskId: task.id, + resolvedAgent: agent, + resolvedModel: model, + }) + + try { + await this.client.session.prompt({ + path: { id: task.parentSessionID }, + body: { + noReply: !allComplete, + ...(agent !== undefined ? { agent } : {}), + ...(model !== undefined ? { model } : {}), + parts: [{ type: "text", text: notification }], + }, + }) + log("[background-agent] Sent notification to parent session:", { + taskId: task.id, + allComplete, + noReply: !allComplete, + }) + } catch (error) { + log("[background-agent] Failed to send notification:", error) + } + + const taskId = task.id + const timer = setTimeout(() => { + this.completionTimers.delete(taskId) + if (this.tasks.has(taskId)) { + this.clearNotificationsForTask(taskId) + this.tasks.delete(taskId) + log("[background-agent] Removed completed task from memory:", taskId) + } + }, 5 * 60 * 1000) + this.completionTimers.set(taskId, timer) + } + + private formatDuration(start: Date, end?: Date): string { + const duration = (end ?? new Date()).getTime() - start.getTime() + const seconds = Math.floor(duration / 1000) + const minutes = Math.floor(seconds / 60) + const hours = Math.floor(minutes / 60) + + if (hours > 0) { + return `${hours}h ${minutes % 60}m ${seconds % 60}s` + } else if (minutes > 0) { + return `${minutes}m ${seconds % 60}s` + } + return `${seconds}s` + } + + private hasRunningTasks(): boolean { + for (const task of this.tasks.values()) { + if (task.status === "running") return true + } + return false + } + private pruneStaleTasksAndNotifications(): void { const now = Date.now() - for (const [taskId, task] of this.state.tasks.entries()) { + for (const [taskId, task] of this.tasks.entries()) { const timestamp = task.status === "pending" ? task.queuedAt?.getTime() : task.startedAt?.getTime() @@ -505,15 +1142,19 @@ export class BackgroundManager { this.concurrencyManager.release(task.concurrencyKey) task.concurrencyKey = undefined } - this.state.cleanupPendingByParent(task) - this.state.clearNotificationsForTask(taskId) - this.state.removeTask(taskId) + // Clean up pendingByParent to prevent stale entries + this.cleanupPendingByParent(task) + this.clearNotificationsForTask(taskId) + this.tasks.delete(taskId) + if (task.sessionID) { + subagentSessions.delete(task.sessionID) + } } } - for (const [sessionID, notifications] of this.state.notifications.entries()) { + for (const [sessionID, notifications] of this.notifications.entries()) { if (notifications.length === 0) { - this.state.notifications.delete(sessionID) + this.notifications.delete(sessionID) continue } const validNotifications = notifications.filter((task) => { @@ -522,9 +1163,9 @@ export class BackgroundManager { return age <= TASK_TTL_MS }) if (validNotifications.length === 0) { - this.state.notifications.delete(sessionID) + this.notifications.delete(sessionID) } else if (validNotifications.length !== notifications.length) { - this.state.notifications.set(sessionID, validNotifications) + this.notifications.set(sessionID, validNotifications) } } } @@ -533,7 +1174,7 @@ export class BackgroundManager { const staleTimeoutMs = this.config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS const now = Date.now() - for (const task of this.state.tasks.values()) { + for (const task of this.tasks.values()) { if (task.status !== "running") continue if (!task.progress?.lastUpdate) continue @@ -566,7 +1207,7 @@ export class BackgroundManager { log(`[background-agent] Task ${task.id} interrupted: stale timeout`) try { - await notifyParentSession(task, this.getResultHandlerContext()) + await this.notifyParentSession(task) } catch (err) { log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err }) } @@ -580,7 +1221,7 @@ export class BackgroundManager { const statusResult = await this.client.session.status() const allStatuses = (statusResult.data ?? {}) as Record - for (const task of this.state.tasks.values()) { + for (const task of this.tasks.values()) { if (task.status !== "running") continue const sessionID = task.sessionID @@ -589,22 +1230,25 @@ export class BackgroundManager { try { const sessionStatus = allStatuses[sessionID] + // Don't skip if session not in status - fall through to message-based detection if (sessionStatus?.type === "idle") { - const hasValidOutput = await validateSessionHasOutput(this.client, sessionID) + // Edge guard: Validate session has actual output before completing + const hasValidOutput = await this.validateSessionHasOutput(sessionID) if (!hasValidOutput) { log("[background-agent] Polling idle but no valid output yet, waiting:", task.id) continue } + // Re-check status after async operation if (task.status !== "running") continue - const hasIncompleteTodos = await checkSessionTodos(this.client, sessionID) + const hasIncompleteTodos = await this.checkSessionTodos(sessionID) if (hasIncompleteTodos) { log("[background-agent] Task has incomplete todos via polling, waiting:", task.id) continue } - await tryCompleteTask(task, "polling (idle status)", this.getResultHandlerContext()) + await this.tryCompleteTask(task, "polling (idle status)") continue } @@ -649,6 +1293,7 @@ export class BackgroundManager { task.progress.lastMessageAt = new Date() } + // Stability detection: complete when message count unchanged for 3 polls const currentMsgCount = messages.length const startedAt = task.startedAt if (!startedAt) continue @@ -659,6 +1304,7 @@ export class BackgroundManager { if (task.lastMsgCount === currentMsgCount) { task.stablePolls = (task.stablePolls ?? 0) + 1 if (task.stablePolls >= 3) { + // Re-fetch session status to confirm agent is truly idle const recheckStatus = await this.client.session.status() const recheckData = (recheckStatus.data ?? {}) as Record const currentStatus = recheckData[sessionID] @@ -672,17 +1318,19 @@ export class BackgroundManager { continue } - const hasValidOutput = await validateSessionHasOutput(this.client, sessionID) + // Edge guard: Validate session has actual output before completing + const hasValidOutput = await this.validateSessionHasOutput(sessionID) if (!hasValidOutput) { log("[background-agent] Stability reached but no valid output, waiting:", task.id) continue } + // Re-check status after async operation if (task.status !== "running") continue - const hasIncompleteTodos = await checkSessionTodos(this.client, sessionID) + const hasIncompleteTodos = await this.checkSessionTodos(sessionID) if (!hasIncompleteTodos) { - await tryCompleteTask(task, "stability detection", this.getResultHandlerContext()) + await this.tryCompleteTask(task, "stability detection") continue } } @@ -697,18 +1345,24 @@ export class BackgroundManager { } } - if (!this.state.hasRunningTasks()) { + if (!this.hasRunningTasks()) { this.stopPolling() } } + /** + * Shutdown the manager gracefully. + * Cancels all pending concurrency waiters and clears timers. + * Should be called when the plugin is unloaded. + */ shutdown(): void { if (this.shutdownTriggered) return this.shutdownTriggered = true log("[background-agent] Shutting down BackgroundManager") this.stopPolling() - for (const task of this.state.tasks.values()) { + // Abort all running sessions to prevent zombie processes (#1240) + for (const task of this.tasks.values()) { if (task.status === "running" && task.sessionID) { this.client.session.abort({ path: { id: task.sessionID }, @@ -716,6 +1370,7 @@ export class BackgroundManager { } } + // Notify shutdown listeners (e.g., tmux cleanup) if (this.onShutdown) { try { this.onShutdown() @@ -724,17 +1379,28 @@ export class BackgroundManager { } } - for (const task of this.state.tasks.values()) { + // Release concurrency for all running tasks + for (const task of this.tasks.values()) { if (task.concurrencyKey) { this.concurrencyManager.release(task.concurrencyKey) task.concurrencyKey = undefined } } - this.state.clear() + for (const timer of this.completionTimers.values()) { + clearTimeout(timer) + } + this.completionTimers.clear() + this.concurrencyManager.clear() + this.tasks.clear() + this.notifications.clear() + this.pendingByParent.clear() + this.queuesByKey.clear() + this.processingKeys.clear() this.unregisterProcessCleanup() log("[background-agent] Shutdown complete") + } } @@ -746,6 +1412,8 @@ function registerProcessSignal( const listener = () => { handler() if (exitAfter) { + // Set exitCode and schedule exit after delay to allow other handlers to complete async cleanup + // Use 6s delay to accommodate LSP cleanup (5s timeout + 1s SIGKILL wait) process.exitCode = 0 setTimeout(() => process.exit(), 6000) } @@ -753,3 +1421,17 @@ function registerProcessSignal( process.on(signal, listener) return listener } + + +function getMessageDir(sessionID: string): string | null { + if (!existsSync(MESSAGE_STORAGE)) return null + + const directPath = join(MESSAGE_STORAGE, sessionID) + if (existsSync(directPath)) return directPath + + for (const dir of readdirSync(MESSAGE_STORAGE)) { + const sessionPath = join(MESSAGE_STORAGE, dir, sessionID) + if (existsSync(sessionPath)) return sessionPath + } + return null +}