fix(session): detect post-compaction no-text degradation and trigger recovery (fixes #2232)
This commit is contained in:
180
src/hooks/preemptive-compaction-degradation-monitor.ts
Normal file
180
src/hooks/preemptive-compaction-degradation-monitor.ts
Normal file
@@ -0,0 +1,180 @@
|
||||
import type { OhMyOpenCodeConfig } from "../config"
|
||||
import { log } from "../shared/logger"
|
||||
import { isStepOnlyNoTextParts, resolveNoTextTailFromSession } from "./preemptive-compaction-no-text-tail"
|
||||
import { resolveCompactionModel } from "./shared/compaction-model-resolver"
|
||||
|
||||
const PREEMPTIVE_COMPACTION_TIMEOUT_MS = 120_000
|
||||
const POST_COMPACTION_MONITOR_COUNT = 5
|
||||
const POST_COMPACTION_NO_TEXT_THRESHOLD = 3
|
||||
|
||||
interface CompactionTargetState {
|
||||
providerID: string
|
||||
modelID: string
|
||||
}
|
||||
|
||||
interface ClientLike {
|
||||
session: {
|
||||
summarize: (input: {
|
||||
path: { id: string }
|
||||
body: { providerID: string; modelID: string; auto: true }
|
||||
query: { directory: string }
|
||||
}) => Promise<unknown>
|
||||
messages: (input: {
|
||||
path: { id: string }
|
||||
query: { directory: string }
|
||||
}) => Promise<unknown>
|
||||
}
|
||||
tui: {
|
||||
showToast: (input: {
|
||||
body: {
|
||||
title: string
|
||||
message: string
|
||||
variant: "warning"
|
||||
duration: number
|
||||
}
|
||||
}) => Promise<unknown>
|
||||
}
|
||||
}
|
||||
|
||||
export interface AssistantCompactionMessageInfo {
|
||||
sessionID: string
|
||||
id?: string
|
||||
parts?: unknown
|
||||
}
|
||||
|
||||
async function withTimeout<TValue>(
|
||||
promise: Promise<TValue>,
|
||||
timeoutMs: number,
|
||||
errorMessage: string,
|
||||
): Promise<TValue> {
|
||||
let timeoutID: ReturnType<typeof setTimeout> | undefined
|
||||
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
timeoutID = setTimeout(() => {
|
||||
reject(new Error(errorMessage))
|
||||
}, timeoutMs)
|
||||
})
|
||||
|
||||
return await Promise.race([promise, timeoutPromise]).finally(() => {
|
||||
if (timeoutID !== undefined) clearTimeout(timeoutID)
|
||||
})
|
||||
}
|
||||
|
||||
export function createPostCompactionDegradationMonitor(args: {
|
||||
client: ClientLike
|
||||
directory: string
|
||||
pluginConfig: OhMyOpenCodeConfig
|
||||
tokenCache: Map<string, CompactionTargetState>
|
||||
compactionInProgress: Set<string>
|
||||
}) {
|
||||
const { client, directory, pluginConfig, tokenCache, compactionInProgress } = args
|
||||
const postCompactionRemaining = new Map<string, number>()
|
||||
const postCompactionNoTextStreak = new Map<string, number>()
|
||||
const postCompactionRecoveryTriggered = new Set<string>()
|
||||
|
||||
const clear = (sessionID: string): void => {
|
||||
postCompactionRemaining.delete(sessionID)
|
||||
postCompactionNoTextStreak.delete(sessionID)
|
||||
postCompactionRecoveryTriggered.delete(sessionID)
|
||||
}
|
||||
|
||||
const onSessionCompacted = (sessionID: string): void => {
|
||||
postCompactionRemaining.set(sessionID, POST_COMPACTION_MONITOR_COUNT)
|
||||
postCompactionNoTextStreak.set(sessionID, 0)
|
||||
postCompactionRecoveryTriggered.delete(sessionID)
|
||||
}
|
||||
|
||||
const triggerRecovery = async (sessionID: string): Promise<void> => {
|
||||
if (postCompactionRecoveryTriggered.has(sessionID) || compactionInProgress.has(sessionID)) return
|
||||
|
||||
const cached = tokenCache.get(sessionID)
|
||||
if (!cached?.modelID) {
|
||||
log("[preemptive-compaction] No-text tail detected but compaction model is unavailable", { sessionID })
|
||||
return
|
||||
}
|
||||
|
||||
postCompactionRecoveryTriggered.add(sessionID)
|
||||
compactionInProgress.add(sessionID)
|
||||
|
||||
try {
|
||||
const { providerID: targetProviderID, modelID: targetModelID } = resolveCompactionModel(
|
||||
pluginConfig,
|
||||
sessionID,
|
||||
cached.providerID,
|
||||
cached.modelID,
|
||||
)
|
||||
|
||||
await client.tui
|
||||
.showToast({
|
||||
body: {
|
||||
title: "Session Degradation Detected",
|
||||
message: "Detected repeated no-text assistant responses after compaction. Retrying compaction recovery.",
|
||||
variant: "warning",
|
||||
duration: 5000,
|
||||
},
|
||||
})
|
||||
.catch(() => {})
|
||||
|
||||
await withTimeout(
|
||||
client.session.summarize({
|
||||
path: { id: sessionID },
|
||||
body: { providerID: targetProviderID, modelID: targetModelID, auto: true },
|
||||
query: { directory },
|
||||
}),
|
||||
PREEMPTIVE_COMPACTION_TIMEOUT_MS,
|
||||
`Compaction recovery summarize timed out after ${PREEMPTIVE_COMPACTION_TIMEOUT_MS}ms`,
|
||||
)
|
||||
|
||||
log("[preemptive-compaction] Triggered recovery after post-compaction no-text tail", { sessionID })
|
||||
} catch (error) {
|
||||
log("[preemptive-compaction] Failed to recover post-compaction no-text tail", {
|
||||
sessionID,
|
||||
error: String(error),
|
||||
})
|
||||
} finally {
|
||||
compactionInProgress.delete(sessionID)
|
||||
clear(sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
const onAssistantMessageUpdated = async (info: AssistantCompactionMessageInfo): Promise<void> => {
|
||||
const remaining = postCompactionRemaining.get(info.sessionID)
|
||||
if (!remaining || remaining <= 0) return
|
||||
|
||||
if (remaining === 1) {
|
||||
postCompactionRemaining.delete(info.sessionID)
|
||||
} else {
|
||||
postCompactionRemaining.set(info.sessionID, remaining - 1)
|
||||
}
|
||||
|
||||
const isNoTextTail = isStepOnlyNoTextParts(info.parts)
|
||||
|| await resolveNoTextTailFromSession({
|
||||
client,
|
||||
sessionID: info.sessionID,
|
||||
messageID: info.id,
|
||||
directory,
|
||||
})
|
||||
|
||||
if (!isNoTextTail) {
|
||||
postCompactionNoTextStreak.set(info.sessionID, 0)
|
||||
return
|
||||
}
|
||||
|
||||
const nextStreak = (postCompactionNoTextStreak.get(info.sessionID) ?? 0) + 1
|
||||
postCompactionNoTextStreak.set(info.sessionID, nextStreak)
|
||||
|
||||
if (nextStreak >= POST_COMPACTION_NO_TEXT_THRESHOLD) {
|
||||
log("[preemptive-compaction] Detected post-compaction no-text tail pattern", {
|
||||
sessionID: info.sessionID,
|
||||
streak: nextStreak,
|
||||
})
|
||||
await triggerRecovery(info.sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
clear,
|
||||
onSessionCompacted,
|
||||
onAssistantMessageUpdated,
|
||||
}
|
||||
}
|
||||
70
src/hooks/preemptive-compaction-no-text-tail.ts
Normal file
70
src/hooks/preemptive-compaction-no-text-tail.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import { normalizeSDKResponse } from "../shared/normalize-sdk-response"
|
||||
|
||||
const STEP_ONLY_TYPES = new Set(["step-start", "step-finish"])
|
||||
|
||||
interface MessagePart {
|
||||
type?: unknown
|
||||
text?: unknown
|
||||
}
|
||||
|
||||
interface SessionMessage {
|
||||
info?: {
|
||||
id?: string
|
||||
role?: string
|
||||
}
|
||||
parts?: MessagePart[]
|
||||
}
|
||||
|
||||
export function isStepOnlyNoTextParts(parts: unknown): boolean {
|
||||
if (!Array.isArray(parts) || parts.length === 0) return false
|
||||
|
||||
return parts.every((part) => {
|
||||
const type = (part as MessagePart | undefined)?.type
|
||||
if (typeof type !== "string") return false
|
||||
if (!STEP_ONLY_TYPES.has(type)) return false
|
||||
|
||||
const text = (part as MessagePart | undefined)?.text
|
||||
if (typeof text === "string" && text.trim().length > 0) return false
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
function findMessageByID(messages: SessionMessage[], messageID?: string): SessionMessage | undefined {
|
||||
if (!messageID) return undefined
|
||||
return messages.find((message) => message.info?.id === messageID)
|
||||
}
|
||||
|
||||
export async function resolveNoTextTailFromSession(args: {
|
||||
client: {
|
||||
session: {
|
||||
messages: (input: {
|
||||
path: { id: string }
|
||||
query: { directory: string }
|
||||
}) => Promise<unknown>
|
||||
}
|
||||
}
|
||||
sessionID: string
|
||||
messageID?: string
|
||||
directory: string
|
||||
}): Promise<boolean> {
|
||||
const { client, sessionID, messageID, directory } = args
|
||||
|
||||
try {
|
||||
const response = await client.session.messages({
|
||||
path: { id: sessionID },
|
||||
query: { directory },
|
||||
})
|
||||
|
||||
const messages = normalizeSDKResponse(response, [] as SessionMessage[], {
|
||||
preferResponseOnMissingData: true,
|
||||
})
|
||||
if (!Array.isArray(messages) || messages.length === 0) return false
|
||||
|
||||
const target = findMessageByID(messages, messageID) ?? messages[messages.length - 1]
|
||||
if (target.info?.role !== "assistant") return false
|
||||
|
||||
return isStepOnlyNoTextParts(target.parts)
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
138
src/hooks/preemptive-compaction.degradation-monitor.test.ts
Normal file
138
src/hooks/preemptive-compaction.degradation-monitor.test.ts
Normal file
@@ -0,0 +1,138 @@
|
||||
import { beforeEach, describe, expect, it, mock } from "bun:test"
|
||||
|
||||
const logMock = mock(() => {})
|
||||
|
||||
mock.module("../shared/logger", () => ({
|
||||
log: logMock,
|
||||
}))
|
||||
|
||||
const { createPreemptiveCompactionHook } = await import("./preemptive-compaction")
|
||||
|
||||
function createMockCtx() {
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
messages: mock(() => Promise.resolve({ data: [] })),
|
||||
summarize: mock(() => Promise.resolve({})),
|
||||
},
|
||||
tui: {
|
||||
showToast: mock(() => Promise.resolve({})),
|
||||
},
|
||||
},
|
||||
directory: "/tmp/test",
|
||||
}
|
||||
}
|
||||
|
||||
function buildAssistantUpdate(input: {
|
||||
sessionID: string
|
||||
id: string
|
||||
parts: unknown[]
|
||||
}): {
|
||||
event: {
|
||||
type: string
|
||||
properties: {
|
||||
info: {
|
||||
id: string
|
||||
role: string
|
||||
sessionID: string
|
||||
providerID: string
|
||||
modelID: string
|
||||
finish: boolean
|
||||
tokens: { input: number; output: number; reasoning: number; cache: { read: number; write: number } }
|
||||
parts: unknown[]
|
||||
}
|
||||
}
|
||||
}
|
||||
} {
|
||||
return {
|
||||
event: {
|
||||
type: "message.updated",
|
||||
properties: {
|
||||
info: {
|
||||
id: input.id,
|
||||
role: "assistant",
|
||||
sessionID: input.sessionID,
|
||||
providerID: "anthropic",
|
||||
modelID: "claude-sonnet-4-6",
|
||||
finish: true,
|
||||
tokens: { input: 1000, output: 10, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
parts: input.parts,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
describe("preemptive-compaction post-compaction degradation monitor", () => {
|
||||
beforeEach(() => {
|
||||
logMock.mockClear()
|
||||
})
|
||||
|
||||
it("triggers recovery summarize after three consecutive no-text tail messages", async () => {
|
||||
const ctx = createMockCtx()
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_tail_recovery"
|
||||
|
||||
await hook.event({
|
||||
event: {
|
||||
type: "session.compacted",
|
||||
properties: { sessionID },
|
||||
},
|
||||
})
|
||||
|
||||
const stepOnlyParts = [{ type: "step-start" }, { type: "step-finish" }]
|
||||
|
||||
await hook.event(buildAssistantUpdate({ sessionID, id: "msg_1", parts: stepOnlyParts }))
|
||||
await hook.event(buildAssistantUpdate({ sessionID, id: "msg_2", parts: stepOnlyParts }))
|
||||
await hook.event(buildAssistantUpdate({ sessionID, id: "msg_3", parts: stepOnlyParts }))
|
||||
|
||||
expect(ctx.client.session.summarize).toHaveBeenCalledTimes(1)
|
||||
expect(ctx.client.tui.showToast).toHaveBeenCalledTimes(1)
|
||||
expect(logMock).toHaveBeenCalledWith(
|
||||
"[preemptive-compaction] Detected post-compaction no-text tail pattern",
|
||||
{
|
||||
sessionID,
|
||||
streak: 3,
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
it("resets no-text streak when assistant emits text content", async () => {
|
||||
const ctx = createMockCtx()
|
||||
const hook = createPreemptiveCompactionHook(ctx as never, {} as never)
|
||||
const sessionID = "ses_tail_reset"
|
||||
|
||||
await hook.event({
|
||||
event: {
|
||||
type: "session.compacted",
|
||||
properties: { sessionID },
|
||||
},
|
||||
})
|
||||
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_1",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
}))
|
||||
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_2",
|
||||
parts: [{ type: "text", text: "Recovered response" }],
|
||||
}))
|
||||
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_3",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
}))
|
||||
|
||||
await hook.event(buildAssistantUpdate({
|
||||
sessionID,
|
||||
id: "msg_4",
|
||||
parts: [{ type: "step-start" }, { type: "step-finish" }],
|
||||
}))
|
||||
|
||||
expect(ctx.client.session.summarize).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
@@ -6,8 +6,9 @@ import {
|
||||
} from "../shared/context-limit-resolver"
|
||||
|
||||
import { resolveCompactionModel } from "./shared/compaction-model-resolver"
|
||||
const PREEMPTIVE_COMPACTION_TIMEOUT_MS = 120_000
|
||||
import { createPostCompactionDegradationMonitor } from "./preemptive-compaction-degradation-monitor"
|
||||
|
||||
const PREEMPTIVE_COMPACTION_TIMEOUT_MS = 120_000
|
||||
const PREEMPTIVE_COMPACTION_THRESHOLD = 0.78
|
||||
|
||||
interface TokenInfo {
|
||||
@@ -68,6 +69,14 @@ export function createPreemptiveCompactionHook(
|
||||
const compactedSessions = new Set<string>()
|
||||
const tokenCache = new Map<string, CachedCompactionState>()
|
||||
|
||||
const postCompactionMonitor = createPostCompactionDegradationMonitor({
|
||||
client: ctx.client,
|
||||
directory: ctx.directory,
|
||||
pluginConfig,
|
||||
tokenCache,
|
||||
compactionInProgress,
|
||||
})
|
||||
|
||||
const toolExecuteAfter = async (
|
||||
input: { tool: string; sessionID: string; callID: string },
|
||||
_output: { title: string; output: string; metadata: unknown }
|
||||
@@ -92,14 +101,9 @@ export function createPreemptiveCompactionHook(
|
||||
return
|
||||
}
|
||||
|
||||
const lastTokens = cached.tokens
|
||||
const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0)
|
||||
const totalInputTokens = (cached.tokens.input ?? 0) + (cached.tokens.cache?.read ?? 0)
|
||||
const usageRatio = totalInputTokens / actualLimit
|
||||
|
||||
if (usageRatio < PREEMPTIVE_COMPACTION_THRESHOLD) return
|
||||
|
||||
const modelID = cached.modelID
|
||||
if (!modelID) return
|
||||
if (usageRatio < PREEMPTIVE_COMPACTION_THRESHOLD || !cached.modelID) return
|
||||
|
||||
compactionInProgress.add(sessionID)
|
||||
|
||||
@@ -108,7 +112,7 @@ export function createPreemptiveCompactionHook(
|
||||
pluginConfig,
|
||||
sessionID,
|
||||
cached.providerID,
|
||||
modelID
|
||||
cached.modelID,
|
||||
)
|
||||
|
||||
await withTimeout(
|
||||
@@ -133,34 +137,53 @@ export function createPreemptiveCompactionHook(
|
||||
const props = event.properties as Record<string, unknown> | undefined
|
||||
|
||||
if (event.type === "session.deleted") {
|
||||
const sessionInfo = props?.info as { id?: string } | undefined
|
||||
if (sessionInfo?.id) {
|
||||
compactionInProgress.delete(sessionInfo.id)
|
||||
compactedSessions.delete(sessionInfo.id)
|
||||
tokenCache.delete(sessionInfo.id)
|
||||
const sessionID = (props?.info as { id?: string } | undefined)?.id
|
||||
if (sessionID) {
|
||||
compactionInProgress.delete(sessionID)
|
||||
compactedSessions.delete(sessionID)
|
||||
tokenCache.delete(sessionID)
|
||||
postCompactionMonitor.clear(sessionID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if (event.type === "session.compacted") {
|
||||
const sessionID = (props?.sessionID as string | undefined)
|
||||
?? (props?.info as { id?: string } | undefined)?.id
|
||||
if (sessionID) {
|
||||
postCompactionMonitor.onSessionCompacted(sessionID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if (event.type === "message.updated") {
|
||||
const info = props?.info as {
|
||||
id?: string
|
||||
role?: string
|
||||
sessionID?: string
|
||||
providerID?: string
|
||||
modelID?: string
|
||||
finish?: boolean
|
||||
tokens?: TokenInfo
|
||||
parts?: unknown
|
||||
} | undefined
|
||||
|
||||
if (!info || info.role !== "assistant" || !info.finish) return
|
||||
if (!info.sessionID || !info.providerID || !info.tokens) return
|
||||
if (!info || info.role !== "assistant" || !info.finish || !info.sessionID) return
|
||||
|
||||
tokenCache.set(info.sessionID, {
|
||||
providerID: info.providerID,
|
||||
modelID: info.modelID ?? "",
|
||||
tokens: info.tokens,
|
||||
})
|
||||
if (info.providerID && info.tokens) {
|
||||
tokenCache.set(info.sessionID, {
|
||||
providerID: info.providerID,
|
||||
modelID: info.modelID ?? "",
|
||||
tokens: info.tokens,
|
||||
})
|
||||
}
|
||||
compactedSessions.delete(info.sessionID)
|
||||
|
||||
await postCompactionMonitor.onAssistantMessageUpdated({
|
||||
sessionID: info.sessionID,
|
||||
id: info.id,
|
||||
parts: info.parts,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user