test(background-agent): add non-blocking queue tests
This commit is contained in:
@@ -47,8 +47,10 @@ class MockBackgroundManager {
|
||||
|
||||
for (const child of directChildren) {
|
||||
result.push(child)
|
||||
const descendants = this.getAllDescendantTasks(child.sessionID)
|
||||
result.push(...descendants)
|
||||
if (child.sessionID) {
|
||||
const descendants = this.getAllDescendantTasks(child.sessionID)
|
||||
result.push(...descendants)
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
@@ -81,6 +83,7 @@ class MockBackgroundManager {
|
||||
let prunedNotifications = 0
|
||||
|
||||
for (const [taskId, task] of this.tasks.entries()) {
|
||||
if (!task.startedAt) continue
|
||||
const age = now - task.startedAt.getTime()
|
||||
if (age > TASK_TTL_MS) {
|
||||
prunedTasks.push(taskId)
|
||||
@@ -95,6 +98,7 @@ class MockBackgroundManager {
|
||||
continue
|
||||
}
|
||||
const validNotifications = notifications.filter((task) => {
|
||||
if (!task.startedAt) return false
|
||||
const age = now - task.startedAt.getTime()
|
||||
return age <= TASK_TTL_MS
|
||||
})
|
||||
@@ -1147,6 +1151,531 @@ describe("BackgroundManager process cleanup", () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager - Non-blocking Queue Integration", () => {
|
||||
let manager: BackgroundManager
|
||||
let mockClient: ReturnType<typeof createMockClient>
|
||||
|
||||
function createMockClient() {
|
||||
return {
|
||||
session: {
|
||||
create: async () => ({ data: { id: `ses_${crypto.randomUUID()}` } }),
|
||||
get: async () => ({ data: { directory: "/test/dir" } }),
|
||||
prompt: async () => ({}),
|
||||
messages: async () => ({ data: [] }),
|
||||
todo: async () => ({ data: [] }),
|
||||
status: async () => ({ data: {} }),
|
||||
abort: async () => ({}),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
// #given
|
||||
mockClient = createMockClient()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput)
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
manager.shutdown()
|
||||
})
|
||||
|
||||
describe("launch() returns immediately with pending status", () => {
|
||||
test("should return task with pending status immediately", async () => {
|
||||
// #given
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const task = await manager.launch(input)
|
||||
|
||||
// #then
|
||||
expect(task.status).toBe("pending")
|
||||
expect(task.id).toMatch(/^bg_/)
|
||||
expect(task.description).toBe("Test task")
|
||||
expect(task.agent).toBe("test-agent")
|
||||
expect(task.queuedAt).toBeInstanceOf(Date)
|
||||
expect(task.startedAt).toBeUndefined()
|
||||
expect(task.sessionID).toBeUndefined()
|
||||
})
|
||||
|
||||
test("should return immediately even with concurrency limit", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const startTime = Date.now()
|
||||
const task1 = await manager.launch(input)
|
||||
const task2 = await manager.launch(input)
|
||||
const endTime = Date.now()
|
||||
|
||||
// #then
|
||||
expect(endTime - startTime).toBeLessThan(100) // Should be instant
|
||||
expect(task1.status).toBe("pending")
|
||||
expect(task2.status).toBe("pending")
|
||||
})
|
||||
|
||||
test("should queue multiple tasks without blocking", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 2 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const tasks = await Promise.all([
|
||||
manager.launch(input),
|
||||
manager.launch(input),
|
||||
manager.launch(input),
|
||||
manager.launch(input),
|
||||
manager.launch(input),
|
||||
])
|
||||
|
||||
// #then
|
||||
expect(tasks).toHaveLength(5)
|
||||
tasks.forEach(task => {
|
||||
expect(task.status).toBe("pending")
|
||||
expect(task.queuedAt).toBeInstanceOf(Date)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("task transitions pending→running when slot available", () => {
|
||||
test("should transition first task to running immediately", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 5 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const task = await manager.launch(input)
|
||||
|
||||
// Give processKey time to run
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #then
|
||||
const updatedTask = manager.getTask(task.id)
|
||||
expect(updatedTask?.status).toBe("running")
|
||||
expect(updatedTask?.startedAt).toBeInstanceOf(Date)
|
||||
expect(updatedTask?.sessionID).toBeDefined()
|
||||
expect(updatedTask?.sessionID).toBeTruthy()
|
||||
})
|
||||
|
||||
test("should set startedAt when transitioning to running", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 5 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const task = await manager.launch(input)
|
||||
const queuedAt = task.queuedAt
|
||||
|
||||
// Wait for transition
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #then
|
||||
const updatedTask = manager.getTask(task.id)
|
||||
expect(updatedTask?.startedAt).toBeInstanceOf(Date)
|
||||
if (updatedTask?.startedAt && queuedAt) {
|
||||
expect(updatedTask.startedAt.getTime()).toBeGreaterThanOrEqual(queuedAt.getTime())
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe("pending task can be cancelled", () => {
|
||||
test("should cancel pending task successfully", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
const task1 = await manager.launch(input)
|
||||
const task2 = await manager.launch(input)
|
||||
|
||||
// Wait for first task to start
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #when
|
||||
const cancelled = manager.cancelPendingTask(task2.id)
|
||||
|
||||
// #then
|
||||
expect(cancelled).toBe(true)
|
||||
const updatedTask2 = manager.getTask(task2.id)
|
||||
expect(updatedTask2?.status).toBe("cancelled")
|
||||
expect(updatedTask2?.completedAt).toBeInstanceOf(Date)
|
||||
})
|
||||
|
||||
test("should not cancel running task", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 5 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
const task = await manager.launch(input)
|
||||
|
||||
// Wait for task to start
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #when
|
||||
const cancelled = manager.cancelPendingTask(task.id)
|
||||
|
||||
// #then
|
||||
expect(cancelled).toBe(false)
|
||||
const updatedTask = manager.getTask(task.id)
|
||||
expect(updatedTask?.status).toBe("running")
|
||||
})
|
||||
|
||||
test("should remove cancelled task from queue", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
const task1 = await manager.launch(input)
|
||||
const task2 = await manager.launch(input)
|
||||
const task3 = await manager.launch(input)
|
||||
|
||||
// Wait for first task to start
|
||||
await new Promise(resolve => setTimeout(resolve, 100))
|
||||
|
||||
// #when - cancel middle task
|
||||
const cancelledTask2 = manager.getTask(task2.id)
|
||||
expect(cancelledTask2?.status).toBe("pending")
|
||||
|
||||
manager.cancelPendingTask(task2.id)
|
||||
|
||||
const afterCancel = manager.getTask(task2.id)
|
||||
expect(afterCancel?.status).toBe("cancelled")
|
||||
|
||||
// #then - verify task3 is still pending (task1 still running)
|
||||
const task3BeforeRelease = manager.getTask(task3.id)
|
||||
expect(task3BeforeRelease?.status).toBe("pending")
|
||||
})
|
||||
})
|
||||
|
||||
describe("multiple keys process in parallel", () => {
|
||||
test("should process different concurrency keys in parallel", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input1 = {
|
||||
description: "Task 1",
|
||||
prompt: "Do something",
|
||||
agent: "agent-a",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
const input2 = {
|
||||
description: "Task 2",
|
||||
prompt: "Do something else",
|
||||
agent: "agent-b",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const task1 = await manager.launch(input1)
|
||||
const task2 = await manager.launch(input2)
|
||||
|
||||
// Wait for both to start
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #then - both should be running despite limit of 1 (different keys)
|
||||
const updatedTask1 = manager.getTask(task1.id)
|
||||
const updatedTask2 = manager.getTask(task2.id)
|
||||
|
||||
expect(updatedTask1?.status).toBe("running")
|
||||
expect(updatedTask2?.status).toBe("running")
|
||||
})
|
||||
|
||||
test("should respect per-key concurrency limits", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const task1 = await manager.launch(input)
|
||||
const task2 = await manager.launch(input)
|
||||
|
||||
// Wait for processing
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #then - same key should respect limit
|
||||
const updatedTask1 = manager.getTask(task1.id)
|
||||
const updatedTask2 = manager.getTask(task2.id)
|
||||
|
||||
expect(updatedTask1?.status).toBe("running")
|
||||
expect(updatedTask2?.status).toBe("pending")
|
||||
})
|
||||
|
||||
test("should process model-based keys in parallel", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input1 = {
|
||||
description: "Task 1",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
model: { providerID: "anthropic", modelID: "claude-opus-4-5" },
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
const input2 = {
|
||||
description: "Task 2",
|
||||
prompt: "Do something else",
|
||||
agent: "test-agent",
|
||||
model: { providerID: "openai", modelID: "gpt-5.2" },
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const task1 = await manager.launch(input1)
|
||||
const task2 = await manager.launch(input2)
|
||||
|
||||
// Wait for both to start
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #then - different models should run in parallel
|
||||
const updatedTask1 = manager.getTask(task1.id)
|
||||
const updatedTask2 = manager.getTask(task2.id)
|
||||
|
||||
expect(updatedTask1?.status).toBe("running")
|
||||
expect(updatedTask2?.status).toBe("running")
|
||||
})
|
||||
})
|
||||
|
||||
describe("TTL uses queuedAt for pending, startedAt for running", () => {
|
||||
test("should use queuedAt for pending task TTL", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// Launch two tasks (second will be pending)
|
||||
await manager.launch(input)
|
||||
const task2 = await manager.launch(input)
|
||||
|
||||
// Wait for first to start
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #when
|
||||
const pendingTask = manager.getTask(task2.id)
|
||||
|
||||
// #then
|
||||
expect(pendingTask?.status).toBe("pending")
|
||||
expect(pendingTask?.queuedAt).toBeInstanceOf(Date)
|
||||
expect(pendingTask?.startedAt).toBeUndefined()
|
||||
|
||||
// Verify TTL would use queuedAt (implementation detail check)
|
||||
const now = Date.now()
|
||||
const age = now - pendingTask!.queuedAt!.getTime()
|
||||
expect(age).toBeGreaterThanOrEqual(0)
|
||||
})
|
||||
|
||||
test("should use startedAt for running task TTL", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 5 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const task = await manager.launch(input)
|
||||
|
||||
// Wait for task to start
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// #then
|
||||
const runningTask = manager.getTask(task.id)
|
||||
expect(runningTask?.status).toBe("running")
|
||||
expect(runningTask?.startedAt).toBeInstanceOf(Date)
|
||||
|
||||
// Verify TTL would use startedAt (implementation detail check)
|
||||
const now = Date.now()
|
||||
const age = now - runningTask!.startedAt!.getTime()
|
||||
expect(age).toBeGreaterThanOrEqual(0)
|
||||
})
|
||||
|
||||
test("should have different timestamps for queuedAt and startedAt", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 1 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// Launch task that will queue
|
||||
await manager.launch(input)
|
||||
const task2 = await manager.launch(input)
|
||||
|
||||
const queuedAt = task2.queuedAt!
|
||||
|
||||
// Wait for first task to complete and second to start
|
||||
await new Promise(resolve => setTimeout(resolve, 50))
|
||||
|
||||
// Simulate first task completion
|
||||
const tasks = Array.from(getTaskMap(manager).values())
|
||||
const runningTask = tasks.find(t => t.status === "running" && t.id !== task2.id)
|
||||
if (runningTask?.concurrencyKey) {
|
||||
runningTask.status = "completed"
|
||||
getConcurrencyManager(manager).release(runningTask.concurrencyKey)
|
||||
}
|
||||
|
||||
// Wait for second task to start
|
||||
await new Promise(resolve => setTimeout(resolve, 100))
|
||||
|
||||
// #then
|
||||
const startedTask = manager.getTask(task2.id)
|
||||
if (startedTask?.status === "running" && startedTask.startedAt) {
|
||||
expect(startedTask.startedAt).toBeInstanceOf(Date)
|
||||
expect(startedTask.startedAt.getTime()).toBeGreaterThan(queuedAt.getTime())
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe("manual verification scenario", () => {
|
||||
test("should handle 10 tasks with limit 5 returning immediately", async () => {
|
||||
// #given
|
||||
const config = { defaultConcurrency: 5 }
|
||||
manager.shutdown()
|
||||
manager = new BackgroundManager({ client: mockClient, directory: tmpdir() } as unknown as PluginInput, config)
|
||||
|
||||
const input = {
|
||||
description: "Test task",
|
||||
prompt: "Do something",
|
||||
agent: "test-agent",
|
||||
parentSessionID: "parent-session",
|
||||
parentMessageID: "parent-message",
|
||||
}
|
||||
|
||||
// #when
|
||||
const startTime = Date.now()
|
||||
const tasks = await Promise.all(
|
||||
Array.from({ length: 10 }, () => manager.launch(input))
|
||||
)
|
||||
const endTime = Date.now()
|
||||
|
||||
// #then
|
||||
expect(endTime - startTime).toBeLessThan(200) // Should be very fast
|
||||
expect(tasks).toHaveLength(10)
|
||||
tasks.forEach(task => {
|
||||
expect(task.status).toBe("pending")
|
||||
expect(task.id).toMatch(/^bg_/)
|
||||
})
|
||||
|
||||
// Wait for processing
|
||||
await new Promise(resolve => setTimeout(resolve, 100))
|
||||
|
||||
// Verify 5 running, 5 pending
|
||||
const updatedTasks = tasks.map(t => manager.getTask(t.id))
|
||||
const runningCount = updatedTasks.filter(t => t?.status === "running").length
|
||||
const pendingCount = updatedTasks.filter(t => t?.status === "pending").length
|
||||
|
||||
expect(runningCount).toBe(5)
|
||||
expect(pendingCount).toBe(5)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("BackgroundManager.checkAndInterruptStaleTasks", () => {
|
||||
test("should NOT interrupt task running less than 30 seconds (min runtime guard)", async () => {
|
||||
const client = {
|
||||
|
||||
Reference in New Issue
Block a user