fix(compaction): add timeout and ensure cleanup to prevent indefinite hangs on rate limit
Closes #2062
This commit is contained in:
@@ -6,7 +6,7 @@ export function getOrCreateRetryState(
|
||||
): RetryState {
|
||||
let state = autoCompactState.retryStateBySession.get(sessionID)
|
||||
if (!state) {
|
||||
state = { attempt: 0, lastAttemptTime: 0 }
|
||||
state = { attempt: 0, lastAttemptTime: 0, firstAttemptTime: 0 }
|
||||
autoCompactState.retryStateBySession.set(sessionID, state)
|
||||
}
|
||||
return state
|
||||
|
||||
@@ -0,0 +1,122 @@
|
||||
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"
|
||||
import { runSummarizeRetryStrategy } from "./summarize-retry-strategy"
|
||||
import type { AutoCompactState, ParsedTokenLimitError, RetryState } from "./types"
|
||||
import type { OhMyOpenCodeConfig } from "../../config"
|
||||
|
||||
/** One recorded call to the stubbed `setTimeout`, keeping only the requested delay. */
type TimeoutCall = {
  /** Delay argument passed to `setTimeout`; the stub records 0 when it was omitted. */
  delay: number
}
|
||||
|
||||
/**
 * Builds an empty `AutoCompactState` for a test run.
 *
 * Every collection is freshly allocated, so state written by one test cannot
 * leak into the next when this is called from `beforeEach`.
 *
 * @returns A state object whose sets and maps are all empty.
 */
function createAutoCompactState(): AutoCompactState {
  return {
    pendingCompact: new Set<string>(),
    errorDataBySession: new Map<string, ParsedTokenLimitError>(),
    retryStateBySession: new Map<string, RetryState>(),
    truncateStateBySession: new Map(),
    emptyContentAttemptBySession: new Map(),
    compactionInProgress: new Set<string>(),
  }
}
|
||||
|
||||
// Covers the total-timeout guard of runSummarizeRetryStrategy: retries must stop once the
// overall window has elapsed, and a scheduled retry must never be delayed past that window.
describe("runSummarizeRetryStrategy", () => {
  const sessionID = "ses_retry_timeout"
  const directory = "/tmp"
  // Captured before any hook or test can replace the global, so afterEach restores the real timer.
  const originalSetTimeout = globalThis.setTimeout
  let autoCompactState: AutoCompactState

  const summarizeMock = mock(() => Promise.resolve())
  const showToastMock = mock(() => Promise.resolve())
  const client = {
    session: {
      summarize: summarizeMock,
      messages: mock(() => Promise.resolve({ data: [] })),
      promptAsync: mock(() => Promise.resolve()),
      revert: mock(() => Promise.resolve()),
    },
    tui: {
      showToast: showToastMock,
    },
  }

  beforeEach(() => {
    autoCompactState = createAutoCompactState()
    summarizeMock.mockReset()
    showToastMock.mockReset()
    summarizeMock.mockResolvedValue(undefined)
    showToastMock.mockResolvedValue(undefined)
  })

  afterEach(() => {
    // The delay-capping test swaps out the global timer; always put the real one back.
    globalThis.setTimeout = originalSetTimeout
  })

  test("stops retries when total summarize timeout is exceeded", async () => {
    //#given
    autoCompactState.pendingCompact.add(sessionID)
    autoCompactState.errorDataBySession.set(sessionID, {
      currentTokens: 250000,
      maxTokens: 200000,
      errorType: "token_limit_exceeded",
    })
    // First attempt was 130 s ago, which is past the 120 s total retry window.
    autoCompactState.retryStateBySession.set(sessionID, {
      attempt: 1,
      lastAttemptTime: Date.now(),
      firstAttemptTime: Date.now() - 130000,
    })

    //#when
    await runSummarizeRetryStrategy({
      sessionID,
      msg: { providerID: "anthropic", modelID: "claude-sonnet-4-6" },
      autoCompactState,
      client: client as never,
      directory,
      pluginConfig: {} as OhMyOpenCodeConfig,
    })

    //#then
    expect(summarizeMock).not.toHaveBeenCalled()
    expect(autoCompactState.pendingCompact.has(sessionID)).toBe(false)
    expect(autoCompactState.errorDataBySession.has(sessionID)).toBe(false)
    expect(autoCompactState.retryStateBySession.has(sessionID)).toBe(false)
    expect(showToastMock).toHaveBeenCalledWith(
      expect.objectContaining({
        body: expect.objectContaining({
          title: "Auto Compact Timed Out",
        }),
      }),
    )
  })

  test("caps retry delay by remaining total timeout window", async () => {
    //#given
    const timeoutCalls: TimeoutCall[] = []
    // Record the requested delay without ever running the callback, so no real retry is scheduled.
    globalThis.setTimeout = ((_: (...args: unknown[]) => void, delay?: number) => {
      timeoutCalls.push({ delay: delay ?? 0 })
      return 1 as unknown as ReturnType<typeof setTimeout>
    }) as typeof setTimeout

    // 119.7 s already elapsed, leaving roughly 300 ms of the 120 s window.
    autoCompactState.retryStateBySession.set(sessionID, {
      attempt: 1,
      lastAttemptTime: Date.now(),
      firstAttemptTime: Date.now() - 119700,
    })
    summarizeMock.mockRejectedValueOnce(new Error("rate limited"))

    //#when
    await runSummarizeRetryStrategy({
      sessionID,
      msg: { providerID: "anthropic", modelID: "claude-sonnet-4-6" },
      autoCompactState,
      client: client as never,
      directory,
      pluginConfig: {} as OhMyOpenCodeConfig,
    })

    //#then
    expect(timeoutCalls.length).toBe(1)
    expect(timeoutCalls[0]!.delay).toBeGreaterThan(0)
    expect(timeoutCalls[0]!.delay).toBeLessThanOrEqual(500)
  })
})
|
||||
@@ -7,6 +7,8 @@ import { sanitizeEmptyMessagesBeforeSummarize } from "./message-builder"
|
||||
import { fixEmptyMessages } from "./empty-content-recovery"
|
||||
|
||||
import { resolveCompactionModel } from "../shared/compaction-model-resolver"
|
||||
|
||||
const SUMMARIZE_RETRY_TOTAL_TIMEOUT_MS = 120_000
|
||||
export async function runSummarizeRetryStrategy(params: {
|
||||
sessionID: string
|
||||
msg: Record<string, unknown>
|
||||
@@ -18,6 +20,27 @@ export async function runSummarizeRetryStrategy(params: {
|
||||
messageIndex?: number
|
||||
}): Promise<void> {
|
||||
const retryState = getOrCreateRetryState(params.autoCompactState, params.sessionID)
|
||||
const now = Date.now()
|
||||
|
||||
if (retryState.firstAttemptTime === 0) {
|
||||
retryState.firstAttemptTime = now
|
||||
}
|
||||
|
||||
const elapsedTimeMs = now - retryState.firstAttemptTime
|
||||
if (elapsedTimeMs >= SUMMARIZE_RETRY_TOTAL_TIMEOUT_MS) {
|
||||
clearSessionState(params.autoCompactState, params.sessionID)
|
||||
await params.client.tui
|
||||
.showToast({
|
||||
body: {
|
||||
title: "Auto Compact Timed Out",
|
||||
message: "Compaction retries exceeded the timeout window. Please start a new session.",
|
||||
variant: "error",
|
||||
duration: 5000,
|
||||
},
|
||||
})
|
||||
.catch(() => {})
|
||||
return
|
||||
}
|
||||
|
||||
if (params.errorType?.includes("non-empty content")) {
|
||||
const attempt = getEmptyContentAttempt(params.autoCompactState, params.sessionID)
|
||||
@@ -52,6 +75,7 @@ export async function runSummarizeRetryStrategy(params: {
|
||||
|
||||
if (Date.now() - retryState.lastAttemptTime > 300000) {
|
||||
retryState.attempt = 0
|
||||
retryState.firstAttemptTime = Date.now()
|
||||
params.autoCompactState.truncateStateBySession.delete(params.sessionID)
|
||||
}
|
||||
|
||||
@@ -92,10 +116,26 @@ export async function runSummarizeRetryStrategy(params: {
|
||||
})
|
||||
return
|
||||
} catch {
|
||||
const remainingTimeMs = SUMMARIZE_RETRY_TOTAL_TIMEOUT_MS - (Date.now() - retryState.firstAttemptTime)
|
||||
if (remainingTimeMs <= 0) {
|
||||
clearSessionState(params.autoCompactState, params.sessionID)
|
||||
await params.client.tui
|
||||
.showToast({
|
||||
body: {
|
||||
title: "Auto Compact Timed Out",
|
||||
message: "Compaction retries exceeded the timeout window. Please start a new session.",
|
||||
variant: "error",
|
||||
duration: 5000,
|
||||
},
|
||||
})
|
||||
.catch(() => {})
|
||||
return
|
||||
}
|
||||
|
||||
const delay =
|
||||
RETRY_CONFIG.initialDelayMs *
|
||||
Math.pow(RETRY_CONFIG.backoffFactor, retryState.attempt - 1)
|
||||
const cappedDelay = Math.min(delay, RETRY_CONFIG.maxDelayMs)
|
||||
const cappedDelay = Math.min(delay, RETRY_CONFIG.maxDelayMs, remainingTimeMs)
|
||||
|
||||
setTimeout(() => {
|
||||
void runSummarizeRetryStrategy(params)
|
||||
|
||||
@@ -11,6 +11,7 @@ export interface ParsedTokenLimitError {
|
||||
/** Per-session bookkeeping for summarize retries. Timestamps are `Date.now()` values. */
export interface RetryState {
  /** Retry counter; it feeds the backoff exponent and is reset to 0 after a long idle gap. */
  attempt: number
  /** Timestamp compared against `Date.now()` to decide whether the retry sequence has gone stale. */
  lastAttemptTime: number
  /** Timestamp the total retry timeout is measured from; 0 means it has not been set yet. */
  firstAttemptTime: number
}
|
||||
|
||||
export interface TruncateState {
|
||||
|
||||
@@ -45,6 +45,23 @@ function createMockCtx() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Replaces the global timer functions so that scheduled callbacks run synchronously.
 *
 * While installed, `setTimeout` ignores the delay, invokes the callback immediately with any
 * extra arguments, and returns a dummy handle; `clearTimeout` becomes a no-op.
 *
 * @returns A function that puts the original `setTimeout` and `clearTimeout` back. Call it in a
 *   `finally` block so a failing test cannot leave the globals patched.
 */
function setupImmediateTimeouts(): () => void {
  const originalSetTimeout = globalThis.setTimeout
  const originalClearTimeout = globalThis.clearTimeout

  globalThis.setTimeout = ((callback: (...args: unknown[]) => void, _delay?: number, ...args: unknown[]) => {
    callback(...args)
    // Nothing is actually scheduled, so any handle value will do.
    return 1 as unknown as ReturnType<typeof setTimeout>
  }) as typeof setTimeout

  globalThis.clearTimeout = (() => {}) as typeof clearTimeout

  return () => {
    globalThis.setTimeout = originalSetTimeout
    globalThis.clearTimeout = originalClearTimeout
  }
}
|
||||
|
||||
describe("preemptive-compaction", () => {
|
||||
let ctx: ReturnType<typeof createMockCtx>
|
||||
|
||||
@@ -63,7 +80,7 @@ describe("preemptive-compaction", () => {
|
||||
// #when tool.execute.after is called
|
||||
// #then session.messages() should NOT be called
|
||||
it("should use cached token info instead of fetching session.messages()", async () => {
|
||||
const hook = createPreemptiveCompactionHook(ctx as never)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_test1"
|
||||
|
||||
// Simulate message.updated with token info below threshold
|
||||
@@ -101,7 +118,7 @@ describe("preemptive-compaction", () => {
|
||||
// #when tool.execute.after is called
|
||||
// #then should skip without fetching
|
||||
it("should skip gracefully when no cached token info exists", async () => {
|
||||
const hook = createPreemptiveCompactionHook(ctx as never)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
|
||||
const output = { title: "", output: "test", metadata: null }
|
||||
await hook["tool.execute.after"](
|
||||
@@ -116,7 +133,7 @@ describe("preemptive-compaction", () => {
|
||||
// #when tool.execute.after runs
|
||||
// #then should trigger summarize
|
||||
it("should trigger compaction when usage exceeds threshold", async () => {
|
||||
const hook = createPreemptiveCompactionHook(ctx as never)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_high"
|
||||
|
||||
// 170K input + 10K cache = 180K → 90% of 200K
|
||||
@@ -153,7 +170,7 @@ describe("preemptive-compaction", () => {
|
||||
|
||||
it("should trigger compaction for google-vertex-anthropic provider", async () => {
|
||||
//#given google-vertex-anthropic usage above threshold
|
||||
const hook = createPreemptiveCompactionHook(ctx as never)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_vertex_anthropic_high"
|
||||
|
||||
await hook.event({
|
||||
@@ -191,7 +208,7 @@ describe("preemptive-compaction", () => {
|
||||
// #given session deleted
|
||||
// #then cache should be cleaned up
|
||||
it("should clean up cache on session.deleted", async () => {
|
||||
const hook = createPreemptiveCompactionHook(ctx as never)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_del"
|
||||
|
||||
await hook.event({
|
||||
@@ -228,7 +245,7 @@ describe("preemptive-compaction", () => {
|
||||
|
||||
it("should log summarize errors instead of swallowing them", async () => {
|
||||
//#given
|
||||
const hook = createPreemptiveCompactionHook(ctx as never)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_log_error"
|
||||
const summarizeError = new Error("summarize failed")
|
||||
ctx.client.session.summarize.mockRejectedValueOnce(summarizeError)
|
||||
@@ -343,4 +360,58 @@ describe("preemptive-compaction", () => {
|
||||
//#then
|
||||
expect(ctx.client.session.summarize).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
// A summarize call that never settles must time out, and a later trigger must be able to
// call summarize again.
it("should clear in-progress lock when summarize times out", async () => {
  //#given
  // Timers fire synchronously, so the summarize timeout elapses without real waiting.
  const restoreTimeouts = setupImmediateTimeouts()
  const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
  const sessionID = "ses_timeout"

  // First call hangs forever; the second resolves.
  ctx.client.session.summarize
    .mockImplementationOnce(() => new Promise(() => {}))
    .mockResolvedValueOnce({})

  try {
    // 170K input + 10K cache read, the same usage the threshold tests use to trigger compaction.
    await hook.event({
      event: {
        type: "message.updated",
        properties: {
          info: {
            role: "assistant",
            sessionID,
            providerID: "anthropic",
            modelID: "claude-sonnet-4-6",
            finish: true,
            tokens: {
              input: 170000,
              output: 0,
              reasoning: 0,
              cache: { read: 10000, write: 0 },
            },
          },
        },
      },
    })

    //#when
    await hook["tool.execute.after"](
      { tool: "bash", sessionID, callID: "call_timeout_1" },
      { title: "", output: "test", metadata: null },
    )

    await hook["tool.execute.after"](
      { tool: "bash", sessionID, callID: "call_timeout_2" },
      { title: "", output: "test", metadata: null },
    )

    //#then
    // The second summarize call shows the first attempt did not block the retry.
    expect(ctx.client.session.summarize).toHaveBeenCalledTimes(2)
    expect(logMock).toHaveBeenCalledWith("[preemptive-compaction] Compaction failed", {
      sessionID,
      error: expect.stringContaining("Compaction summarize timed out"),
    })
  } finally {
    // Restore the real timers even when an expectation above throws.
    restoreTimeouts()
  }
})
|
||||
})
|
||||
|
||||
@@ -3,6 +3,7 @@ import type { OhMyOpenCodeConfig } from "../config"
|
||||
|
||||
import { resolveCompactionModel } from "./shared/compaction-model-resolver"
|
||||
const DEFAULT_ACTUAL_LIMIT = 200_000
|
||||
const PREEMPTIVE_COMPACTION_TIMEOUT_MS = 120_000
|
||||
|
||||
type ModelCacheStateLike = {
|
||||
anthropicContext1MEnabled: boolean
|
||||
@@ -31,6 +32,26 @@ interface CachedCompactionState {
|
||||
tokens: TokenInfo
|
||||
}
|
||||
|
||||
/**
 * Races `promise` against a timer and rejects if the timer wins.
 *
 * The wrapped operation is not cancelled on timeout; only the wait for it is abandoned.
 *
 * @param promise - The operation to bound.
 * @param timeoutMs - Milliseconds to wait before giving up.
 * @param errorMessage - Message of the `Error` used for the timeout rejection.
 * @returns A promise that settles like `promise` when it settles first, and otherwise rejects
 *   with `new Error(errorMessage)`.
 */
function withTimeout<TValue>(
  promise: Promise<TValue>,
  timeoutMs: number,
  errorMessage: string,
): Promise<TValue> {
  let timeoutID: ReturnType<typeof setTimeout> | undefined

  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutID = setTimeout(() => {
      reject(new Error(errorMessage))
    }, timeoutMs)
  })

  // Clear the timer on either outcome so a settled call leaves no pending timer behind.
  return Promise.race([promise, timeoutPromise]).finally(() => {
    if (timeoutID !== undefined) {
      clearTimeout(timeoutID)
    }
  })
}
|
||||
|
||||
/**
 * Reports whether a provider ID is one of the two Anthropic provider IDs this module recognises.
 *
 * @param providerID - Provider identifier; the comparison is exact and case-sensitive.
 * @returns `true` for `"anthropic"` or `"google-vertex-anthropic"`, otherwise `false`.
 */
function isAnthropicProvider(providerID: string): boolean {
  return providerID === "anthropic" || providerID === "google-vertex-anthropic"
}
|
||||
@@ -94,11 +115,15 @@ export function createPreemptiveCompactionHook(
|
||||
modelID
|
||||
)
|
||||
|
||||
await ctx.client.session.summarize({
|
||||
path: { id: sessionID },
|
||||
body: { providerID: targetProviderID, modelID: targetModelID, auto: true } as never,
|
||||
query: { directory: ctx.directory },
|
||||
})
|
||||
await withTimeout(
|
||||
ctx.client.session.summarize({
|
||||
path: { id: sessionID },
|
||||
body: { providerID: targetProviderID, modelID: targetModelID, auto: true } as never,
|
||||
query: { directory: ctx.directory },
|
||||
}),
|
||||
PREEMPTIVE_COMPACTION_TIMEOUT_MS,
|
||||
`Compaction summarize timed out after ${PREEMPTIVE_COMPACTION_TIMEOUT_MS}ms`,
|
||||
)
|
||||
|
||||
compactedSessions.add(sessionID)
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user