fix(cli-run): bounded shutdown wait for event stream processor
Prevents Run CLI from hanging indefinitely when the event stream fails to close after abort. Fixes #1825 Co-authored-by: cloudwaddie-agent <cloudwaddie-agent@users.noreply.github.com>
This commit is contained in:
@@ -1,6 +1,8 @@
|
|||||||
import { describe, it, expect } from "bun:test"
|
/// <reference types="bun-types" />
|
||||||
|
|
||||||
|
import { describe, it, expect, spyOn, afterEach } from "bun:test"
|
||||||
import type { OhMyOpenCodeConfig } from "../../config"
|
import type { OhMyOpenCodeConfig } from "../../config"
|
||||||
import { resolveRunAgent } from "./runner"
|
import { resolveRunAgent, waitForEventProcessorShutdown } from "./runner"
|
||||||
|
|
||||||
const createConfig = (overrides: Partial<OhMyOpenCodeConfig> = {}): OhMyOpenCodeConfig => ({
|
const createConfig = (overrides: Partial<OhMyOpenCodeConfig> = {}): OhMyOpenCodeConfig => ({
|
||||||
...overrides,
|
...overrides,
|
||||||
@@ -68,3 +70,58 @@ describe("resolveRunAgent", () => {
|
|||||||
expect(agent).toBe("hephaestus")
|
expect(agent).toBe("hephaestus")
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
describe("waitForEventProcessorShutdown", () => {
|
||||||
|
let consoleLogSpy: ReturnType<typeof spyOn<typeof console, "log">> | null = null
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
if (consoleLogSpy) {
|
||||||
|
consoleLogSpy.mockRestore()
|
||||||
|
consoleLogSpy = null
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
it("returns quickly when event processor completes", async () => {
|
||||||
|
//#given
|
||||||
|
const eventProcessor = new Promise<void>((resolve) => {
|
||||||
|
setTimeout(() => {
|
||||||
|
resolve()
|
||||||
|
}, 25)
|
||||||
|
})
|
||||||
|
consoleLogSpy = spyOn(console, "log").mockImplementation(() => {})
|
||||||
|
const start = performance.now()
|
||||||
|
|
||||||
|
//#when
|
||||||
|
await waitForEventProcessorShutdown(eventProcessor, 200)
|
||||||
|
|
||||||
|
//#then
|
||||||
|
const elapsed = performance.now() - start
|
||||||
|
expect(elapsed).toBeLessThan(200)
|
||||||
|
expect(console.log).not.toHaveBeenCalledWith(
|
||||||
|
"[run] Event stream did not close within 200ms after abort; continuing shutdown.",
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("times out and continues when event processor does not complete", async () => {
|
||||||
|
//#given
|
||||||
|
const eventProcessor = new Promise<void>(() => {})
|
||||||
|
const spy = spyOn(console, "log").mockImplementation(() => {})
|
||||||
|
consoleLogSpy = spy
|
||||||
|
const timeoutMs = 50
|
||||||
|
const start = performance.now()
|
||||||
|
|
||||||
|
try {
|
||||||
|
//#when
|
||||||
|
await waitForEventProcessorShutdown(eventProcessor, timeoutMs)
|
||||||
|
|
||||||
|
//#then
|
||||||
|
const elapsed = performance.now() - start
|
||||||
|
expect(elapsed).toBeGreaterThanOrEqual(timeoutMs)
|
||||||
|
expect(spy).toHaveBeenCalledWith(
|
||||||
|
`[run] Event stream did not close within ${timeoutMs}ms after abort; continuing shutdown.`,
|
||||||
|
)
|
||||||
|
} finally {
|
||||||
|
spy.mockRestore()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|||||||
@@ -12,6 +12,25 @@ import { pollForCompletion } from "./poll-for-completion"
|
|||||||
export { resolveRunAgent }
|
export { resolveRunAgent }
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT_MS = 600_000
|
const DEFAULT_TIMEOUT_MS = 600_000
|
||||||
|
const EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS = 2_000
|
||||||
|
|
||||||
|
export async function waitForEventProcessorShutdown(
|
||||||
|
eventProcessor: Promise<void>,
|
||||||
|
timeoutMs = EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS,
|
||||||
|
): Promise<void> {
|
||||||
|
const completed = await Promise.race([
|
||||||
|
eventProcessor.then(() => true),
|
||||||
|
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), timeoutMs)),
|
||||||
|
])
|
||||||
|
|
||||||
|
if (!completed) {
|
||||||
|
console.log(
|
||||||
|
pc.dim(
|
||||||
|
`[run] Event stream did not close within ${timeoutMs}ms after abort; continuing shutdown.`,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function run(options: RunOptions): Promise<number> {
|
export async function run(options: RunOptions): Promise<number> {
|
||||||
process.env.OPENCODE_CLI_RUN_MODE = "true"
|
process.env.OPENCODE_CLI_RUN_MODE = "true"
|
||||||
@@ -81,14 +100,14 @@ export async function run(options: RunOptions): Promise<number> {
|
|||||||
query: { directory },
|
query: { directory },
|
||||||
})
|
})
|
||||||
|
|
||||||
console.log(pc.dim("Waiting for completion...\n"))
|
console.log(pc.dim("Waiting for completion...\n"))
|
||||||
const exitCode = await pollForCompletion(ctx, eventState, abortController)
|
const exitCode = await pollForCompletion(ctx, eventState, abortController)
|
||||||
|
|
||||||
// Abort the event stream to stop the processor
|
// Abort the event stream to stop the processor
|
||||||
abortController.abort()
|
abortController.abort()
|
||||||
|
|
||||||
await eventProcessor
|
await waitForEventProcessorShutdown(eventProcessor)
|
||||||
cleanup()
|
cleanup()
|
||||||
|
|
||||||
const durationMs = Date.now() - startTime
|
const durationMs = Date.now() - startTime
|
||||||
|
|
||||||
@@ -127,4 +146,3 @@ export async function run(options: RunOptions): Promise<number> {
|
|||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user