refactor(background-agent): optimize lifecycle and simplify tools (#1411)
* refactor(background-agent): optimize cache timer lifecycle and result handling Ultraworked with Sisyphus Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai> * refactor(background-task): simplify tool implementation and expand test coverage Ultraworked with Sisyphus Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai> * fix(background-task): fix BackgroundCancel tool parameter handling Correct parameter names and types in BackgroundCancel tool to match actual usage patterns. Add comprehensive test coverage for parameter validation. --------- Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -184,6 +184,10 @@ function getTaskMap(manager: BackgroundManager): Map<string, BackgroundTask> {
|
||||
return (manager as unknown as { tasks: Map<string, BackgroundTask> }).tasks
|
||||
}
|
||||
|
||||
function getPendingByParent(manager: BackgroundManager): Map<string, Set<string>> {
|
||||
return (manager as unknown as { pendingByParent: Map<string, Set<string>> }).pendingByParent
|
||||
}
|
||||
|
||||
async function tryCompleteTaskForTest(manager: BackgroundManager, task: BackgroundTask): Promise<boolean> {
|
||||
return (manager as unknown as { tryCompleteTask: (task: BackgroundTask, source: string) => Promise<boolean> })
|
||||
.tryCompleteTask(task, "test")
|
||||
@@ -1454,6 +1458,44 @@ describe("BackgroundManager - Non-blocking Queue Integration", () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe("cancelTask", () => {
|
||||
test("should cancel running task and release concurrency", async () => {
|
||||
// given
|
||||
const manager = createBackgroundManager()
|
||||
stubNotifyParentSession(manager)
|
||||
|
||||
const concurrencyManager = getConcurrencyManager(manager)
|
||||
const concurrencyKey = "test-provider/test-model"
|
||||
await concurrencyManager.acquire(concurrencyKey)
|
||||
|
||||
const task = createMockTask({
|
||||
id: "task-cancel-running",
|
||||
sessionID: "session-cancel-running",
|
||||
parentSessionID: "parent-cancel",
|
||||
status: "running",
|
||||
concurrencyKey,
|
||||
})
|
||||
|
||||
getTaskMap(manager).set(task.id, task)
|
||||
const pendingByParent = getPendingByParent(manager)
|
||||
pendingByParent.set(task.parentSessionID, new Set([task.id]))
|
||||
|
||||
// when
|
||||
const cancelled = await manager.cancelTask(task.id, { source: "test" })
|
||||
|
||||
// then
|
||||
const updatedTask = manager.getTask(task.id)
|
||||
expect(cancelled).toBe(true)
|
||||
expect(updatedTask?.status).toBe("cancelled")
|
||||
expect(updatedTask?.completedAt).toBeInstanceOf(Date)
|
||||
expect(updatedTask?.concurrencyKey).toBeUndefined()
|
||||
expect(concurrencyManager.getCount(concurrencyKey)).toBe(0)
|
||||
|
||||
const pendingSet = pendingByParent.get(task.parentSessionID)
|
||||
expect(pendingSet?.has(task.id) ?? false).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe("multiple keys process in parallel", () => {
|
||||
test("should process different concurrency keys in parallel", async () => {
|
||||
// given
|
||||
|
||||
@@ -830,6 +830,72 @@ export class BackgroundManager {
|
||||
}
|
||||
}
|
||||
|
||||
async cancelTask(
|
||||
taskId: string,
|
||||
options?: { source?: string; reason?: string; abortSession?: boolean }
|
||||
): Promise<boolean> {
|
||||
const task = this.tasks.get(taskId)
|
||||
if (!task || (task.status !== "running" && task.status !== "pending")) {
|
||||
return false
|
||||
}
|
||||
|
||||
const source = options?.source ?? "cancel"
|
||||
const abortSession = options?.abortSession !== false
|
||||
const reason = options?.reason
|
||||
|
||||
if (task.status === "pending") {
|
||||
const key = task.model
|
||||
? `${task.model.providerID}/${task.model.modelID}`
|
||||
: task.agent
|
||||
const queue = this.queuesByKey.get(key)
|
||||
if (queue) {
|
||||
const index = queue.findIndex(item => item.task.id === taskId)
|
||||
if (index !== -1) {
|
||||
queue.splice(index, 1)
|
||||
if (queue.length === 0) {
|
||||
this.queuesByKey.delete(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
log("[background-agent] Cancelled pending task:", { taskId, key })
|
||||
}
|
||||
|
||||
task.status = "cancelled"
|
||||
task.completedAt = new Date()
|
||||
if (reason) {
|
||||
task.error = reason
|
||||
}
|
||||
|
||||
if (task.concurrencyKey) {
|
||||
this.concurrencyManager.release(task.concurrencyKey)
|
||||
task.concurrencyKey = undefined
|
||||
}
|
||||
|
||||
const existingTimer = this.completionTimers.get(task.id)
|
||||
if (existingTimer) {
|
||||
clearTimeout(existingTimer)
|
||||
this.completionTimers.delete(task.id)
|
||||
}
|
||||
|
||||
this.cleanupPendingByParent(task)
|
||||
this.markForNotification(task)
|
||||
|
||||
if (abortSession && task.sessionID) {
|
||||
this.client.session.abort({
|
||||
path: { id: task.sessionID },
|
||||
}).catch(() => {})
|
||||
}
|
||||
|
||||
try {
|
||||
await this.notifyParentSession(task)
|
||||
log(`[background-agent] Task cancelled via ${source}:`, task.id)
|
||||
} catch (err) {
|
||||
log("[background-agent] Error in notifyParentSession for cancelled task:", { taskId: task.id, error: err })
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels a pending task by removing it from queue and marking as cancelled.
|
||||
* Does NOT abort session (no session exists yet) or release concurrency slot (wasn't acquired).
|
||||
@@ -840,29 +906,7 @@ export class BackgroundManager {
|
||||
return false
|
||||
}
|
||||
|
||||
// Find and remove from queue
|
||||
const key = task.model
|
||||
? `${task.model.providerID}/${task.model.modelID}`
|
||||
: task.agent
|
||||
const queue = this.queuesByKey.get(key)
|
||||
if (queue) {
|
||||
const index = queue.findIndex(item => item.task.id === taskId)
|
||||
if (index !== -1) {
|
||||
queue.splice(index, 1)
|
||||
if (queue.length === 0) {
|
||||
this.queuesByKey.delete(key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mark as cancelled
|
||||
task.status = "cancelled"
|
||||
task.completedAt = new Date()
|
||||
|
||||
// Clean up pendingByParent
|
||||
this.cleanupPendingByParent(task)
|
||||
|
||||
log("[background-agent] Cancelled pending task:", { taskId, key })
|
||||
void this.cancelTask(taskId, { source: "cancelPendingTask", abortSession: false })
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { createBackgroundOutput } from "./tools"
|
||||
import type { BackgroundTask } from "../../features/background-agent"
|
||||
import { createBackgroundCancel, createBackgroundOutput } from "./tools"
|
||||
import type { BackgroundManager, BackgroundTask } from "../../features/background-agent"
|
||||
import type { ToolContext } from "@opencode-ai/plugin/tool"
|
||||
import type { BackgroundOutputManager, BackgroundOutputClient } from "./tools"
|
||||
import type { BackgroundCancelClient, BackgroundOutputManager, BackgroundOutputClient } from "./tools"
|
||||
|
||||
const projectDir = "/Users/yeongyu/local-workspaces/oh-my-opencode"
|
||||
|
||||
@@ -253,6 +253,82 @@ describe("background_output full_session", () => {
|
||||
expect(output).not.toContain("y".repeat(2100))
|
||||
})
|
||||
})
|
||||
|
||||
describe("background_cancel", () => {
|
||||
test("cancels a running task via manager", async () => {
|
||||
// #given
|
||||
const task = createTask({ status: "running" })
|
||||
const cancelled: string[] = []
|
||||
const manager = {
|
||||
getTask: (id: string) => (id === task.id ? task : undefined),
|
||||
getAllDescendantTasks: () => [task],
|
||||
cancelTask: async (taskId: string) => {
|
||||
cancelled.push(taskId)
|
||||
task.status = "cancelled"
|
||||
return true
|
||||
},
|
||||
} as unknown as BackgroundManager
|
||||
const client = { session: { abort: async () => ({}) } } as BackgroundCancelClient
|
||||
const tool = createBackgroundCancel(manager, client)
|
||||
|
||||
// #when
|
||||
const output = await tool.execute({ taskId: task.id }, mockContext)
|
||||
|
||||
// #then
|
||||
expect(cancelled).toEqual([task.id])
|
||||
expect(output).toContain("Task cancelled successfully")
|
||||
})
|
||||
|
||||
test("cancels all running or pending tasks", async () => {
|
||||
// #given
|
||||
const taskA = createTask({ id: "task-a", status: "running" })
|
||||
const taskB = createTask({ id: "task-b", status: "pending" })
|
||||
const cancelled: string[] = []
|
||||
const manager = {
|
||||
getTask: () => undefined,
|
||||
getAllDescendantTasks: () => [taskA, taskB],
|
||||
cancelTask: async (taskId: string) => {
|
||||
cancelled.push(taskId)
|
||||
const task = taskId === taskA.id ? taskA : taskB
|
||||
task.status = "cancelled"
|
||||
return true
|
||||
},
|
||||
} as unknown as BackgroundManager
|
||||
const client = { session: { abort: async () => ({}) } } as BackgroundCancelClient
|
||||
const tool = createBackgroundCancel(manager, client)
|
||||
|
||||
// #when
|
||||
const output = await tool.execute({ all: true }, mockContext)
|
||||
|
||||
// #then
|
||||
expect(cancelled).toEqual([taskA.id, taskB.id])
|
||||
expect(output).toContain("Cancelled 2 background task(s)")
|
||||
})
|
||||
|
||||
test("preserves original status in cancellation table", async () => {
|
||||
// #given
|
||||
const taskA = createTask({ id: "task-a", status: "running", sessionID: "ses-a", description: "running task" })
|
||||
const taskB = createTask({ id: "task-b", status: "pending", sessionID: undefined, description: "pending task" })
|
||||
const manager = {
|
||||
getTask: () => undefined,
|
||||
getAllDescendantTasks: () => [taskA, taskB],
|
||||
cancelTask: async (taskId: string) => {
|
||||
const task = taskId === taskA.id ? taskA : taskB
|
||||
task.status = "cancelled"
|
||||
return true
|
||||
},
|
||||
} as unknown as BackgroundManager
|
||||
const client = { session: { abort: async () => ({}) } } as BackgroundCancelClient
|
||||
const tool = createBackgroundCancel(manager, client)
|
||||
|
||||
// #when
|
||||
const output = await tool.execute({ all: true }, mockContext)
|
||||
|
||||
// #then
|
||||
expect(output).toContain("| `task-a` | running task | running | `ses-a` |")
|
||||
expect(output).toContain("| `task-b` | pending task | pending | (not started) |")
|
||||
})
|
||||
})
|
||||
type BackgroundOutputMessage = {
|
||||
id?: string
|
||||
info?: { role?: string; time?: string | { created?: number }; agent?: string }
|
||||
|
||||
@@ -638,28 +638,18 @@ export function createBackgroundCancel(manager: BackgroundManager, client: Backg
|
||||
}> = []
|
||||
|
||||
for (const task of cancellableTasks) {
|
||||
if (task.status === "pending") {
|
||||
manager.cancelPendingTask(task.id)
|
||||
cancelledInfo.push({
|
||||
id: task.id,
|
||||
description: task.description,
|
||||
status: "pending",
|
||||
sessionID: undefined,
|
||||
})
|
||||
} else if (task.sessionID) {
|
||||
client.session.abort({
|
||||
path: { id: task.sessionID },
|
||||
}).catch(() => {})
|
||||
|
||||
task.status = "cancelled"
|
||||
task.completedAt = new Date()
|
||||
cancelledInfo.push({
|
||||
id: task.id,
|
||||
description: task.description,
|
||||
status: "running",
|
||||
sessionID: task.sessionID,
|
||||
})
|
||||
}
|
||||
const originalStatus = task.status
|
||||
const cancelled = await manager.cancelTask(task.id, {
|
||||
source: "background_cancel",
|
||||
abortSession: originalStatus === "running",
|
||||
})
|
||||
if (!cancelled) continue
|
||||
cancelledInfo.push({
|
||||
id: task.id,
|
||||
description: task.description,
|
||||
status: originalStatus === "pending" ? "pending" : "running",
|
||||
sessionID: task.sessionID,
|
||||
})
|
||||
}
|
||||
|
||||
const tableRows = cancelledInfo
|
||||
@@ -679,7 +669,7 @@ Continuable sessions:
|
||||
${resumableTasks.map(t => `- \`${t.sessionID}\` (${t.description})`).join("\n")}`
|
||||
: ""
|
||||
|
||||
return `Cancelled ${cancellableTasks.length} background task(s):
|
||||
return `Cancelled ${cancelledInfo.length} background task(s):
|
||||
|
||||
| Task ID | Description | Status | Session ID |
|
||||
|---------|-------------|--------|------------|
|
||||
@@ -697,13 +687,15 @@ ${resumeSection}`
|
||||
Only running or pending tasks can be cancelled.`
|
||||
}
|
||||
|
||||
if (task.status === "pending") {
|
||||
// Pending task: use manager method (no session to abort, no slot to release)
|
||||
const cancelled = manager.cancelPendingTask(task.id)
|
||||
if (!cancelled) {
|
||||
return `[ERROR] Failed to cancel pending task: ${task.id}`
|
||||
}
|
||||
const cancelled = await manager.cancelTask(task.id, {
|
||||
source: "background_cancel",
|
||||
abortSession: task.status === "running",
|
||||
})
|
||||
if (!cancelled) {
|
||||
return `[ERROR] Failed to cancel task: ${task.id}`
|
||||
}
|
||||
|
||||
if (task.status === "pending") {
|
||||
return `Pending task cancelled successfully
|
||||
|
||||
Task ID: ${task.id}
|
||||
@@ -711,18 +703,6 @@ Description: ${task.description}
|
||||
Status: ${task.status}`
|
||||
}
|
||||
|
||||
// Running task: abort session
|
||||
// Fire-and-forget: abort 요청을 보내고 await 하지 않음
|
||||
// await 하면 메인 세션까지 abort 되는 문제 발생
|
||||
if (task.sessionID) {
|
||||
client.session.abort({
|
||||
path: { id: task.sessionID },
|
||||
}).catch(() => {})
|
||||
}
|
||||
|
||||
task.status = "cancelled"
|
||||
task.completedAt = new Date()
|
||||
|
||||
return `Task cancelled successfully
|
||||
|
||||
Task ID: ${task.id}
|
||||
|
||||
Reference in New Issue
Block a user