fix: wire deduplication into compaction recovery for prompt-too-long errors (#96)
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
@@ -0,0 +1,71 @@
|
||||
import type { ParsedTokenLimitError } from "./types"
|
||||
import type { ExperimentalConfig } from "../../config"
|
||||
import type { DeduplicationConfig } from "./pruning-deduplication"
|
||||
import type { PruningState } from "./pruning-types"
|
||||
import { executeDeduplication } from "./pruning-deduplication"
|
||||
import { truncateToolOutputsByCallId } from "./pruning-tool-output-truncation"
|
||||
import { log } from "../../shared/logger"
|
||||
|
||||
function createPruningState(): PruningState {
|
||||
return {
|
||||
toolIdsToPrune: new Set<string>(),
|
||||
currentTurn: 0,
|
||||
fileOperations: new Map(),
|
||||
toolSignatures: new Map(),
|
||||
erroredTools: new Map(),
|
||||
}
|
||||
}
|
||||
|
||||
function isPromptTooLongError(parsed: ParsedTokenLimitError): boolean {
|
||||
return !parsed.errorType.toLowerCase().includes("non-empty content")
|
||||
}
|
||||
|
||||
function getDeduplicationPlan(
|
||||
experimental?: ExperimentalConfig,
|
||||
): { config: DeduplicationConfig; protectedTools: Set<string> } | null {
|
||||
const pruningConfig = experimental?.dynamic_context_pruning
|
||||
if (!pruningConfig?.enabled) return null
|
||||
|
||||
const deduplicationEnabled = pruningConfig.strategies?.deduplication?.enabled
|
||||
if (deduplicationEnabled === false) return null
|
||||
|
||||
const protectedTools = new Set(pruningConfig.protected_tools ?? [])
|
||||
return {
|
||||
config: {
|
||||
enabled: true,
|
||||
protectedTools: pruningConfig.protected_tools ?? [],
|
||||
},
|
||||
protectedTools,
|
||||
}
|
||||
}
|
||||
|
||||
export async function attemptDeduplicationRecovery(
|
||||
sessionID: string,
|
||||
parsed: ParsedTokenLimitError,
|
||||
experimental: ExperimentalConfig | undefined,
|
||||
): Promise<void> {
|
||||
if (!isPromptTooLongError(parsed)) return
|
||||
|
||||
const plan = getDeduplicationPlan(experimental)
|
||||
if (!plan) return
|
||||
|
||||
const pruningState = createPruningState()
|
||||
const prunedCount = executeDeduplication(
|
||||
sessionID,
|
||||
pruningState,
|
||||
plan.config,
|
||||
plan.protectedTools,
|
||||
)
|
||||
const { truncatedCount } = truncateToolOutputsByCallId(
|
||||
sessionID,
|
||||
pruningState.toolIdsToPrune,
|
||||
)
|
||||
|
||||
if (prunedCount > 0 || truncatedCount > 0) {
|
||||
log("[auto-compact] deduplication recovery applied", {
|
||||
sessionID,
|
||||
prunedCount,
|
||||
truncatedCount,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
import { existsSync, readdirSync, readFileSync } from "node:fs"
|
||||
import { join } from "node:path"
|
||||
import { getOpenCodeStorageDir } from "../../shared/data-path"
|
||||
import { truncateToolResult } from "./storage"
|
||||
import { log } from "../../shared/logger"
|
||||
|
||||
/**
 * Minimal shape of a part JSON file as persisted on disk.
 *
 * All fields are optional because part files may hold other part kinds
 * (non-"tool" parts) and some fields may be absent in older files.
 */
interface StoredToolPart {
  // Part discriminator; only parts with type === "tool" are candidates.
  type?: string
  // Provider-assigned tool call id, matched against the prune set.
  callID?: string
  // Set once a part's output has been truncated; prevents repeat work.
  truncated?: boolean
  state?: {
    // The tool's textual output; absent output means nothing to truncate.
    output?: string
  }
}
|
||||
|
||||
// Root of opencode's on-disk storage, resolved by the shared data-path helper.
const OPENCODE_STORAGE = getOpenCodeStorageDir()
// Per-session message files live under storage/message/...
const MESSAGE_STORAGE = join(OPENCODE_STORAGE, "message")
// Individual message-part files live under storage/part/<messageID>/*.json.
const PART_STORAGE = join(OPENCODE_STORAGE, "part")
|
||||
|
||||
function getMessageDir(sessionID: string): string | null {
|
||||
if (!existsSync(MESSAGE_STORAGE)) return null
|
||||
|
||||
const directPath = join(MESSAGE_STORAGE, sessionID)
|
||||
if (existsSync(directPath)) return directPath
|
||||
|
||||
for (const dir of readdirSync(MESSAGE_STORAGE)) {
|
||||
const sessionPath = join(MESSAGE_STORAGE, dir, sessionID)
|
||||
if (existsSync(sessionPath)) return sessionPath
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
function getMessageIds(sessionID: string): string[] {
|
||||
const messageDir = getMessageDir(sessionID)
|
||||
if (!messageDir) return []
|
||||
|
||||
const messageIds: string[] = []
|
||||
for (const file of readdirSync(messageDir)) {
|
||||
if (!file.endsWith(".json")) continue
|
||||
messageIds.push(file.replace(".json", ""))
|
||||
}
|
||||
|
||||
return messageIds
|
||||
}
|
||||
|
||||
export function truncateToolOutputsByCallId(
|
||||
sessionID: string,
|
||||
callIds: Set<string>,
|
||||
): { truncatedCount: number } {
|
||||
if (callIds.size === 0) return { truncatedCount: 0 }
|
||||
|
||||
const messageIds = getMessageIds(sessionID)
|
||||
if (messageIds.length === 0) return { truncatedCount: 0 }
|
||||
|
||||
let truncatedCount = 0
|
||||
|
||||
for (const messageID of messageIds) {
|
||||
const partDir = join(PART_STORAGE, messageID)
|
||||
if (!existsSync(partDir)) continue
|
||||
|
||||
for (const file of readdirSync(partDir)) {
|
||||
if (!file.endsWith(".json")) continue
|
||||
const partPath = join(partDir, file)
|
||||
|
||||
try {
|
||||
const content = readFileSync(partPath, "utf-8")
|
||||
const part = JSON.parse(content) as StoredToolPart
|
||||
|
||||
if (part.type !== "tool" || !part.callID) continue
|
||||
if (!callIds.has(part.callID)) continue
|
||||
if (!part.state?.output || part.truncated) continue
|
||||
|
||||
const result = truncateToolResult(partPath)
|
||||
if (result.success) {
|
||||
truncatedCount++
|
||||
}
|
||||
} catch {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (truncatedCount > 0) {
|
||||
log("[auto-compact] pruned duplicate tool outputs", {
|
||||
sessionID,
|
||||
truncatedCount,
|
||||
})
|
||||
}
|
||||
|
||||
return { truncatedCount }
|
||||
}
|
||||
@@ -0,0 +1,155 @@
|
||||
import { describe, test, expect, mock } from "bun:test"
|
||||
import type { PluginInput } from "@opencode-ai/plugin"
|
||||
import type { ExperimentalConfig } from "../../config"
|
||||
import { mkdtempSync, mkdirSync, rmSync, writeFileSync, readFileSync } from "node:fs"
|
||||
import { join } from "node:path"
|
||||
import { tmpdir } from "node:os"
|
||||
|
||||
function createImmediateTimeouts(): () => void {
|
||||
const originalSetTimeout = globalThis.setTimeout
|
||||
const originalClearTimeout = globalThis.clearTimeout
|
||||
|
||||
globalThis.setTimeout = ((callback: (...args: unknown[]) => void, _delay?: number, ...args: unknown[]) => {
|
||||
callback(...args)
|
||||
return 0 as unknown as ReturnType<typeof setTimeout>
|
||||
}) as typeof setTimeout
|
||||
|
||||
globalThis.clearTimeout = ((_: ReturnType<typeof setTimeout>) => {}) as typeof clearTimeout
|
||||
|
||||
return () => {
|
||||
globalThis.setTimeout = originalSetTimeout
|
||||
globalThis.clearTimeout = originalClearTimeout
|
||||
}
|
||||
}
|
||||
|
||||
function writeJson(filePath: string, data: unknown): void {
|
||||
writeFileSync(filePath, JSON.stringify(data, null, 2))
|
||||
}
|
||||
|
||||
// Integration-style test: drives the recovery hook against a real on-disk
// storage layout and verifies that a second prompt-too-long error arriving
// while compaction is in progress triggers deduplication recovery, which
// truncates the older duplicate tool output (part-1) on disk.
describe("createAnthropicContextWindowLimitRecoveryHook", () => {
  test("attempts deduplication recovery when compaction hits prompt too long errors", async () => {
    // Timers fire synchronously so the hook's scheduled work runs inline.
    const restoreTimeouts = createImmediateTimeouts()
    const originalDataHome = process.env.XDG_DATA_HOME
    const tempHome = mkdtempSync(join(tmpdir(), "omo-context-"))
    // Point storage resolution at an isolated temp dir for this test.
    process.env.XDG_DATA_HOME = tempHome

    const storageRoot = join(tempHome, "opencode", "storage")
    const messageDir = join(storageRoot, "message", "session-96")
    const partDir = join(storageRoot, "part", "message-1")
    const partDirTwo = join(storageRoot, "part", "message-2")

    mkdirSync(messageDir, { recursive: true })
    mkdirSync(partDir, { recursive: true })
    mkdirSync(partDirTwo, { recursive: true })

    // Two messages whose tool calls read the same file -> duplicates that
    // deduplication should detect.
    writeJson(join(messageDir, "message-1.json"), {
      parts: [
        {
          type: "tool",
          callID: "call-1",
          tool: "read",
          state: { input: { filePath: "/tmp/a.txt" } },
        },
      ],
    })

    writeJson(join(messageDir, "message-2.json"), {
      parts: [
        {
          type: "tool",
          callID: "call-2",
          tool: "read",
          state: { input: { filePath: "/tmp/a.txt" } },
        },
      ],
    })

    // Persisted part files holding the tool outputs that live on disk.
    writeJson(join(partDir, "part-1.json"), {
      id: "part-1",
      sessionID: "session-96",
      messageID: "message-1",
      type: "tool",
      callID: "call-1",
      tool: "read",
      state: {
        status: "completed",
        input: { filePath: "/tmp/a.txt" },
        output: "duplicate output",
      },
    })

    writeJson(join(partDirTwo, "part-2.json"), {
      id: "part-2",
      sessionID: "session-96",
      messageID: "message-2",
      type: "tool",
      callID: "call-2",
      tool: "read",
      state: {
        status: "completed",
        input: { filePath: "/tmp/a.txt" },
        output: "latest output",
      },
    })

    // Pruning and the deduplication strategy explicitly enabled.
    const experimental = {
      dynamic_context_pruning: {
        enabled: true,
        strategies: {
          deduplication: { enabled: true },
        },
      },
    } satisfies ExperimentalConfig

    // summarize() stays pending until the finally block, so the session is
    // still mid-compaction when the second error event arrives.
    let resolveSummarize: (() => void) | null = null
    const summarizePromise = new Promise<void>((resolve) => {
      resolveSummarize = resolve
    })

    const mockClient = {
      session: {
        messages: mock(() => Promise.resolve({ data: [] })),
        summarize: mock(() => summarizePromise),
        revert: mock(() => Promise.resolve()),
        prompt_async: mock(() => Promise.resolve()),
      },
      tui: {
        showToast: mock(() => Promise.resolve()),
      },
    }

    try {
      // Import after XDG_DATA_HOME is set so module-level storage paths
      // resolve into the temp dir.
      const { createAnthropicContextWindowLimitRecoveryHook } = await import("./recovery-hook")
      const ctx = { client: mockClient, directory: "/tmp" } as PluginInput
      const hook = createAnthropicContextWindowLimitRecoveryHook(ctx, { experimental })

      // given - initial token limit error schedules compaction
      await hook.event({
        event: {
          type: "session.error",
          properties: { sessionID: "session-96", error: "prompt is too long" },
        },
      })

      // when - compaction hits another prompt-too-long error
      await hook.event({
        event: {
          type: "session.error",
          properties: { sessionID: "session-96", error: "prompt is too long" },
        },
      })

      // then - duplicate tool output is truncated
      const prunedPart = JSON.parse(
        readFileSync(join(partDir, "part-1.json"), "utf-8"),
      ) as { truncated?: boolean }

      expect(prunedPart.truncated).toBe(true)
    } finally {
      // Unblock the pending compaction and restore all global state.
      if (resolveSummarize) resolveSummarize()
      restoreTimeouts()
      process.env.XDG_DATA_HOME = originalDataHome
      rmSync(tempHome, { recursive: true, force: true })
    }
  })
})
|
||||
@@ -3,6 +3,7 @@ import type { AutoCompactState, ParsedTokenLimitError } from "./types"
|
||||
import type { ExperimentalConfig } from "../../config"
|
||||
import { parseAnthropicTokenLimitError } from "./parser"
|
||||
import { executeCompact, getLastAssistant } from "./executor"
|
||||
import { attemptDeduplicationRecovery } from "./deduplication-recovery"
|
||||
import { log } from "../../shared/logger"
|
||||
|
||||
export interface AnthropicContextWindowLimitRecoveryOptions {
|
||||
@@ -56,6 +57,7 @@ export function createAnthropicContextWindowLimitRecoveryHook(
|
||||
autoCompactState.errorDataBySession.set(sessionID, parsed)
|
||||
|
||||
if (autoCompactState.compactionInProgress.has(sessionID)) {
|
||||
await attemptDeduplicationRecovery(sessionID, parsed, experimental)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user