diff --git a/src/features/background-agent/manager.test.ts b/src/features/background-agent/manager.test.ts index d8eba0980..789fac212 100644 --- a/src/features/background-agent/manager.test.ts +++ b/src/features/background-agent/manager.test.ts @@ -2289,10 +2289,221 @@ describe("BackgroundManager.checkAndInterruptStaleTasks", () => { getTaskMap(manager).set(task.id, task) - await manager["checkAndInterruptStaleTasks"]() + await manager["checkAndInterruptStaleTasks"]() expect(task.status).toBe("cancelled") }) + + test("should NOT interrupt task when session is running, even with stale lastUpdate", async () => { + //#given + const client = { + session: { + prompt: async () => ({}), + promptAsync: async () => ({}), + abort: async () => ({}), + }, + } + const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { staleTimeoutMs: 180_000 }) + + const task: BackgroundTask = { + id: "task-running-session", + sessionID: "session-running", + parentSessionID: "parent-rs", + parentMessageID: "msg-rs", + description: "Task with running session", + prompt: "Test", + agent: "test-agent", + status: "running", + startedAt: new Date(Date.now() - 300_000), + progress: { + toolCalls: 2, + lastUpdate: new Date(Date.now() - 300_000), + }, + } + + getTaskMap(manager).set(task.id, task) + + //#when — session is actively running + await manager["checkAndInterruptStaleTasks"]({ "session-running": { type: "running" } }) + + //#then — task survives because session is running + expect(task.status).toBe("running") + }) + + test("should interrupt task when session is idle and lastUpdate exceeds stale timeout", async () => { + //#given + const client = { + session: { + prompt: async () => ({}), + promptAsync: async () => ({}), + abort: async () => ({}), + }, + } + const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { staleTimeoutMs: 180_000 }) + stubNotifyParentSession(manager) + + const task: BackgroundTask = { + id: "task-idle-session", + sessionID: "session-idle", + parentSessionID: "parent-is", + parentMessageID: "msg-is", + description: "Task with idle session", + prompt: "Test", + agent: "test-agent", + status: "running", + startedAt: new Date(Date.now() - 300_000), + progress: { + toolCalls: 2, + lastUpdate: new Date(Date.now() - 300_000), + }, + } + + getTaskMap(manager).set(task.id, task) + + //#when — session is idle + await manager["checkAndInterruptStaleTasks"]({ "session-idle": { type: "idle" } }) + + //#then — killed because session is idle with stale lastUpdate + expect(task.status).toBe("cancelled") + expect(task.error).toContain("Stale timeout") + }) + + test("should NOT interrupt running session even with very old lastUpdate (no safety net)", async () => { + //#given + const client = { + session: { + prompt: async () => ({}), + promptAsync: async () => ({}), + abort: async () => ({}), + }, + } + const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { staleTimeoutMs: 180_000 }) + + const task: BackgroundTask = { + id: "task-long-running", + sessionID: "session-long", + parentSessionID: "parent-lr", + parentMessageID: "msg-lr", + description: "Long running task", + prompt: "Test", + agent: "test-agent", + status: "running", + startedAt: new Date(Date.now() - 900_000), + progress: { + toolCalls: 5, + lastUpdate: new Date(Date.now() - 900_000), + }, + } + + getTaskMap(manager).set(task.id, task) + + //#when — session is running, lastUpdate 15min old + await manager["checkAndInterruptStaleTasks"]({ "session-long": { type: "running" } }) + + //#then — running sessions are NEVER stale-killed + expect(task.status).toBe("running") + }) + + test("should NOT interrupt running session with no progress (undefined lastUpdate)", async () => { + //#given — no progress at all, but session is running + const client = { + session: { + prompt: async () => ({}), + promptAsync: async () => ({}), + abort: async () => ({}), + }, + } + const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { messageStalenessTimeoutMs: 600_000 }) + + const task: BackgroundTask = { + id: "task-running-no-progress", + sessionID: "session-rnp", + parentSessionID: "parent-rnp", + parentMessageID: "msg-rnp", + description: "Running no progress", + prompt: "Test", + agent: "test-agent", + status: "running", + startedAt: new Date(Date.now() - 15 * 60 * 1000), + progress: undefined, + } + + getTaskMap(manager).set(task.id, task) + + //#when — session is running despite no progress + await manager["checkAndInterruptStaleTasks"]({ "session-rnp": { type: "running" } }) + + //#then — running sessions are NEVER killed + expect(task.status).toBe("running") + }) + + test("should interrupt task with no lastUpdate after messageStalenessTimeout", async () => { + //#given + const client = { + session: { + prompt: async () => ({}), + promptAsync: async () => ({}), + abort: async () => ({}), + }, + } + const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { messageStalenessTimeoutMs: 600_000 }) + stubNotifyParentSession(manager) + + const task: BackgroundTask = { + id: "task-no-update", + sessionID: "session-no-update", + parentSessionID: "parent-nu", + parentMessageID: "msg-nu", + description: "No update task", + prompt: "Test", + agent: "test-agent", + status: "running", + startedAt: new Date(Date.now() - 15 * 60 * 1000), + progress: undefined, + } + + getTaskMap(manager).set(task.id, task) + + //#when — no progress update for 15 minutes + await manager["checkAndInterruptStaleTasks"]({}) + + //#then — killed after messageStalenessTimeout + expect(task.status).toBe("cancelled") + expect(task.error).toContain("no activity") + }) + + test("should NOT interrupt task with no lastUpdate within messageStalenessTimeout", async () => { + //#given + const client = { + session: { + prompt: async () => ({}), + promptAsync: async () => ({}), + abort: async () => ({}), + }, + } + const manager = new BackgroundManager({ client, directory: tmpdir() } as unknown as PluginInput, { messageStalenessTimeoutMs: 600_000 }) + + const task: BackgroundTask = { + id: "task-fresh-no-update", + sessionID: "session-fresh", + parentSessionID: "parent-fn", + parentMessageID: "msg-fn", + description: "Fresh no-update task", + prompt: "Test", + agent: "test-agent", + status: "running", + startedAt: new Date(Date.now() - 5 * 60 * 1000), + progress: undefined, + } + + getTaskMap(manager).set(task.id, task) + + //#when — only 5 min since start, within 10min timeout + await manager["checkAndInterruptStaleTasks"]({}) + + //#then — task survives + expect(task.status).toBe("running") + }) }) describe("BackgroundManager.shutdown session abort", () => { diff --git a/src/features/background-agent/manager.ts b/src/features/background-agent/manager.ts index 3a680dd6a..7d75af85b 100644 --- a/src/features/background-agent/manager.ts +++ b/src/features/background-agent/manager.ts @@ -12,6 +12,7 @@ import { ConcurrencyManager } from "./concurrency" import type { BackgroundTaskConfig, TmuxConfig } from "../../config/schema" import { isInsideTmux } from "../../shared/tmux" import { + DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS, DEFAULT_STALE_TIMEOUT_MS, MIN_IDLE_TIME_MS, MIN_RUNTIME_BEFORE_STALE_MS, @@ -1437,24 +1438,54 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea } } - private async checkAndInterruptStaleTasks(): Promise { + private async checkAndInterruptStaleTasks( + allStatuses: Record = {}, + ): Promise { const staleTimeoutMs = this.config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS + const messageStalenessMs = this.config?.messageStalenessTimeoutMs ?? DEFAULT_MESSAGE_STALENESS_TIMEOUT_MS const now = Date.now() for (const task of this.tasks.values()) { if (task.status !== "running") continue - if (!task.progress?.lastUpdate) continue - + const startedAt = task.startedAt const sessionID = task.sessionID if (!startedAt || !sessionID) continue + const sessionIsRunning = allStatuses[sessionID]?.type === "running" const runtime = now - startedAt.getTime() + + if (!task.progress?.lastUpdate) { + if (sessionIsRunning) continue + if (runtime <= messageStalenessMs) continue + + const staleMinutes = Math.round(runtime / 60000) + task.status = "cancelled" + task.error = `Stale timeout (no activity for ${staleMinutes}min since start)` + task.completedAt = new Date() + + if (task.concurrencyKey) { + this.concurrencyManager.release(task.concurrencyKey) + task.concurrencyKey = undefined + } + + this.client.session.abort({ path: { id: sessionID } }).catch(() => {}) + log(`[background-agent] Task ${task.id} interrupted: no progress since start`) + + try { + await this.enqueueNotificationForParent(task.parentSessionID, () => this.notifyParentSession(task)) + } catch (err) { + log("[background-agent] Error in notifyParentSession for stale task:", { taskId: task.id, error: err }) + } + continue + } + + if (sessionIsRunning) continue + if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime() if (timeSinceLastUpdate <= staleTimeoutMs) continue - if (task.status !== "running") continue const staleMinutes = Math.round(timeSinceLastUpdate / 60000) @@ -1467,10 +1498,7 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea task.concurrencyKey = undefined } - this.client.session.abort({ - path: { id: sessionID }, - }).catch(() => {}) - + this.client.session.abort({ path: { id: sessionID } }).catch(() => {}) log(`[background-agent] Task ${task.id} interrupted: stale timeout`) try { @@ -1483,11 +1511,12 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea private async pollRunningTasks(): Promise { this.pruneStaleTasksAndNotifications() - await this.checkAndInterruptStaleTasks() const statusResult = await this.client.session.status() const allStatuses = (statusResult.data ?? {}) as Record + await this.checkAndInterruptStaleTasks(allStatuses) + for (const task of this.tasks.values()) { if (task.status !== "running") continue @@ -1497,7 +1526,6 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea try { const sessionStatus = allStatuses[sessionID] - // Don't skip if session not in status - fall through to message-based detection if (sessionStatus?.type === "idle") { // Edge guard: Validate session has actual output before completing const hasValidOutput = await this.validateSessionHasOutput(sessionID) diff --git a/src/features/background-agent/poll-running-tasks.ts b/src/features/background-agent/poll-running-tasks.ts index 6c5a4461d..023fbf556 100644 --- a/src/features/background-agent/poll-running-tasks.ts +++ b/src/features/background-agent/poll-running-tasks.ts @@ -34,7 +34,7 @@ export async function pollRunningTasks(args: { tasks: Iterable client: OpencodeClient pruneStaleTasksAndNotifications: () => void - checkAndInterruptStaleTasks: () => Promise + checkAndInterruptStaleTasks: (statuses: Record) => Promise validateSessionHasOutput: (sessionID: string) => Promise checkSessionTodos: (sessionID: string) => Promise tryCompleteTask: (task: BackgroundTask, source: string) => Promise @@ -54,11 +54,12 @@ export async function pollRunningTasks(args: { } = args pruneStaleTasksAndNotifications() - await checkAndInterruptStaleTasks() const statusResult = await client.session.status() const allStatuses = ((statusResult as { data?: unknown }).data ?? {}) as SessionStatusMap + await checkAndInterruptStaleTasks(allStatuses) + for (const task of tasks) { if (task.status !== "running") continue diff --git a/src/features/background-agent/task-poller.test.ts b/src/features/background-agent/task-poller.test.ts index 29ff47447..128ef978d 100644 --- a/src/features/background-agent/task-poller.test.ts +++ b/src/features/background-agent/task-poller.test.ts @@ -136,6 +136,125 @@ describe("checkAndInterruptStaleTasks", () => { expect(task.error).toContain("no activity") }) + it("should NOT interrupt task when session is running, even if lastUpdate exceeds stale timeout", async () => { + //#given — lastUpdate is 5min old but session is actively running + const task = createRunningTask({ + startedAt: new Date(Date.now() - 300_000), + progress: { + toolCalls: 2, + lastUpdate: new Date(Date.now() - 300_000), + }, + }) + + //#when — session status is "running" + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { staleTimeoutMs: 180_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + sessionStatuses: { "ses-1": { type: "running" } }, + }) + + //#then — task should survive because session is actively running + expect(task.status).toBe("running") + }) + + it("should interrupt task when session is idle and lastUpdate exceeds stale timeout", async () => { + //#given — lastUpdate is 5min old and session is idle + const task = createRunningTask({ + startedAt: new Date(Date.now() - 300_000), + progress: { + toolCalls: 2, + lastUpdate: new Date(Date.now() - 300_000), + }, + }) + + //#when — session status is "idle" + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { staleTimeoutMs: 180_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + sessionStatuses: { "ses-1": { type: "idle" } }, + }) + + //#then — task should be killed because session is idle with stale lastUpdate + expect(task.status).toBe("cancelled") + expect(task.error).toContain("Stale timeout") + }) + + it("should NOT interrupt running session task even with very old lastUpdate", async () => { + //#given — lastUpdate is 15min old, but session is still running + const task = createRunningTask({ + startedAt: new Date(Date.now() - 900_000), + progress: { + toolCalls: 2, + lastUpdate: new Date(Date.now() - 900_000), + }, + }) + + //#when — session running, lastUpdate far exceeds any timeout + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { staleTimeoutMs: 180_000, messageStalenessTimeoutMs: 600_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + sessionStatuses: { "ses-1": { type: "running" } }, + }) + + //#then — running sessions are NEVER stale-killed (babysitter + TTL prune handle these) + expect(task.status).toBe("running") + }) + + it("should NOT interrupt running session even with no progress (undefined lastUpdate)", async () => { + //#given — task has no progress at all, but session is running + const task = createRunningTask({ + startedAt: new Date(Date.now() - 15 * 60 * 1000), + progress: undefined, + }) + + //#when — session is running + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { messageStalenessTimeoutMs: 600_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + sessionStatuses: { "ses-1": { type: "running" } }, + }) + + //#then — running sessions are NEVER killed, even without progress + expect(task.status).toBe("running") + }) + + it("should use default stale timeout when session status is unknown/missing", async () => { + //#given — lastUpdate exceeds stale timeout, session not in status map + const task = createRunningTask({ + startedAt: new Date(Date.now() - 300_000), + progress: { + toolCalls: 1, + lastUpdate: new Date(Date.now() - 200_000), + }, + }) + + //#when — empty sessionStatuses (session not found) + await checkAndInterruptStaleTasks({ + tasks: [task], + client: mockClient as never, + config: { staleTimeoutMs: 180_000 }, + concurrencyManager: mockConcurrencyManager as never, + notifyParentSession: mockNotify, + sessionStatuses: {}, + }) + + //#then — unknown session treated as potentially stale, apply default timeout + expect(task.status).toBe("cancelled") + expect(task.error).toContain("Stale timeout") + }) + it("should release concurrency key when interrupting a never-updated task", async () => { //#given const releaseMock = mock(() => {}) diff --git a/src/features/background-agent/task-poller.ts b/src/features/background-agent/task-poller.ts index a9f63a9a8..c8cafd724 100644 --- a/src/features/background-agent/task-poller.ts +++ b/src/features/background-agent/task-poller.ts @@ -57,14 +57,17 @@ export function pruneStaleTasksAndNotifications(args: { } } +export type SessionStatusMap = Record + export async function checkAndInterruptStaleTasks(args: { tasks: Iterable client: OpencodeClient config: BackgroundTaskConfig | undefined concurrencyManager: ConcurrencyManager notifyParentSession: (task: BackgroundTask) => Promise + sessionStatuses?: SessionStatusMap }): Promise { - const { tasks, client, config, concurrencyManager, notifyParentSession } = args + const { tasks, client, config, concurrencyManager, notifyParentSession, sessionStatuses } = args const staleTimeoutMs = config?.staleTimeoutMs ?? DEFAULT_STALE_TIMEOUT_MS const now = Date.now() @@ -77,9 +80,11 @@ export async function checkAndInterruptStaleTasks(args: { const sessionID = task.sessionID if (!startedAt || !sessionID) continue + const sessionIsRunning = sessionStatuses?.[sessionID]?.type === "running" const runtime = now - startedAt.getTime() if (!task.progress?.lastUpdate) { + if (sessionIsRunning) continue if (runtime <= messageStalenessMs) continue const staleMinutes = Math.round(runtime / 60000) @@ -103,6 +108,8 @@ export async function checkAndInterruptStaleTasks(args: { continue } + if (sessionIsRunning) continue + if (runtime < MIN_RUNTIME_BEFORE_STALE_MS) continue const timeSinceLastUpdate = now - task.progress.lastUpdate.getTime()