diff --git a/src/features/background-agent/manager.test.ts b/src/features/background-agent/manager.test.ts index 036678647..3ac87fc0d 100644 --- a/src/features/background-agent/manager.test.ts +++ b/src/features/background-agent/manager.test.ts @@ -47,8 +47,10 @@ class MockBackgroundManager { for (const child of directChildren) { result.push(child) - const descendants = this.getAllDescendantTasks(child.sessionID) - result.push(...descendants) + if (child.sessionID) { + const descendants = this.getAllDescendantTasks(child.sessionID) + result.push(...descendants) + } } return result @@ -81,6 +83,7 @@ class MockBackgroundManager { let prunedNotifications = 0 for (const [taskId, task] of this.tasks.entries()) { + if (!task.startedAt) continue const age = now - task.startedAt.getTime() if (age > TASK_TTL_MS) { prunedTasks.push(taskId) @@ -95,6 +98,7 @@ class MockBackgroundManager { continue } const validNotifications = notifications.filter((task) => { + if (!task.startedAt) return false const age = now - task.startedAt.getTime() return age <= TASK_TTL_MS }) @@ -1147,6 +1151,531 @@ describe("BackgroundManager process cleanup", () => { }) }) +describe("BackgroundManager - Non-blocking Queue Integration", () => { + let manager: BackgroundManager + let mockClient: ReturnType + + function createMockClient() { + return { + session: { + create: async () => ({ data: { id: `ses_${crypto.randomUUID()}` } }), + get: async () => ({ data: { directory: "/test/dir" } }), + prompt: async () => ({}), + messages: async () => ({ data: [] }), + todo: async () => ({ data: [] }), + status: async () => ({ data: {} }), + abort: async () => ({}), + }, + } + } + + beforeEach(() => { + // #given + mockClient = createMockClient() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput) + }) + + afterEach(() => { + manager.shutdown() + }) + + describe("launch() returns immediately with pending status", () => { + test("should return task with pending status immediately", async () => { + // #given + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const task = await manager.launch(input) + + // #then + expect(task.status).toBe("pending") + expect(task.id).toMatch(/^bg_/) + expect(task.description).toBe("Test task") + expect(task.agent).toBe("test-agent") + expect(task.queuedAt).toBeInstanceOf(Date) + expect(task.startedAt).toBeUndefined() + expect(task.sessionID).toBeUndefined() + }) + + test("should return immediately even with concurrency limit", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const startTime = Date.now() + const task1 = await manager.launch(input) + const task2 = await manager.launch(input) + const endTime = Date.now() + + // #then + expect(endTime - startTime).toBeLessThan(100) // Should be instant + expect(task1.status).toBe("pending") + expect(task2.status).toBe("pending") + }) + + test("should queue multiple tasks without blocking", async () => { + // #given + const config = { defaultConcurrency: 2 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const tasks = await Promise.all([ + manager.launch(input), + manager.launch(input), + manager.launch(input), + manager.launch(input), + manager.launch(input), + ]) + + // #then + expect(tasks).toHaveLength(5) + tasks.forEach(task => { + expect(task.status).toBe("pending") + expect(task.queuedAt).toBeInstanceOf(Date) + }) + }) + }) + + describe("task transitions pending→running when slot available", () => { + test("should transition first task to running immediately", async () => { + // #given + const config = { defaultConcurrency: 5 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const task = await manager.launch(input) + + // Give processKey time to run + await new Promise(resolve => setTimeout(resolve, 50)) + + // #then + const updatedTask = manager.getTask(task.id) + expect(updatedTask?.status).toBe("running") + expect(updatedTask?.startedAt).toBeInstanceOf(Date) + expect(updatedTask?.sessionID).toBeDefined() + expect(updatedTask?.sessionID).toBeTruthy() + }) + + test("should set startedAt when transitioning to running", async () => { + // #given + const config = { defaultConcurrency: 5 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const task = await manager.launch(input) + const queuedAt = task.queuedAt + + // Wait for transition + await new Promise(resolve => setTimeout(resolve, 50)) + + // #then + const updatedTask = manager.getTask(task.id) + expect(updatedTask?.startedAt).toBeInstanceOf(Date) + if (updatedTask?.startedAt && queuedAt) { + expect(updatedTask.startedAt.getTime()).toBeGreaterThanOrEqual(queuedAt.getTime()) + } + }) + }) + + describe("pending task can be cancelled", () => { + test("should cancel pending task successfully", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + const task1 = await manager.launch(input) + const task2 = await manager.launch(input) + + // Wait for first task to start + await new Promise(resolve => setTimeout(resolve, 50)) + + // #when + const cancelled = manager.cancelPendingTask(task2.id) + + // #then + expect(cancelled).toBe(true) + const updatedTask2 = manager.getTask(task2.id) + expect(updatedTask2?.status).toBe("cancelled") + expect(updatedTask2?.completedAt).toBeInstanceOf(Date) + }) + + test("should not cancel running task", async () => { + // #given + const config = { defaultConcurrency: 5 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + const task = await manager.launch(input) + + // Wait for task to start + await new Promise(resolve => setTimeout(resolve, 50)) + + // #when + const cancelled = manager.cancelPendingTask(task.id) + + // #then + expect(cancelled).toBe(false) + const updatedTask = manager.getTask(task.id) + expect(updatedTask?.status).toBe("running") + }) + + test("should remove cancelled task from queue", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + const task1 = await manager.launch(input) + const task2 = await manager.launch(input) + const task3 = await manager.launch(input) + + // Wait for first task to start + await new Promise(resolve => setTimeout(resolve, 100)) + + // #when - cancel middle task + const cancelledTask2 = manager.getTask(task2.id) + expect(cancelledTask2?.status).toBe("pending") + + manager.cancelPendingTask(task2.id) + + const afterCancel = manager.getTask(task2.id) + expect(afterCancel?.status).toBe("cancelled") + + // #then - verify task3 is still pending (task1 still running) + const task3BeforeRelease = manager.getTask(task3.id) + expect(task3BeforeRelease?.status).toBe("pending") + }) + }) + + describe("multiple keys process in parallel", () => { + test("should process different concurrency keys in parallel", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input1 = { + description: "Task 1", + prompt: "Do something", + agent: "agent-a", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + const input2 = { + description: "Task 2", + prompt: "Do something else", + agent: "agent-b", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const task1 = await manager.launch(input1) + const task2 = await manager.launch(input2) + + // Wait for both to start + await new Promise(resolve => setTimeout(resolve, 50)) + + // #then - both should be running despite limit of 1 (different keys) + const updatedTask1 = manager.getTask(task1.id) + const updatedTask2 = manager.getTask(task2.id) + + expect(updatedTask1?.status).toBe("running") + expect(updatedTask2?.status).toBe("running") + }) + + test("should respect per-key concurrency limits", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const task1 = await manager.launch(input) + const task2 = await manager.launch(input) + + // Wait for processing + await new Promise(resolve => setTimeout(resolve, 50)) + + // #then - same key should respect limit + const updatedTask1 = manager.getTask(task1.id) + const updatedTask2 = manager.getTask(task2.id) + + expect(updatedTask1?.status).toBe("running") + expect(updatedTask2?.status).toBe("pending") + }) + + test("should process model-based keys in parallel", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input1 = { + description: "Task 1", + prompt: "Do something", + agent: "test-agent", + model: { providerID: "anthropic", modelID: "claude-opus-4-5" }, + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + const input2 = { + description: "Task 2", + prompt: "Do something else", + agent: "test-agent", + model: { providerID: "openai", modelID: "gpt-5.2" }, + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const task1 = await manager.launch(input1) + const task2 = await manager.launch(input2) + + // Wait for both to start + await new Promise(resolve => setTimeout(resolve, 50)) + + // #then - different models should run in parallel + const updatedTask1 = manager.getTask(task1.id) + const updatedTask2 = manager.getTask(task2.id) + + expect(updatedTask1?.status).toBe("running") + expect(updatedTask2?.status).toBe("running") + }) + }) + + describe("TTL uses queuedAt for pending, startedAt for running", () => { + test("should use queuedAt for pending task TTL", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // Launch two tasks (second will be pending) + await manager.launch(input) + const task2 = await manager.launch(input) + + // Wait for first to start + await new Promise(resolve => setTimeout(resolve, 50)) + + // #when + const pendingTask = manager.getTask(task2.id) + + // #then + expect(pendingTask?.status).toBe("pending") + expect(pendingTask?.queuedAt).toBeInstanceOf(Date) + expect(pendingTask?.startedAt).toBeUndefined() + + // Verify TTL would use queuedAt (implementation detail check) + const now = Date.now() + const age = now - pendingTask!.queuedAt!.getTime() + expect(age).toBeGreaterThanOrEqual(0) + }) + + test("should use startedAt for running task TTL", async () => { + // #given + const config = { defaultConcurrency: 5 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const task = await manager.launch(input) + + // Wait for task to start + await new Promise(resolve => setTimeout(resolve, 50)) + + // #then + const runningTask = manager.getTask(task.id) + expect(runningTask?.status).toBe("running") + expect(runningTask?.startedAt).toBeInstanceOf(Date) + + // Verify TTL would use startedAt (implementation detail check) + const now = Date.now() + const age = now - runningTask!.startedAt!.getTime() + expect(age).toBeGreaterThanOrEqual(0) + }) + + test("should have different timestamps for queuedAt and startedAt", async () => { + // #given + const config = { defaultConcurrency: 1 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // Launch task that will queue + await manager.launch(input) + const task2 = await manager.launch(input) + + const queuedAt = task2.queuedAt! + + // Wait for first task to complete and second to start + await new Promise(resolve => setTimeout(resolve, 50)) + + // Simulate first task completion + const tasks = Array.from(getTaskMap(manager).values()) + const runningTask = tasks.find(t => t.status === "running" && t.id !== task2.id) + if (runningTask?.concurrencyKey) { + runningTask.status = "completed" + getConcurrencyManager(manager).release(runningTask.concurrencyKey) + } + + // Wait for second task to start + await new Promise(resolve => setTimeout(resolve, 100)) + + // #then + const startedTask = manager.getTask(task2.id) + if (startedTask?.status === "running" && startedTask.startedAt) { + expect(startedTask.startedAt).toBeInstanceOf(Date) + expect(startedTask.startedAt.getTime()).toBeGreaterThan(queuedAt.getTime()) + } + }) + }) + + describe("manual verification scenario", () => { + test("should handle 10 tasks with limit 5 returning immediately", async () => { + // #given + const config = { defaultConcurrency: 5 } + manager.shutdown() + manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config) + + const input = { + description: "Test task", + prompt: "Do something", + agent: "test-agent", + parentSessionID: "parent-session", + parentMessageID: "parent-message", + } + + // #when + const startTime = Date.now() + const tasks = await Promise.all( + Array.from({ length: 10 }, () => manager.launch(input)) + ) + const endTime = Date.now() + + // #then + expect(endTime - startTime).toBeLessThan(200) // Should be very fast + expect(tasks).toHaveLength(10) + tasks.forEach(task => { + expect(task.status).toBe("pending") + expect(task.id).toMatch(/^bg_/) + }) + + // Wait for processing + await new Promise(resolve => setTimeout(resolve, 100)) + + // Verify 5 running, 5 pending + const updatedTasks = tasks.map(t => manager.getTask(t.id)) + const runningCount = updatedTasks.filter(t => t?.status === "running").length + const pendingCount = updatedTasks.filter(t => t?.status === "pending").length + + expect(runningCount).toBe(5) + expect(pendingCount).toBe(5) + }) + }) +}) + describe("BackgroundManager.checkAndInterruptStaleTasks", () => { test("should NOT interrupt task running less than 30 seconds (min runtime guard)", async () => { const client = {