Merge pull request #1685 from code-yeongyu/fix/run-completion-race-condition
fix: prevent run completion race condition with consecutive stability checks
This commit is contained in:
219
src/cli/run/poll-for-completion.test.ts
Normal file
219
src/cli/run/poll-for-completion.test.ts
Normal file
@@ -0,0 +1,219 @@
|
||||
import { describe, it, expect, mock, spyOn } from "bun:test"
|
||||
import type { RunContext, Todo, ChildSession, SessionStatus } from "./types"
|
||||
import { createEventState } from "./events"
|
||||
import { pollForCompletion } from "./poll-for-completion"
|
||||
|
||||
const createMockContext = (overrides: {
|
||||
todo?: Todo[]
|
||||
childrenBySession?: Record<string, ChildSession[]>
|
||||
statuses?: Record<string, SessionStatus>
|
||||
} = {}): RunContext => {
|
||||
const {
|
||||
todo = [],
|
||||
childrenBySession = { "test-session": [] },
|
||||
statuses = {},
|
||||
} = overrides
|
||||
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
todo: mock(() => Promise.resolve({ data: todo })),
|
||||
children: mock((opts: { path: { id: string } }) =>
|
||||
Promise.resolve({ data: childrenBySession[opts.path.id] ?? [] })
|
||||
),
|
||||
status: mock(() => Promise.resolve({ data: statuses })),
|
||||
},
|
||||
} as unknown as RunContext["client"],
|
||||
sessionID: "test-session",
|
||||
directory: "/test",
|
||||
abortController: new AbortController(),
|
||||
}
|
||||
}
|
||||
|
||||
describe("pollForCompletion", () => {
|
||||
it("requires consecutive stability checks before exiting - not immediate", async () => {
|
||||
//#given - 0 todos, 0 children, session idle, meaningful work done
|
||||
spyOn(console, "log").mockImplementation(() => {})
|
||||
spyOn(console, "error").mockImplementation(() => {})
|
||||
const ctx = createMockContext()
|
||||
const eventState = createEventState()
|
||||
eventState.mainSessionIdle = true
|
||||
eventState.hasReceivedMeaningfulWork = true
|
||||
const abortController = new AbortController()
|
||||
|
||||
//#when
|
||||
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||
pollIntervalMs: 10,
|
||||
requiredConsecutive: 3,
|
||||
})
|
||||
|
||||
//#then - exits with 0 but only after 3 consecutive checks
|
||||
expect(result).toBe(0)
|
||||
const todoCallCount = (ctx.client.session.todo as ReturnType<typeof mock>).mock.calls.length
|
||||
expect(todoCallCount).toBeGreaterThanOrEqual(3)
|
||||
})
|
||||
|
||||
it("does not exit when currentTool is set - resets consecutive counter", async () => {
|
||||
//#given
|
||||
spyOn(console, "log").mockImplementation(() => {})
|
||||
spyOn(console, "error").mockImplementation(() => {})
|
||||
const ctx = createMockContext()
|
||||
const eventState = createEventState()
|
||||
eventState.mainSessionIdle = true
|
||||
eventState.hasReceivedMeaningfulWork = true
|
||||
eventState.currentTool = "task"
|
||||
const abortController = new AbortController()
|
||||
|
||||
//#when - abort after enough time to verify it didn't exit
|
||||
setTimeout(() => abortController.abort(), 100)
|
||||
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||
pollIntervalMs: 10,
|
||||
requiredConsecutive: 3,
|
||||
})
|
||||
|
||||
//#then - should be aborted, not completed (tool blocked exit)
|
||||
expect(result).toBe(130)
|
||||
const todoCallCount = (ctx.client.session.todo as ReturnType<typeof mock>).mock.calls.length
|
||||
expect(todoCallCount).toBe(0)
|
||||
})
|
||||
|
||||
it("resets consecutive counter when session becomes busy between checks", async () => {
|
||||
//#given
|
||||
spyOn(console, "log").mockImplementation(() => {})
|
||||
spyOn(console, "error").mockImplementation(() => {})
|
||||
const ctx = createMockContext()
|
||||
const eventState = createEventState()
|
||||
eventState.mainSessionIdle = true
|
||||
eventState.hasReceivedMeaningfulWork = true
|
||||
const abortController = new AbortController()
|
||||
let todoCallCount = 0
|
||||
let busyInserted = false
|
||||
|
||||
;(ctx.client.session as any).todo = mock(async () => {
|
||||
todoCallCount++
|
||||
if (todoCallCount === 1 && !busyInserted) {
|
||||
busyInserted = true
|
||||
eventState.mainSessionIdle = false
|
||||
setTimeout(() => { eventState.mainSessionIdle = true }, 15)
|
||||
}
|
||||
return { data: [] }
|
||||
})
|
||||
;(ctx.client.session as any).children = mock(() =>
|
||||
Promise.resolve({ data: [] })
|
||||
)
|
||||
;(ctx.client.session as any).status = mock(() =>
|
||||
Promise.resolve({ data: {} })
|
||||
)
|
||||
|
||||
//#when
|
||||
const startMs = Date.now()
|
||||
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||
pollIntervalMs: 10,
|
||||
requiredConsecutive: 3,
|
||||
})
|
||||
const elapsedMs = Date.now() - startMs
|
||||
|
||||
//#then - took longer than 3 polls because busy interrupted the streak
|
||||
expect(result).toBe(0)
|
||||
expect(elapsedMs).toBeGreaterThan(30)
|
||||
})
|
||||
|
||||
it("returns 1 on session error", async () => {
|
||||
//#given
|
||||
spyOn(console, "log").mockImplementation(() => {})
|
||||
spyOn(console, "error").mockImplementation(() => {})
|
||||
const ctx = createMockContext()
|
||||
const eventState = createEventState()
|
||||
eventState.mainSessionIdle = true
|
||||
eventState.mainSessionError = true
|
||||
eventState.lastError = "Test error"
|
||||
const abortController = new AbortController()
|
||||
|
||||
//#when
|
||||
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||
pollIntervalMs: 10,
|
||||
requiredConsecutive: 3,
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(result).toBe(1)
|
||||
})
|
||||
|
||||
it("returns 130 when aborted", async () => {
|
||||
//#given
|
||||
spyOn(console, "log").mockImplementation(() => {})
|
||||
spyOn(console, "error").mockImplementation(() => {})
|
||||
const ctx = createMockContext()
|
||||
const eventState = createEventState()
|
||||
const abortController = new AbortController()
|
||||
|
||||
//#when
|
||||
setTimeout(() => abortController.abort(), 50)
|
||||
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||
pollIntervalMs: 10,
|
||||
requiredConsecutive: 3,
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(result).toBe(130)
|
||||
})
|
||||
|
||||
it("does not check completion when hasReceivedMeaningfulWork is false", async () => {
|
||||
//#given
|
||||
spyOn(console, "log").mockImplementation(() => {})
|
||||
spyOn(console, "error").mockImplementation(() => {})
|
||||
const ctx = createMockContext()
|
||||
const eventState = createEventState()
|
||||
eventState.mainSessionIdle = true
|
||||
eventState.hasReceivedMeaningfulWork = false
|
||||
const abortController = new AbortController()
|
||||
|
||||
//#when
|
||||
setTimeout(() => abortController.abort(), 100)
|
||||
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||
pollIntervalMs: 10,
|
||||
requiredConsecutive: 3,
|
||||
})
|
||||
|
||||
//#then
|
||||
expect(result).toBe(130)
|
||||
const todoCallCount = (ctx.client.session.todo as ReturnType<typeof mock>).mock.calls.length
|
||||
expect(todoCallCount).toBe(0)
|
||||
})
|
||||
|
||||
it("simulates race condition: brief idle with 0 todos does not cause immediate exit", async () => {
|
||||
//#given - simulate Sisyphus outputting text, session goes idle briefly, then tool fires
|
||||
spyOn(console, "log").mockImplementation(() => {})
|
||||
spyOn(console, "error").mockImplementation(() => {})
|
||||
const ctx = createMockContext()
|
||||
const eventState = createEventState()
|
||||
eventState.mainSessionIdle = true
|
||||
eventState.hasReceivedMeaningfulWork = true
|
||||
const abortController = new AbortController()
|
||||
let pollTick = 0
|
||||
|
||||
;(ctx.client.session as any).todo = mock(async () => {
|
||||
pollTick++
|
||||
if (pollTick === 2) {
|
||||
eventState.currentTool = "task"
|
||||
}
|
||||
return { data: [] }
|
||||
})
|
||||
;(ctx.client.session as any).children = mock(() =>
|
||||
Promise.resolve({ data: [] })
|
||||
)
|
||||
;(ctx.client.session as any).status = mock(() =>
|
||||
Promise.resolve({ data: {} })
|
||||
)
|
||||
|
||||
//#when - abort after tool stays in-flight
|
||||
setTimeout(() => abortController.abort(), 200)
|
||||
const result = await pollForCompletion(ctx, eventState, abortController, {
|
||||
pollIntervalMs: 10,
|
||||
requiredConsecutive: 3,
|
||||
})
|
||||
|
||||
//#then - should NOT have exited with 0 (tool blocked it, then aborted)
|
||||
expect(result).toBe(130)
|
||||
})
|
||||
})
|
||||
66
src/cli/run/poll-for-completion.ts
Normal file
66
src/cli/run/poll-for-completion.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import pc from "picocolors"
|
||||
import type { RunContext } from "./types"
|
||||
import type { EventState } from "./events"
|
||||
import { checkCompletionConditions } from "./completion"
|
||||
|
||||
const DEFAULT_POLL_INTERVAL_MS = 500
|
||||
const DEFAULT_REQUIRED_CONSECUTIVE = 3
|
||||
|
||||
export interface PollOptions {
|
||||
pollIntervalMs?: number
|
||||
requiredConsecutive?: number
|
||||
}
|
||||
|
||||
export async function pollForCompletion(
|
||||
ctx: RunContext,
|
||||
eventState: EventState,
|
||||
abortController: AbortController,
|
||||
options: PollOptions = {}
|
||||
): Promise<number> {
|
||||
const pollIntervalMs = options.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS
|
||||
const requiredConsecutive =
|
||||
options.requiredConsecutive ?? DEFAULT_REQUIRED_CONSECUTIVE
|
||||
let consecutiveCompleteChecks = 0
|
||||
|
||||
while (!abortController.signal.aborted) {
|
||||
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs))
|
||||
|
||||
if (!eventState.mainSessionIdle) {
|
||||
consecutiveCompleteChecks = 0
|
||||
continue
|
||||
}
|
||||
|
||||
if (eventState.currentTool !== null) {
|
||||
consecutiveCompleteChecks = 0
|
||||
continue
|
||||
}
|
||||
|
||||
if (eventState.mainSessionError) {
|
||||
console.error(
|
||||
pc.red(`\n\nSession ended with error: ${eventState.lastError}`)
|
||||
)
|
||||
console.error(
|
||||
pc.yellow("Check if todos were completed before the error.")
|
||||
)
|
||||
return 1
|
||||
}
|
||||
|
||||
if (!eventState.hasReceivedMeaningfulWork) {
|
||||
consecutiveCompleteChecks = 0
|
||||
continue
|
||||
}
|
||||
|
||||
const shouldExit = await checkCompletionConditions(ctx)
|
||||
if (shouldExit) {
|
||||
consecutiveCompleteChecks++
|
||||
if (consecutiveCompleteChecks >= requiredConsecutive) {
|
||||
console.log(pc.green("\n\nAll tasks completed."))
|
||||
return 0
|
||||
}
|
||||
} else {
|
||||
consecutiveCompleteChecks = 0
|
||||
}
|
||||
}
|
||||
|
||||
return 130
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
import pc from "picocolors"
|
||||
import type { RunOptions, RunContext } from "./types"
|
||||
import { checkCompletionConditions } from "./completion"
|
||||
import { createEventState, processEvents, serializeError } from "./events"
|
||||
import { loadPluginConfig } from "../../plugin-config"
|
||||
import { createServerConnection } from "./server-connection"
|
||||
@@ -8,10 +7,10 @@ import { resolveSession } from "./session-resolver"
|
||||
import { createJsonOutputManager } from "./json-output"
|
||||
import { executeOnCompleteHook } from "./on-complete-hook"
|
||||
import { resolveRunAgent } from "./agent-resolver"
|
||||
import { pollForCompletion } from "./poll-for-completion"
|
||||
|
||||
export { resolveRunAgent }
|
||||
|
||||
const POLL_INTERVAL_MS = 500
|
||||
const DEFAULT_TIMEOUT_MS = 0
|
||||
|
||||
export async function run(options: RunOptions): Promise<number> {
|
||||
@@ -124,30 +123,4 @@ export async function run(options: RunOptions): Promise<number> {
|
||||
}
|
||||
}
|
||||
|
||||
async function pollForCompletion(
|
||||
ctx: RunContext,
|
||||
eventState: ReturnType<typeof createEventState>,
|
||||
abortController: AbortController
|
||||
): Promise<number> {
|
||||
while (!abortController.signal.aborted) {
|
||||
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
|
||||
|
||||
if (!eventState.mainSessionIdle) continue
|
||||
|
||||
if (eventState.mainSessionError) {
|
||||
console.error(pc.red(`\n\nSession ended with error: ${eventState.lastError}`))
|
||||
console.error(pc.yellow("Check if todos were completed before the error."))
|
||||
return 1
|
||||
}
|
||||
|
||||
if (!eventState.hasReceivedMeaningfulWork) continue
|
||||
|
||||
const shouldExit = await checkCompletionConditions(ctx)
|
||||
if (shouldExit) {
|
||||
console.log(pc.green("\n\nAll tasks completed."))
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
return 130
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user