Merge pull request #2802 from code-yeongyu/fix/b1-preemptive-compaction-epoch-guard
fix: handle repeated compaction epochs in continuation guard
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
/// <reference types="bun-types" />
|
||||
|
||||
import { beforeEach, describe, expect, it, mock } from "bun:test"
|
||||
|
||||
const logMock = mock(() => {})
|
||||
@@ -8,11 +10,19 @@ mock.module("../shared/logger", () => ({
|
||||
|
||||
const { createPreemptiveCompactionHook } = await import("./preemptive-compaction")
|
||||
|
||||
function createMockCtx() {
|
||||
type AssistantHistoryMessage = {
|
||||
info: {
|
||||
id: string
|
||||
role: "assistant"
|
||||
}
|
||||
parts: Array<{ type: string; text?: string }>
|
||||
}
|
||||
|
||||
function createMockCtx(sessionHistory: AssistantHistoryMessage[]) {
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
messages: mock(() => Promise.resolve({ data: [] })),
|
||||
messages: mock(() => Promise.resolve({ data: sessionHistory })),
|
||||
summarize: mock(() => Promise.resolve({})),
|
||||
},
|
||||
tui: {
|
||||
@@ -23,6 +33,22 @@ function createMockCtx() {
|
||||
}
|
||||
}
|
||||
|
||||
function appendAssistantHistory(
|
||||
sessionHistory: AssistantHistoryMessage[],
|
||||
input: {
|
||||
id: string
|
||||
parts: AssistantHistoryMessage["parts"]
|
||||
},
|
||||
): void {
|
||||
sessionHistory.push({
|
||||
info: {
|
||||
id: input.id,
|
||||
role: "assistant",
|
||||
},
|
||||
parts: input.parts,
|
||||
})
|
||||
}
|
||||
|
||||
function buildAssistantUpdate(input: {
|
||||
sessionID: string
|
||||
id: string
|
||||
@@ -69,7 +95,9 @@ describe("preemptive-compaction post-compaction degradation monitor", () => {
|
||||
})
|
||||
|
||||
it("triggers recovery summarize after three consecutive no-text tail messages", async () => {
|
||||
const ctx = createMockCtx()
|
||||
// given
|
||||
const sessionHistory: AssistantHistoryMessage[] = []
|
||||
const ctx = createMockCtx(sessionHistory)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_tail_recovery"
|
||||
|
||||
@@ -82,10 +110,17 @@ describe("preemptive-compaction post-compaction degradation monitor", () => {
|
||||
|
||||
const stepOnlyParts = [{ type: "step-start" }, { type: "step-finish" }]
|
||||
|
||||
// when
|
||||
appendAssistantHistory(sessionHistory, { id: "msg_1", parts: stepOnlyParts })
|
||||
await hook.event(buildAssistantUpdate({ sessionID, id: "msg_1", parts: stepOnlyParts }))
|
||||
|
||||
appendAssistantHistory(sessionHistory, { id: "msg_2", parts: stepOnlyParts })
|
||||
await hook.event(buildAssistantUpdate({ sessionID, id: "msg_2", parts: stepOnlyParts }))
|
||||
|
||||
appendAssistantHistory(sessionHistory, { id: "msg_3", parts: stepOnlyParts })
|
||||
await hook.event(buildAssistantUpdate({ sessionID, id: "msg_3", parts: stepOnlyParts }))
|
||||
|
||||
// then
|
||||
expect(ctx.client.session.summarize).toHaveBeenCalledTimes(1)
|
||||
expect(ctx.client.tui.showToast).toHaveBeenCalledTimes(1)
|
||||
expect(logMock).toHaveBeenCalledWith(
|
||||
@@ -98,7 +133,9 @@ describe("preemptive-compaction post-compaction degradation monitor", () => {
|
||||
})
|
||||
|
||||
it("resets no-text streak when assistant emits text content", async () => {
|
||||
const ctx = createMockCtx()
|
||||
// given
|
||||
const sessionHistory: AssistantHistoryMessage[] = []
|
||||
const ctx = createMockCtx(sessionHistory)
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_tail_reset"
|
||||
|
||||
@@ -109,30 +146,48 @@ describe("preemptive-compaction post-compaction degradation monitor", () => {
|
||||
},
|
||||
})
|
||||
|
||||
// when
|
||||
appendAssistantHistory(sessionHistory, {
|
||||
id: "msg_1",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
})
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_1",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
}))
|
||||
|
||||
appendAssistantHistory(sessionHistory, {
|
||||
id: "msg_2",
|
||||
parts: [{ type: "text", text: "Recovered response" }],
|
||||
})
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_2",
|
||||
parts: [{ type: "text", text: "Recovered response" }],
|
||||
}))
|
||||
|
||||
appendAssistantHistory(sessionHistory, {
|
||||
id: "msg_3",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
})
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_3",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
}))
|
||||
|
||||
appendAssistantHistory(sessionHistory, {
|
||||
id: "msg_4",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
})
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_4",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
}))
|
||||
|
||||
// then
|
||||
expect(ctx.client.session.summarize).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
import { describe, expect, it as test } from "bun:test"
|
||||
|
||||
import { COMPACTION_GUARD_MS } from "./constants"
|
||||
import {
|
||||
acknowledgeCompactionGuard,
|
||||
armCompactionGuard,
|
||||
isCompactionGuardActive,
|
||||
} from "./compaction-guard"
|
||||
import type { SessionState } from "./types"
|
||||
|
||||
function createSessionState(): SessionState {
|
||||
return {
|
||||
stagnationCount: 0,
|
||||
consecutiveFailures: 0,
|
||||
}
|
||||
}
|
||||
|
||||
describe("compaction guard regressions", () => {
|
||||
describe("#given a compaction epoch was already acknowledged", () => {
|
||||
describe("#when a newer compaction epoch is armed", () => {
|
||||
test("#then the guard re-arms for the newer epoch", () => {
|
||||
const state = createSessionState()
|
||||
|
||||
const firstEpoch = armCompactionGuard(state, 1_000)
|
||||
expect(acknowledgeCompactionGuard(state, firstEpoch)).toBe(true)
|
||||
expect(isCompactionGuardActive(state, 1_001)).toBe(false)
|
||||
|
||||
const secondEpoch = armCompactionGuard(state, 2_000)
|
||||
|
||||
expect(secondEpoch).toBe(firstEpoch + 1)
|
||||
expect(state.recentCompactionEpoch).toBe(secondEpoch)
|
||||
expect(isCompactionGuardActive(state, 2_001)).toBe(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("#given a newer compaction epoch is armed before an older idle check finishes", () => {
|
||||
describe("#when the older epoch tries to acknowledge the guard", () => {
|
||||
test("#then it does not clear the newer epoch", () => {
|
||||
const state = createSessionState()
|
||||
|
||||
const firstEpoch = armCompactionGuard(state, 1_000)
|
||||
const secondEpoch = armCompactionGuard(state, 2_000)
|
||||
|
||||
expect(acknowledgeCompactionGuard(state, firstEpoch)).toBe(false)
|
||||
expect(state.acknowledgedCompactionEpoch).toBeUndefined()
|
||||
expect(state.recentCompactionEpoch).toBe(secondEpoch)
|
||||
expect(isCompactionGuardActive(state, 2_001)).toBe(true)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("#given the current compaction epoch is still inside the guard window", () => {
|
||||
describe("#when that same epoch is acknowledged", () => {
|
||||
test("#then continuation can proceed again without waiting for the window to expire", () => {
|
||||
const state = createSessionState()
|
||||
|
||||
const currentEpoch = armCompactionGuard(state, 1_000)
|
||||
|
||||
expect(isCompactionGuardActive(state, 1_000 + COMPACTION_GUARD_MS - 1)).toBe(true)
|
||||
expect(acknowledgeCompactionGuard(state, currentEpoch)).toBe(true)
|
||||
expect(isCompactionGuardActive(state, 1_001)).toBe(false)
|
||||
expect(isCompactionGuardActive(state, 1_000 + COMPACTION_GUARD_MS - 1)).toBe(false)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1,8 +1,37 @@
|
||||
import { COMPACTION_GUARD_MS } from "./constants"
|
||||
import type { SessionState } from "./types"
|
||||
|
||||
export function armCompactionGuard(state: SessionState, now: number): number {
|
||||
const nextEpoch = (state.recentCompactionEpoch ?? 0) + 1
|
||||
|
||||
state.recentCompactionAt = now
|
||||
state.recentCompactionEpoch = nextEpoch
|
||||
|
||||
return nextEpoch
|
||||
}
|
||||
|
||||
export function acknowledgeCompactionGuard(
|
||||
state: SessionState,
|
||||
compactionEpoch: number | undefined
|
||||
): boolean {
|
||||
if (compactionEpoch === undefined) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (state.recentCompactionEpoch !== compactionEpoch) {
|
||||
return false
|
||||
}
|
||||
|
||||
state.acknowledgedCompactionEpoch = compactionEpoch
|
||||
return true
|
||||
}
|
||||
|
||||
export function isCompactionGuardActive(state: SessionState, now: number): boolean {
|
||||
if (!state.recentCompactionAt) {
|
||||
if (state.recentCompactionAt === undefined || state.recentCompactionEpoch === undefined) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (state.acknowledgedCompactionEpoch === state.recentCompactionEpoch) {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import {
|
||||
import { log } from "../../shared/logger"
|
||||
|
||||
import { DEFAULT_SKIP_AGENTS, HOOK_NAME } from "./constants"
|
||||
import { armCompactionGuard } from "./compaction-guard"
|
||||
import type { SessionStateStore } from "./session-state"
|
||||
import { handleSessionIdle } from "./idle-event"
|
||||
import { handleNonIdleEvent } from "./non-idle-events"
|
||||
@@ -64,9 +65,9 @@ export function createTodoContinuationHandler(args: {
|
||||
const sessionID = (props?.sessionID ?? (props?.info as { id?: string } | undefined)?.id) as string | undefined
|
||||
if (sessionID) {
|
||||
const state = sessionStateStore.getState(sessionID)
|
||||
state.recentCompactionAt = Date.now()
|
||||
const compactionEpoch = armCompactionGuard(state, Date.now())
|
||||
sessionStateStore.cancelCountdown(sessionID)
|
||||
log(`[${HOOK_NAME}] Session compacted: marked recentCompactionAt`, { sessionID })
|
||||
log(`[${HOOK_NAME}] Session compacted: armed compaction guard`, { sessionID, compactionEpoch })
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -5,21 +5,14 @@ import { normalizeSDKResponse } from "../../shared"
|
||||
import { log } from "../../shared/logger"
|
||||
import { getAgentConfigKey } from "../../shared/agent-display-names"
|
||||
|
||||
import {
|
||||
ABORT_WINDOW_MS,
|
||||
CONTINUATION_COOLDOWN_MS,
|
||||
DEFAULT_SKIP_AGENTS,
|
||||
FAILURE_RESET_WINDOW_MS,
|
||||
HOOK_NAME,
|
||||
MAX_CONSECUTIVE_FAILURES,
|
||||
} from "./constants"
|
||||
import { ABORT_WINDOW_MS, CONTINUATION_COOLDOWN_MS, DEFAULT_SKIP_AGENTS, FAILURE_RESET_WINDOW_MS, HOOK_NAME, MAX_CONSECUTIVE_FAILURES } from "./constants"
|
||||
import { isLastAssistantMessageAborted } from "./abort-detection"
|
||||
import { hasUnansweredQuestion } from "./pending-question-detection"
|
||||
import { shouldStopForStagnation } from "./stagnation-detection"
|
||||
import { getIncompleteCount } from "./todo"
|
||||
import type { MessageInfo, ResolvedMessageInfo, Todo } from "./types"
|
||||
import { resolveLatestMessageInfo } from "./resolve-message-info"
|
||||
import { isCompactionGuardActive } from "./compaction-guard"
|
||||
import { acknowledgeCompactionGuard, isCompactionGuardActive } from "./compaction-guard"
|
||||
import type { SessionStateStore } from "./session-state"
|
||||
import { startCountdown } from "./countdown"
|
||||
|
||||
@@ -43,6 +36,7 @@ export async function handleSessionIdle(args: {
|
||||
log(`[${HOOK_NAME}] session.idle`, { sessionID })
|
||||
|
||||
const state = sessionStateStore.getState(sessionID)
|
||||
const observedCompactionEpoch = state.recentCompactionEpoch
|
||||
if (state.isRecovering) {
|
||||
log(`[${HOOK_NAME}] Skipped: in recovery`, { sessionID })
|
||||
return
|
||||
@@ -150,9 +144,18 @@ export async function handleSessionIdle(args: {
|
||||
resolvedInfo = { ...resolvedInfo, agent: sessionAgent }
|
||||
}
|
||||
|
||||
const acknowledgedCompaction = resolvedInfo?.agent ? acknowledgeCompactionGuard(state, observedCompactionEpoch) : false
|
||||
const compactionGuardActive = isCompactionGuardActive(state, Date.now())
|
||||
|
||||
log(`[${HOOK_NAME}] Agent check`, { sessionID, agentName: resolvedInfo?.agent, skipAgents, compactionGuardActive })
|
||||
log(`[${HOOK_NAME}] Agent check`, {
|
||||
sessionID,
|
||||
agentName: resolvedInfo?.agent,
|
||||
skipAgents,
|
||||
compactionGuardActive,
|
||||
observedCompactionEpoch,
|
||||
currentCompactionEpoch: state.recentCompactionEpoch,
|
||||
acknowledgedCompaction,
|
||||
})
|
||||
|
||||
const resolvedAgentName = resolvedInfo?.agent
|
||||
if (resolvedAgentName && skipAgents.some(s => getAgentConfigKey(s) === getAgentConfigKey(resolvedAgentName))) {
|
||||
@@ -163,8 +166,9 @@ export async function handleSessionIdle(args: {
|
||||
log(`[${HOOK_NAME}] Skipped: compaction occurred but no agent info resolved`, { sessionID })
|
||||
return
|
||||
}
|
||||
if (state.recentCompactionAt && resolvedInfo?.agent) {
|
||||
state.recentCompactionAt = undefined
|
||||
if (compactionGuardActive) {
|
||||
log(`[${HOOK_NAME}] Skipped: compaction guard still armed for current epoch`, { sessionID, observedCompactionEpoch, currentCompactionEpoch: state.recentCompactionEpoch })
|
||||
return
|
||||
}
|
||||
|
||||
if (isContinuationStopped?.(sessionID)) {
|
||||
|
||||
@@ -35,6 +35,8 @@ export interface SessionState {
|
||||
stagnationCount: number
|
||||
consecutiveFailures: number
|
||||
recentCompactionAt?: number
|
||||
recentCompactionEpoch?: number
|
||||
acknowledgedCompactionEpoch?: number
|
||||
}
|
||||
|
||||
export interface MessageInfo {
|
||||
|
||||
Reference in New Issue
Block a user