refactor(background-agent): split manager.ts into focused modules
Extract 30+ single-responsibility modules from manager.ts (1556 LOC): - task lifecycle: task-starter, task-completer, task-canceller, task-resumer - task queries: task-queries, task-poller, task-queue-processor - notifications: notification-builder, notification-tracker, parent-session-notifier - session handling: session-validator, session-output-validator, session-todo-checker - spawner: spawner/ directory with focused spawn modules - utilities: duration-formatter, error-classifier, message-storage-locator - result handling: result-handler-context, background-task-completer - shutdown: background-manager-shutdown, process-signal
This commit is contained in:
52
src/features/background-agent/task-queue-processor.ts
Normal file
52
src/features/background-agent/task-queue-processor.ts
Normal file
@@ -0,0 +1,52 @@
|
||||
import { log } from "../../shared"
|
||||
|
||||
import type { BackgroundTask } from "./types"
|
||||
import type { ConcurrencyManager } from "./concurrency"
|
||||
|
||||
type QueueItem = {
|
||||
task: BackgroundTask
|
||||
input: import("./types").LaunchInput
|
||||
}
|
||||
|
||||
export async function processConcurrencyKeyQueue(args: {
|
||||
key: string
|
||||
queuesByKey: Map<string, QueueItem[]>
|
||||
processingKeys: Set<string>
|
||||
concurrencyManager: ConcurrencyManager
|
||||
startTask: (item: QueueItem) => Promise<void>
|
||||
}): Promise<void> {
|
||||
const { key, queuesByKey, processingKeys, concurrencyManager, startTask } = args
|
||||
|
||||
if (processingKeys.has(key)) return
|
||||
processingKeys.add(key)
|
||||
|
||||
try {
|
||||
const queue = queuesByKey.get(key)
|
||||
while (queue && queue.length > 0) {
|
||||
const item = queue[0]
|
||||
|
||||
await concurrencyManager.acquire(key)
|
||||
|
||||
if (item.task.status === "cancelled") {
|
||||
concurrencyManager.release(key)
|
||||
queue.shift()
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
await startTask(item)
|
||||
} catch (error) {
|
||||
log("[background-agent] Error starting task:", error)
|
||||
// Release concurrency slot if startTask failed and didn't release it itself
|
||||
// This prevents slot leaks when errors occur after acquire but before task.concurrencyKey is set
|
||||
if (!item.task.concurrencyKey) {
|
||||
concurrencyManager.release(key)
|
||||
}
|
||||
}
|
||||
|
||||
queue.shift()
|
||||
}
|
||||
} finally {
|
||||
processingKeys.delete(key)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user