diff --git a/src/hooks/anthropic-context-window-limit-recovery/deduplication-recovery.ts b/src/hooks/anthropic-context-window-limit-recovery/deduplication-recovery.ts new file mode 100644 index 000000000..d7cb0314e --- /dev/null +++ b/src/hooks/anthropic-context-window-limit-recovery/deduplication-recovery.ts @@ -0,0 +1,71 @@ +import type { ParsedTokenLimitError } from "./types" +import type { ExperimentalConfig } from "../../config" +import type { DeduplicationConfig } from "./pruning-deduplication" +import type { PruningState } from "./pruning-types" +import { executeDeduplication } from "./pruning-deduplication" +import { truncateToolOutputsByCallId } from "./pruning-tool-output-truncation" +import { log } from "../../shared/logger" + +function createPruningState(): PruningState { + return { + toolIdsToPrune: new Set(), + currentTurn: 0, + fileOperations: new Map(), + toolSignatures: new Map(), + erroredTools: new Map(), + } +} + +function isPromptTooLongError(parsed: ParsedTokenLimitError): boolean { + return !parsed.errorType.toLowerCase().includes("non-empty content") +} + +function getDeduplicationPlan( + experimental?: ExperimentalConfig, +): { config: DeduplicationConfig; protectedTools: Set } | null { + const pruningConfig = experimental?.dynamic_context_pruning + if (!pruningConfig?.enabled) return null + + const deduplicationEnabled = pruningConfig.strategies?.deduplication?.enabled + if (deduplicationEnabled === false) return null + + const protectedTools = new Set(pruningConfig.protected_tools ?? []) + return { + config: { + enabled: true, + protectedTools: pruningConfig.protected_tools ?? [], + }, + protectedTools, + } +} + +export async function attemptDeduplicationRecovery( + sessionID: string, + parsed: ParsedTokenLimitError, + experimental: ExperimentalConfig | undefined, +): Promise { + if (!isPromptTooLongError(parsed)) return + + const plan = getDeduplicationPlan(experimental) + if (!plan) return + + const pruningState = createPruningState() + const prunedCount = executeDeduplication( + sessionID, + pruningState, + plan.config, + plan.protectedTools, + ) + const { truncatedCount } = truncateToolOutputsByCallId( + sessionID, + pruningState.toolIdsToPrune, + ) + + if (prunedCount > 0 || truncatedCount > 0) { + log("[auto-compact] deduplication recovery applied", { + sessionID, + prunedCount, + truncatedCount, + }) + } +} diff --git a/src/hooks/anthropic-context-window-limit-recovery/pruning-tool-output-truncation.ts b/src/hooks/anthropic-context-window-limit-recovery/pruning-tool-output-truncation.ts new file mode 100644 index 000000000..3958724a9 --- /dev/null +++ b/src/hooks/anthropic-context-window-limit-recovery/pruning-tool-output-truncation.ts @@ -0,0 +1,92 @@ +import { existsSync, readdirSync, readFileSync } from "node:fs" +import { join } from "node:path" +import { getOpenCodeStorageDir } from "../../shared/data-path" +import { truncateToolResult } from "./storage" +import { log } from "../../shared/logger" + +interface StoredToolPart { + type?: string + callID?: string + truncated?: boolean + state?: { + output?: string + } +} + +const OPENCODE_STORAGE = getOpenCodeStorageDir() +const MESSAGE_STORAGE = join(OPENCODE_STORAGE, "message") +const PART_STORAGE = join(OPENCODE_STORAGE, "part") + +function getMessageDir(sessionID: string): string | null { + if (!existsSync(MESSAGE_STORAGE)) return null + + const directPath = join(MESSAGE_STORAGE, sessionID) + if (existsSync(directPath)) return directPath + + for (const dir of readdirSync(MESSAGE_STORAGE)) { + const sessionPath = join(MESSAGE_STORAGE, dir, sessionID) + if (existsSync(sessionPath)) return sessionPath + } + + return null +} + +function getMessageIds(sessionID: string): string[] { + const messageDir = getMessageDir(sessionID) + if (!messageDir) return [] + + const messageIds: string[] = [] + for (const file of readdirSync(messageDir)) { + if (!file.endsWith(".json")) continue + messageIds.push(file.replace(".json", "")) + } + + return messageIds +} + +export function truncateToolOutputsByCallId( + sessionID: string, + callIds: Set, +): { truncatedCount: number } { + if (callIds.size === 0) return { truncatedCount: 0 } + + const messageIds = getMessageIds(sessionID) + if (messageIds.length === 0) return { truncatedCount: 0 } + + let truncatedCount = 0 + + for (const messageID of messageIds) { + const partDir = join(PART_STORAGE, messageID) + if (!existsSync(partDir)) continue + + for (const file of readdirSync(partDir)) { + if (!file.endsWith(".json")) continue + const partPath = join(partDir, file) + + try { + const content = readFileSync(partPath, "utf-8") + const part = JSON.parse(content) as StoredToolPart + + if (part.type !== "tool" || !part.callID) continue + if (!callIds.has(part.callID)) continue + if (!part.state?.output || part.truncated) continue + + const result = truncateToolResult(partPath) + if (result.success) { + truncatedCount++ + } + } catch { + continue + } + } + } + + if (truncatedCount > 0) { + log("[auto-compact] pruned duplicate tool outputs", { + sessionID, + truncatedCount, + }) + } + + return { truncatedCount } +} diff --git a/src/hooks/anthropic-context-window-limit-recovery/recovery-deduplication.test.ts b/src/hooks/anthropic-context-window-limit-recovery/recovery-deduplication.test.ts new file mode 100644 index 000000000..2b82051af --- /dev/null +++ b/src/hooks/anthropic-context-window-limit-recovery/recovery-deduplication.test.ts @@ -0,0 +1,155 @@ +import { describe, test, expect, mock } from "bun:test" +import type { PluginInput } from "@opencode-ai/plugin" +import type { ExperimentalConfig } from "../../config" +import { mkdtempSync, mkdirSync, rmSync, writeFileSync, readFileSync } from "node:fs" +import { join } from "node:path" +import { tmpdir } from "node:os" + +function createImmediateTimeouts(): () => void { + const originalSetTimeout = globalThis.setTimeout + const originalClearTimeout = globalThis.clearTimeout + + globalThis.setTimeout = ((callback: (...args: unknown[]) => void, _delay?: number, ...args: unknown[]) => { + callback(...args) + return 0 as unknown as ReturnType + }) as typeof setTimeout + + globalThis.clearTimeout = ((_: ReturnType) => {}) as typeof clearTimeout + + return () => { + globalThis.setTimeout = originalSetTimeout + globalThis.clearTimeout = originalClearTimeout + } +} + +function writeJson(filePath: string, data: unknown): void { + writeFileSync(filePath, JSON.stringify(data, null, 2)) +} + +describe("createAnthropicContextWindowLimitRecoveryHook", () => { + test("attempts deduplication recovery when compaction hits prompt too long errors", async () => { + const restoreTimeouts = createImmediateTimeouts() + const originalDataHome = process.env.XDG_DATA_HOME + const tempHome = mkdtempSync(join(tmpdir(), "omo-context-")) + process.env.XDG_DATA_HOME = tempHome + + const storageRoot = join(tempHome, "opencode", "storage") + const messageDir = join(storageRoot, "message", "session-96") + const partDir = join(storageRoot, "part", "message-1") + const partDirTwo = join(storageRoot, "part", "message-2") + + mkdirSync(messageDir, { recursive: true }) + mkdirSync(partDir, { recursive: true }) + mkdirSync(partDirTwo, { recursive: true }) + + writeJson(join(messageDir, "message-1.json"), { + parts: [ + { + type: "tool", + callID: "call-1", + tool: "read", + state: { input: { filePath: "/tmp/a.txt" } }, + }, + ], + }) + + writeJson(join(messageDir, "message-2.json"), { + parts: [ + { + type: "tool", + callID: "call-2", + tool: "read", + state: { input: { filePath: "/tmp/a.txt" } }, + }, + ], + }) + + writeJson(join(partDir, "part-1.json"), { + id: "part-1", + sessionID: "session-96", + messageID: "message-1", + type: "tool", + callID: "call-1", + tool: "read", + state: { + status: "completed", + input: { filePath: "/tmp/a.txt" }, + output: "duplicate output", + }, + }) + + writeJson(join(partDirTwo, "part-2.json"), { + id: "part-2", + sessionID: "session-96", + messageID: "message-2", + type: "tool", + callID: "call-2", + tool: "read", + state: { + status: "completed", + input: { filePath: "/tmp/a.txt" }, + output: "latest output", + }, + }) + + const experimental = { + dynamic_context_pruning: { + enabled: true, + strategies: { + deduplication: { enabled: true }, + }, + }, + } satisfies ExperimentalConfig + + let resolveSummarize: (() => void) | null = null + const summarizePromise = new Promise((resolve) => { + resolveSummarize = resolve + }) + + const mockClient = { + session: { + messages: mock(() => Promise.resolve({ data: [] })), + summarize: mock(() => summarizePromise), + revert: mock(() => Promise.resolve()), + prompt_async: mock(() => Promise.resolve()), + }, + tui: { + showToast: mock(() => Promise.resolve()), + }, + } + + try { + const { createAnthropicContextWindowLimitRecoveryHook } = await import("./recovery-hook") + const ctx = { client: mockClient, directory: "/tmp" } as PluginInput + const hook = createAnthropicContextWindowLimitRecoveryHook(ctx, { experimental }) + + // given - initial token limit error schedules compaction + await hook.event({ + event: { + type: "session.error", + properties: { sessionID: "session-96", error: "prompt is too long" }, + }, + }) + + // when - compaction hits another prompt-too-long error + await hook.event({ + event: { + type: "session.error", + properties: { sessionID: "session-96", error: "prompt is too long" }, + }, + }) + + // then - duplicate tool output is truncated + const prunedPart = JSON.parse( + readFileSync(join(partDir, "part-1.json"), "utf-8"), + ) as { truncated?: boolean } + + expect(prunedPart.truncated).toBe(true) + } finally { + if (resolveSummarize) resolveSummarize() + restoreTimeouts() + process.env.XDG_DATA_HOME = originalDataHome + rmSync(tempHome, { recursive: true, force: true }) + } + }) +}) diff --git a/src/hooks/anthropic-context-window-limit-recovery/recovery-hook.ts b/src/hooks/anthropic-context-window-limit-recovery/recovery-hook.ts index 62adbd9e1..f4bcb0f25 100644 --- a/src/hooks/anthropic-context-window-limit-recovery/recovery-hook.ts +++ b/src/hooks/anthropic-context-window-limit-recovery/recovery-hook.ts @@ -3,6 +3,7 @@ import type { AutoCompactState, ParsedTokenLimitError } from "./types" import type { ExperimentalConfig } from "../../config" import { parseAnthropicTokenLimitError } from "./parser" import { executeCompact, getLastAssistant } from "./executor" +import { attemptDeduplicationRecovery } from "./deduplication-recovery" import { log } from "../../shared/logger" export interface AnthropicContextWindowLimitRecoveryOptions { @@ -56,6 +57,7 @@ export function createAnthropicContextWindowLimitRecoveryHook( autoCompactState.errorDataBySession.set(sessionID, parsed) if (autoCompactState.compactionInProgress.has(sessionID)) { + await attemptDeduplicationRecovery(sessionID, parsed, experimental) return }