fix: reduce session.messages() calls with event-based caching to prevent memory leaks

- Replace session.messages() fetch in context-window-monitor with message.updated event cache
- Replace session.messages() fetch in preemptive-compaction with message.updated event cache
- Add per-session transcript cache (5min TTL) to avoid full rebuild per tool call
- Remove session.messages() from background-agent polling (use event-based progress)
- Add TTL pruning to todo-continuation-enforcer session state Map
- Add setInterval.unref() to tool-input-cache cleanup timer

Fixes #1222
This commit is contained in:
popododo0720
2026-02-12 11:38:11 +09:00
parent e4be8cea75
commit eb56701996
9 changed files with 703 additions and 374 deletions

View File

@@ -1424,94 +1424,16 @@ Use \`background_output(task_id="${task.id}")\` to retrieve this result when rea
continue
}
const messagesResult = await this.client.session.messages({
path: { id: sessionID },
// Session is still actively running (not idle).
// Progress is already tracked via handleEvent(message.part.updated),
// so we skip the expensive session.messages() fetch here.
// Completion will be detected when session transitions to idle.
log("[background-agent] Session still running, relying on event-based progress:", {
taskId: task.id,
sessionID,
sessionStatus: sessionStatus?.type ?? "not_in_status",
toolCalls: task.progress?.toolCalls ?? 0,
})
if (!messagesResult.error && messagesResult.data) {
const messages = messagesResult.data as Array<{
info?: { role?: string }
parts?: Array<{ type?: string; tool?: string; name?: string; text?: string }>
}>
const assistantMsgs = messages.filter(
(m) => m.info?.role === "assistant"
)
let toolCalls = 0
let lastTool: string | undefined
let lastMessage: string | undefined
for (const msg of assistantMsgs) {
const parts = msg.parts ?? []
for (const part of parts) {
if (part.type === "tool_use" || part.tool) {
toolCalls++
lastTool = part.tool || part.name || "unknown"
}
if (part.type === "text" && part.text) {
lastMessage = part.text
}
}
}
if (!task.progress) {
task.progress = { toolCalls: 0, lastUpdate: new Date() }
}
task.progress.toolCalls = toolCalls
task.progress.lastTool = lastTool
task.progress.lastUpdate = new Date()
if (lastMessage) {
task.progress.lastMessage = lastMessage
task.progress.lastMessageAt = new Date()
}
// Stability detection: complete when message count unchanged for 3 polls
const currentMsgCount = messages.length
const startedAt = task.startedAt
if (!startedAt) continue
const elapsedMs = Date.now() - startedAt.getTime()
if (elapsedMs >= MIN_STABILITY_TIME_MS) {
if (task.lastMsgCount === currentMsgCount) {
task.stablePolls = (task.stablePolls ?? 0) + 1
if (task.stablePolls >= 3) {
// Re-fetch session status to confirm agent is truly idle
const recheckStatus = await this.client.session.status()
const recheckData = (recheckStatus.data ?? {}) as Record<string, { type: string }>
const currentStatus = recheckData[sessionID]
if (currentStatus?.type !== "idle") {
log("[background-agent] Stability reached but session not idle, resetting:", {
taskId: task.id,
sessionStatus: currentStatus?.type ?? "not_in_status"
})
task.stablePolls = 0
continue
}
// Edge guard: Validate session has actual output before completing
const hasValidOutput = await this.validateSessionHasOutput(sessionID)
if (!hasValidOutput) {
log("[background-agent] Stability reached but no valid output, waiting:", task.id)
continue
}
// Re-check status after async operation
if (task.status !== "running") continue
const hasIncompleteTodos = await this.checkSessionTodos(sessionID)
if (!hasIncompleteTodos) {
await this.tryCompleteTask(task, "stability detection")
continue
}
}
} else {
task.stablePolls = 0
}
}
task.lastMsgCount = currentMsgCount
}
} catch (error) {
log("[background-agent] Poll error for task:", { taskId: task.id, error })
}

View File

@@ -37,7 +37,7 @@ export function getToolInput(
}
// Periodic cleanup (every minute)
setInterval(() => {
const cleanupInterval = setInterval(() => {
const now = Date.now()
for (const [key, entry] of cache.entries()) {
if (now - entry.timestamp > CACHE_TTL) {
@@ -45,3 +45,7 @@ setInterval(() => {
}
}
}, CACHE_TTL)
// Allow process to exit naturally even if interval is running
if (typeof cleanupInterval === "object" && "unref" in cleanupInterval) {
cleanupInterval.unref()
}

View File

@@ -0,0 +1,102 @@
import { describe, it, expect, mock, beforeEach, afterEach } from "bun:test"
import { existsSync, unlinkSync, readFileSync } from "fs"
import {
buildTranscriptFromSession,
deleteTempTranscript,
clearTranscriptCache,
} from "./transcript"
/**
 * Builds a minimal OpenCode client stub whose `session.messages()` call
 * resolves to `{ data: messages }`. The mock is exposed so tests can
 * assert on its call count.
 */
function createMockClient(messages: unknown[] = []) {
  const messagesMock = mock(() => Promise.resolve({ data: messages }))
  return {
    session: {
      messages: messagesMock,
    },
  }
}
// Verifies the per-session transcript cache: repeat calls for the same
// session must reuse cached base entries instead of re-fetching
// session.messages(); distinct sessions and an explicit clear must re-fetch.
// NOTE(review): these tests share module-level cache state in ./transcript,
// so the clearTranscriptCache() in afterEach is load-bearing — removing it
// would make tests order-dependent.
describe("transcript caching", () => {
  afterEach(() => {
    clearTranscriptCache()
  })
  // #given same session called twice
  // #when buildTranscriptFromSession is invoked
  // #then session.messages() should be called only once (cached)
  it("should cache transcript and not re-fetch for same session", async () => {
    const client = createMockClient([
      {
        info: { role: "assistant" },
        parts: [
          {
            type: "tool",
            tool: "bash",
            state: { status: "completed", input: { command: "ls" } },
          },
        ],
      },
    ])
    const path1 = await buildTranscriptFromSession(
      client,
      "ses_cache1",
      "/tmp",
      "bash",
      { command: "echo hi" }
    )
    // Second call, same session, different tool — must hit the cache.
    const path2 = await buildTranscriptFromSession(
      client,
      "ses_cache1",
      "/tmp",
      "read",
      { path: "/tmp/file" }
    )
    // session.messages() called only once
    expect(client.session.messages).toHaveBeenCalledTimes(1)
    // Both return valid paths
    expect(path1).not.toBeNull()
    expect(path2).not.toBeNull()
    // Second call should append the new tool entry
    // (transformToolName maps "read" → "Read"; presumably Claude Code casing — confirm in ./transcript)
    if (path2) {
      const content = readFileSync(path2, "utf-8")
      expect(content).toContain("Read")
    }
    deleteTempTranscript(path1)
    deleteTempTranscript(path2)
  })
  // #given different sessions
  // #when buildTranscriptFromSession called for each
  // #then session.messages() should be called for each
  it("should not share cache between different sessions", async () => {
    const client = createMockClient([])
    await buildTranscriptFromSession(client, "ses_a", "/tmp", "bash", {})
    await buildTranscriptFromSession(client, "ses_b", "/tmp", "bash", {})
    expect(client.session.messages).toHaveBeenCalledTimes(2)
    clearTranscriptCache()
  })
  // #given clearTranscriptCache is called
  // #when buildTranscriptFromSession called again
  // #then should re-fetch
  it("should re-fetch after cache is cleared", async () => {
    const client = createMockClient([])
    await buildTranscriptFromSession(client, "ses_clear", "/tmp", "bash", {})
    clearTranscriptCache()
    await buildTranscriptFromSession(client, "ses_clear", "/tmp", "bash", {})
    expect(client.session.messages).toHaveBeenCalledTimes(2)
  })
})

View File

@@ -29,12 +29,9 @@ export function appendTranscriptEntry(
}
// ============================================================================
// Claude Code Compatible Transcript Builder (PORT FROM DISABLED)
// Claude Code Compatible Transcript Builder
// ============================================================================
/**
* OpenCode API response type (loosely typed)
*/
interface OpenCodeMessagePart {
type: string
tool?: string
@@ -51,9 +48,6 @@ interface OpenCodeMessage {
parts?: OpenCodeMessagePart[]
}
/**
* Claude Code compatible transcript entry (from disabled file)
*/
interface DisabledTranscriptEntry {
type: "assistant"
message: {
@@ -66,18 +60,93 @@ interface DisabledTranscriptEntry {
}
}
// ============================================================================
// Session-scoped transcript cache to avoid full session.messages() rebuild
// on every tool call. Cache stores base entries from initial fetch;
// subsequent calls append new tool entries without re-fetching.
// ============================================================================
// One cached transcript per session.
interface TranscriptCacheEntry {
  // Pre-rendered JSONL entries from the initial session.messages() fetch.
  baseEntries: string[]
  // Temp file written on the most recent build; tracked so it can be
  // unlinked on cache clear / refresh (null until first write).
  tempPath: string | null
  // Epoch ms of the fetch; used for TTL expiry.
  createdAt: number
}
const TRANSCRIPT_CACHE_TTL_MS = 5 * 60 * 1000 // 5 minutes
// Keyed by session ID; cleared via clearTranscriptCache() on session.deleted.
const transcriptCache = new Map<string, TranscriptCacheEntry>()
/**
* Build Claude Code compatible transcript from session messages
*
* PORT FROM DISABLED: This calls client.session.messages() API to fetch
* the full session history and builds a JSONL file in Claude Code format.
*
* @param client OpenCode client instance
* @param sessionId Session ID
* @param directory Working directory
* @param currentToolName Current tool being executed (added as last entry)
* @param currentToolInput Current tool input
* @returns Temp file path (caller must call deleteTempTranscript!)
* Clear transcript cache for a specific session or all sessions.
* Call on session.deleted to prevent memory accumulation.
*/
/**
 * Drop cached transcript state for one session (when `sessionId` is given)
 * or for every session. Any temp transcript file tracked by a dropped entry
 * is unlinked best-effort so disk usage does not accumulate.
 */
export function clearTranscriptCache(sessionId?: string): void {
  if (!sessionId) {
    // Full wipe: remove each tracked temp file, then empty the map.
    for (const entry of transcriptCache.values()) {
      if (entry.tempPath) {
        try { unlinkSync(entry.tempPath) } catch { /* ignore */ }
      }
    }
    transcriptCache.clear()
    return
  }
  const entry = transcriptCache.get(sessionId)
  if (entry?.tempPath) {
    try { unlinkSync(entry.tempPath) } catch { /* ignore */ }
  }
  transcriptCache.delete(sessionId)
}
/** True while the entry is younger than the transcript cache TTL. */
function isCacheValid(entry: TranscriptCacheEntry): boolean {
  const ageMs = Date.now() - entry.createdAt
  return ageMs < TRANSCRIPT_CACHE_TTL_MS
}
/**
 * Render the currently executing tool call as a single Claude Code
 * compatible transcript line (JSON-serialized assistant tool_use entry).
 */
function buildCurrentEntry(toolName: string, toolInput: Record<string, unknown>): string {
  const entry: DisabledTranscriptEntry = {
    type: "assistant",
    message: {
      role: "assistant",
      content: [{ type: "tool_use", name: transformToolName(toolName), input: toolInput }],
    },
  }
  return JSON.stringify(entry)
}
/**
 * Convert OpenCode session messages into Claude Code compatible transcript
 * lines (one JSON string per completed tool call).
 *
 * Only assistant messages contribute; within them, only parts that are
 * tool parts with a completed state and a recorded input are emitted.
 */
function parseMessagesToEntries(messages: OpenCodeMessage[]): string[] {
  const entries: string[] = []
  for (const msg of messages) {
    if (msg.info?.role !== "assistant") continue
    for (const part of msg.parts || []) {
      if (part.type !== "tool") continue
      if (part.state?.status !== "completed") continue
      if (!part.state?.input) continue
      // FIX: `part.tool` is optional; the previous `as string` assertion
      // could pass undefined into transformToolName. Skip nameless parts.
      if (!part.tool) continue
      const toolName = transformToolName(part.tool)
      const entry: DisabledTranscriptEntry = {
        type: "assistant",
        message: {
          role: "assistant",
          content: [{ type: "tool_use", name: toolName, input: part.state.input }],
        },
      }
      entries.push(JSON.stringify(entry))
    }
  }
  return entries
}
/**
* Build Claude Code compatible transcript from session messages.
* Uses per-session cache to avoid redundant session.messages() API calls.
* First call fetches and caches; subsequent calls reuse cached base entries.
*/
export async function buildTranscriptFromSession(
client: {
@@ -91,97 +160,63 @@ export async function buildTranscriptFromSession(
currentToolInput: Record<string, unknown>
): Promise<string | null> {
try {
const response = await client.session.messages({
path: { id: sessionId },
query: { directory },
})
let baseEntries: string[]
// Handle various response formats
const messages = (response as { "200"?: unknown[]; data?: unknown[] })["200"]
?? (response as { data?: unknown[] }).data
?? (Array.isArray(response) ? response : [])
const cached = transcriptCache.get(sessionId)
if (cached && isCacheValid(cached)) {
baseEntries = cached.baseEntries
} else {
// Fetch full session messages (only on first call or cache expiry)
const response = await client.session.messages({
path: { id: sessionId },
query: { directory },
})
const entries: string[] = []
const messages = (response as { "200"?: unknown[]; data?: unknown[] })["200"]
?? (response as { data?: unknown[] }).data
?? (Array.isArray(response) ? response : [])
if (Array.isArray(messages)) {
for (const msg of messages as OpenCodeMessage[]) {
if (msg.info?.role !== "assistant") continue
baseEntries = Array.isArray(messages)
? parseMessagesToEntries(messages as OpenCodeMessage[])
: []
for (const part of msg.parts || []) {
if (part.type !== "tool") continue
if (part.state?.status !== "completed") continue
if (!part.state?.input) continue
const rawToolName = part.tool as string
const toolName = transformToolName(rawToolName)
const entry: DisabledTranscriptEntry = {
type: "assistant",
message: {
role: "assistant",
content: [
{
type: "tool_use",
name: toolName,
input: part.state.input,
},
],
},
}
entries.push(JSON.stringify(entry))
}
// Clean up old temp file if exists
if (cached?.tempPath) {
try { unlinkSync(cached.tempPath) } catch { /* ignore */ }
}
transcriptCache.set(sessionId, {
baseEntries,
tempPath: null,
createdAt: Date.now(),
})
}
// Always add current tool call as the last entry
const currentEntry: DisabledTranscriptEntry = {
type: "assistant",
message: {
role: "assistant",
content: [
{
type: "tool_use",
name: transformToolName(currentToolName),
input: currentToolInput,
},
],
},
}
entries.push(JSON.stringify(currentEntry))
// Append current tool call
const allEntries = [...baseEntries, buildCurrentEntry(currentToolName, currentToolInput)]
// Write to temp file
const tempPath = join(
tmpdir(),
`opencode-transcript-${sessionId}-${randomUUID()}.jsonl`
)
writeFileSync(tempPath, entries.join("\n") + "\n")
writeFileSync(tempPath, allEntries.join("\n") + "\n")
// Update cache temp path for cleanup tracking
const cacheEntry = transcriptCache.get(sessionId)
if (cacheEntry) {
cacheEntry.tempPath = tempPath
}
return tempPath
} catch {
// CRITICAL FIX: Even on API failure, create file with current tool entry only
// (matching original disabled behavior - never return null with incompatible format)
try {
const currentEntry: DisabledTranscriptEntry = {
type: "assistant",
message: {
role: "assistant",
content: [
{
type: "tool_use",
name: transformToolName(currentToolName),
input: currentToolInput,
},
],
},
}
const tempPath = join(
tmpdir(),
`opencode-transcript-${sessionId}-${randomUUID()}.jsonl`
)
writeFileSync(tempPath, JSON.stringify(currentEntry) + "\n")
writeFileSync(tempPath, buildCurrentEntry(currentToolName, currentToolInput) + "\n")
return tempPath
} catch {
// If even this fails, return null (truly catastrophic failure)
return null
}
}
@@ -189,8 +224,6 @@ export async function buildTranscriptFromSession(
/**
* Delete temp transcript file (call in finally block)
*
* PORT FROM DISABLED: Cleanup mechanism to avoid disk accumulation
*/
export function deleteTempTranscript(path: string | null): void {
if (!path) return

View File

@@ -0,0 +1,185 @@
import { describe, it, expect, mock, beforeEach } from "bun:test"
import { createContextWindowMonitorHook } from "./context-window-monitor"
/**
 * Builds a minimal plugin context stub: a client whose session.messages()
 * resolves empty data (call-count observable via the mock) and a fixed
 * working directory.
 */
function createMockCtx() {
  const messagesMock = mock(() => Promise.resolve({ data: [] }))
  return {
    client: {
      session: { messages: messagesMock },
    },
    directory: "/tmp/test",
  }
}
// Verifies the event-driven rewrite of the context-window monitor:
// token usage is cached from message.updated events, so tool.execute.after
// must never call session.messages(). Thresholds assume the Anthropic
// 200K actual limit with a 70% warning threshold — TODO confirm against
// context-window-monitor constants.
describe("context-window-monitor", () => {
  let ctx: ReturnType<typeof createMockCtx>
  beforeEach(() => {
    // Fresh ctx per test so mock call counts don't leak between tests.
    ctx = createMockCtx()
  })
  // #given event caches token info from message.updated
  // #when tool.execute.after is called
  // #then session.messages() should NOT be called
  it("should use cached token info instead of fetching session.messages()", async () => {
    const hook = createContextWindowMonitorHook(ctx as never)
    const sessionID = "ses_test1"
    // Simulate message.updated event with token info
    await hook.event({
      event: {
        type: "message.updated",
        properties: {
          info: {
            role: "assistant",
            sessionID,
            providerID: "anthropic",
            finish: true,
            tokens: {
              input: 50000,
              output: 1000,
              reasoning: 0,
              cache: { read: 10000, write: 0 },
            },
          },
        },
      },
    })
    const output = { title: "", output: "test output", metadata: null }
    await hook["tool.execute.after"](
      { tool: "bash", sessionID, callID: "call_1" },
      output
    )
    // session.messages() should NOT have been called
    expect(ctx.client.session.messages).not.toHaveBeenCalled()
  })
  // #given no cached token info exists
  // #when tool.execute.after is called
  // #then should skip gracefully without fetching
  it("should skip gracefully when no cached token info exists", async () => {
    const hook = createContextWindowMonitorHook(ctx as never)
    const sessionID = "ses_no_cache"
    const output = { title: "", output: "test output", metadata: null }
    await hook["tool.execute.after"](
      { tool: "bash", sessionID, callID: "call_1" },
      output
    )
    // No fetch, no crash
    expect(ctx.client.session.messages).not.toHaveBeenCalled()
    expect(output.output).toBe("test output")
  })
  // #given token usage exceeds 70% threshold
  // #when tool.execute.after is called
  // #then context reminder should be appended to output
  it("should append context reminder when usage exceeds threshold", async () => {
    const hook = createContextWindowMonitorHook(ctx as never)
    const sessionID = "ses_high_usage"
    // 150K input + 10K cache read = 160K, which is 80% of 200K limit
    await hook.event({
      event: {
        type: "message.updated",
        properties: {
          info: {
            role: "assistant",
            sessionID,
            providerID: "anthropic",
            finish: true,
            tokens: {
              input: 150000,
              output: 1000,
              reasoning: 0,
              cache: { read: 10000, write: 0 },
            },
          },
        },
      },
    })
    const output = { title: "", output: "original", metadata: null }
    await hook["tool.execute.after"](
      { tool: "bash", sessionID, callID: "call_1" },
      output
    )
    // Reminder text is appended to the tool output, not replacing it.
    expect(output.output).toContain("context remaining")
    expect(ctx.client.session.messages).not.toHaveBeenCalled()
  })
  // #given session is deleted
  // #when session.deleted event fires
  // #then cached data should be cleaned up
  it("should clean up cache on session.deleted", async () => {
    const hook = createContextWindowMonitorHook(ctx as never)
    const sessionID = "ses_deleted"
    // Cache some data (above threshold, so a reminder WOULD fire if kept)
    await hook.event({
      event: {
        type: "message.updated",
        properties: {
          info: {
            role: "assistant",
            sessionID,
            providerID: "anthropic",
            finish: true,
            tokens: { input: 150000, output: 0, reasoning: 0, cache: { read: 10000, write: 0 } },
          },
        },
      },
    })
    // Delete session
    await hook.event({
      event: {
        type: "session.deleted",
        properties: { info: { id: sessionID } },
      },
    })
    // After deletion, no reminder should fire (cache gone, reminded set gone)
    const output = { title: "", output: "test", metadata: null }
    await hook["tool.execute.after"](
      { tool: "bash", sessionID, callID: "call_1" },
      output
    )
    expect(output.output).toBe("test")
  })
  // #given non-anthropic provider
  // #when message.updated fires
  // #then should not trigger reminder
  it("should ignore non-anthropic providers", async () => {
    const hook = createContextWindowMonitorHook(ctx as never)
    const sessionID = "ses_openai"
    await hook.event({
      event: {
        type: "message.updated",
        properties: {
          info: {
            role: "assistant",
            sessionID,
            providerID: "openai",
            finish: true,
            tokens: { input: 200000, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
          },
        },
      },
    })
    const output = { title: "", output: "test", metadata: null }
    await hook["tool.execute.after"](
      { tool: "bash", sessionID, callID: "call_1" },
      output
    )
    expect(output.output).toBe("test")
  })
})

View File

@@ -15,23 +15,21 @@ You are using Anthropic Claude with 1M context window.
You have plenty of context remaining - do NOT rush or skip tasks.
Complete your work thoroughly and methodically.`
interface AssistantMessageInfo {
role: "assistant"
providerID: string
tokens: {
input: number
output: number
reasoning: number
cache: { read: number; write: number }
}
interface TokenInfo {
input: number
output: number
reasoning: number
cache: { read: number; write: number }
}
interface MessageWrapper {
info: { role: string } & Partial<AssistantMessageInfo>
interface CachedTokenState {
providerID: string
tokens: TokenInfo
}
export function createContextWindowMonitorHook(ctx: PluginInput) {
const remindedSessions = new Set<string>()
const tokenCache = new Map<string, CachedTokenState>()
const toolExecuteAfter = async (
input: { tool: string; sessionID: string; callID: string },
@@ -41,44 +39,28 @@ export function createContextWindowMonitorHook(ctx: PluginInput) {
if (remindedSessions.has(sessionID)) return
try {
const response = await ctx.client.session.messages({
path: { id: sessionID },
})
const cached = tokenCache.get(sessionID)
if (!cached) return
const messages = (response.data ?? response) as MessageWrapper[]
if (cached.providerID !== "anthropic") return
const assistantMessages = messages
.filter((m) => m.info.role === "assistant")
.map((m) => m.info as AssistantMessageInfo)
const lastTokens = cached.tokens
const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0)
if (assistantMessages.length === 0) return
const actualUsagePercentage = totalInputTokens / ANTHROPIC_ACTUAL_LIMIT
const lastAssistant = assistantMessages[assistantMessages.length - 1]
if (lastAssistant.providerID !== "anthropic") return
if (actualUsagePercentage < CONTEXT_WARNING_THRESHOLD) return
// Use only the last assistant message's input tokens
// This reflects the ACTUAL current context window usage (post-compaction)
const lastTokens = lastAssistant.tokens
const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0)
remindedSessions.add(sessionID)
const actualUsagePercentage = totalInputTokens / ANTHROPIC_ACTUAL_LIMIT
const displayUsagePercentage = totalInputTokens / ANTHROPIC_DISPLAY_LIMIT
const usedPct = (displayUsagePercentage * 100).toFixed(1)
const remainingPct = ((1 - displayUsagePercentage) * 100).toFixed(1)
const usedTokens = totalInputTokens.toLocaleString()
const limitTokens = ANTHROPIC_DISPLAY_LIMIT.toLocaleString()
if (actualUsagePercentage < CONTEXT_WARNING_THRESHOLD) return
remindedSessions.add(sessionID)
const displayUsagePercentage = totalInputTokens / ANTHROPIC_DISPLAY_LIMIT
const usedPct = (displayUsagePercentage * 100).toFixed(1)
const remainingPct = ((1 - displayUsagePercentage) * 100).toFixed(1)
const usedTokens = totalInputTokens.toLocaleString()
const limitTokens = ANTHROPIC_DISPLAY_LIMIT.toLocaleString()
output.output += `\n\n${CONTEXT_REMINDER}
output.output += `\n\n${CONTEXT_REMINDER}
[Context Status: ${usedPct}% used (${usedTokens}/${limitTokens} tokens), ${remainingPct}% remaining]`
} catch {
// Graceful degradation - do not disrupt tool execution
}
}
const eventHandler = async ({ event }: { event: { type: string; properties?: unknown } }) => {
@@ -88,8 +70,27 @@ export function createContextWindowMonitorHook(ctx: PluginInput) {
const sessionInfo = props?.info as { id?: string } | undefined
if (sessionInfo?.id) {
remindedSessions.delete(sessionInfo.id)
tokenCache.delete(sessionInfo.id)
}
}
if (event.type === "message.updated") {
const info = props?.info as {
role?: string
sessionID?: string
providerID?: string
finish?: boolean
tokens?: TokenInfo
} | undefined
if (!info || info.role !== "assistant" || !info.finish) return
if (!info.sessionID || !info.providerID || !info.tokens) return
tokenCache.set(info.sessionID, {
providerID: info.providerID,
tokens: info.tokens,
})
}
}
return {

View File

@@ -1,132 +1,155 @@
import { describe, expect, mock, test } from "bun:test"
import { createPreemptiveCompactionHook } from "./preemptive-compaction.ts"
import { describe, it, expect, mock, beforeEach } from "bun:test"
import { createPreemptiveCompactionHook } from "./preemptive-compaction"
function createMockCtx() {
return {
client: {
session: {
messages: mock(() => Promise.resolve({ data: [] })),
summarize: mock(() => Promise.resolve({})),
},
tui: {
showToast: mock(() => Promise.resolve()),
},
},
directory: "/tmp/test",
}
}
describe("preemptive-compaction", () => {
const sessionID = "preemptive-compaction-session"
let ctx: ReturnType<typeof createMockCtx>
function createMockCtx(overrides?: {
messages?: ReturnType<typeof mock>
summarize?: ReturnType<typeof mock>
}) {
const messages = overrides?.messages ?? mock(() => Promise.resolve({ data: [] }))
const summarize = overrides?.summarize ?? mock(() => Promise.resolve())
beforeEach(() => {
ctx = createMockCtx()
})
return {
client: {
session: {
messages,
summarize,
},
tui: {
showToast: mock(() => Promise.resolve()),
// #given event caches token info from message.updated
// #when tool.execute.after is called
// #then session.messages() should NOT be called
it("should use cached token info instead of fetching session.messages()", async () => {
const hook = createPreemptiveCompactionHook(ctx as never)
const sessionID = "ses_test1"
// Simulate message.updated with token info below threshold
await hook.event({
event: {
type: "message.updated",
properties: {
info: {
role: "assistant",
sessionID,
providerID: "anthropic",
modelID: "claude-sonnet-4-5",
finish: true,
tokens: {
input: 50000,
output: 1000,
reasoning: 0,
cache: { read: 5000, write: 0 },
},
},
},
},
directory: "/tmp/test",
} as never
}
})
test("triggers summarize when usage exceeds threshold", async () => {
// #given
const messages = mock(() =>
Promise.resolve({
data: [
{
info: {
role: "assistant",
providerID: "anthropic",
modelID: "claude-opus-4-6",
tokens: {
input: 180000,
output: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
},
},
],
})
)
const summarize = mock(() => Promise.resolve())
const hook = createPreemptiveCompactionHook(createMockCtx({ messages, summarize }))
const output = { title: "", output: "", metadata: {} }
// #when
const output = { title: "", output: "test", metadata: null }
await hook["tool.execute.after"](
{ tool: "Read", sessionID, callID: "call-1" },
{ tool: "bash", sessionID, callID: "call_1" },
output
)
// #then
expect(summarize).toHaveBeenCalled()
expect(ctx.client.session.messages).not.toHaveBeenCalled()
})
test("triggers summarize for non-anthropic providers when usage exceeds threshold", async () => {
//#given
const messages = mock(() =>
Promise.resolve({
data: [
{
info: {
role: "assistant",
providerID: "openai",
modelID: "gpt-5.2",
tokens: {
input: 180000,
output: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
},
},
],
})
)
const summarize = mock(() => Promise.resolve())
const hook = createPreemptiveCompactionHook(createMockCtx({ messages, summarize }))
const output = { title: "", output: "", metadata: {} }
// #given no cached token info
// #when tool.execute.after is called
// #then should skip without fetching
it("should skip gracefully when no cached token info exists", async () => {
const hook = createPreemptiveCompactionHook(ctx as never)
//#when
const output = { title: "", output: "test", metadata: null }
await hook["tool.execute.after"](
{ tool: "Read", sessionID, callID: "call-3" },
{ tool: "bash", sessionID: "ses_none", callID: "call_1" },
output
)
//#then
expect(summarize).toHaveBeenCalled()
expect(ctx.client.session.messages).not.toHaveBeenCalled()
})
test("does not summarize when usage is below threshold", async () => {
// #given
const messages = mock(() =>
Promise.resolve({
data: [
{
info: {
role: "assistant",
providerID: "anthropic",
modelID: "claude-opus-4-6",
tokens: {
input: 100000,
output: 0,
reasoning: 0,
cache: { read: 0, write: 0 },
},
// #given usage above 78% threshold
// #when tool.execute.after runs
// #then should trigger summarize
it("should trigger compaction when usage exceeds threshold", async () => {
const hook = createPreemptiveCompactionHook(ctx as never)
const sessionID = "ses_high"
// 170K input + 10K cache = 180K → 90% of 200K
await hook.event({
event: {
type: "message.updated",
properties: {
info: {
role: "assistant",
sessionID,
providerID: "anthropic",
modelID: "claude-sonnet-4-5",
finish: true,
tokens: {
input: 170000,
output: 1000,
reasoning: 0,
cache: { read: 10000, write: 0 },
},
},
],
})
)
const summarize = mock(() => Promise.resolve())
const hook = createPreemptiveCompactionHook(createMockCtx({ messages, summarize }))
const output = { title: "", output: "", metadata: {} }
},
},
})
// #when
const output = { title: "", output: "test", metadata: null }
await hook["tool.execute.after"](
{ tool: "Read", sessionID, callID: "call-2" },
{ tool: "bash", sessionID, callID: "call_1" },
output
)
// #then
expect(summarize).not.toHaveBeenCalled()
expect(ctx.client.session.messages).not.toHaveBeenCalled()
expect(ctx.client.session.summarize).toHaveBeenCalled()
})
// #given session deleted
// #then cache should be cleaned up
it("should clean up cache on session.deleted", async () => {
const hook = createPreemptiveCompactionHook(ctx as never)
const sessionID = "ses_del"
await hook.event({
event: {
type: "message.updated",
properties: {
info: {
role: "assistant",
sessionID,
providerID: "anthropic",
modelID: "claude-sonnet-4-5",
finish: true,
tokens: { input: 180000, output: 0, reasoning: 0, cache: { read: 10000, write: 0 } },
},
},
},
})
await hook.event({
event: {
type: "session.deleted",
properties: { info: { id: sessionID } },
},
})
const output = { title: "", output: "test", metadata: null }
await hook["tool.execute.after"](
{ tool: "bash", sessionID, callID: "call_1" },
output
)
expect(ctx.client.session.summarize).not.toHaveBeenCalled()
})
})

View File

@@ -8,29 +8,29 @@ const ANTHROPIC_ACTUAL_LIMIT =
const PREEMPTIVE_COMPACTION_THRESHOLD = 0.78
interface AssistantMessageInfo {
role: "assistant"
providerID: string
modelID?: string
tokens: {
input: number
output: number
reasoning: number
cache: { read: number; write: number }
}
interface TokenInfo {
input: number
output: number
reasoning: number
cache: { read: number; write: number }
}
interface MessageWrapper {
info: { role: string } & Partial<AssistantMessageInfo>
interface CachedCompactionState {
providerID: string
modelID: string
tokens: TokenInfo
}
type PluginInput = {
client: {
session: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
messages: (...args: any[]) => any
// eslint-disable-next-line @typescript-eslint/no-explicit-any
summarize: (...args: any[]) => any
}
tui: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
showToast: (...args: any[]) => any
}
}
@@ -40,6 +40,7 @@ type PluginInput = {
export function createPreemptiveCompactionHook(ctx: PluginInput) {
const compactionInProgress = new Set<string>()
const compactedSessions = new Set<string>()
const tokenCache = new Map<string, CachedCompactionState>()
const toolExecuteAfter = async (
input: { tool: string; sessionID: string; callID: string },
@@ -48,38 +49,29 @@ export function createPreemptiveCompactionHook(ctx: PluginInput) {
const { sessionID } = input
if (compactedSessions.has(sessionID) || compactionInProgress.has(sessionID)) return
const cached = tokenCache.get(sessionID)
if (!cached) return
const actualLimit =
cached.providerID === "anthropic"
? ANTHROPIC_ACTUAL_LIMIT
: DEFAULT_ACTUAL_LIMIT
const lastTokens = cached.tokens
const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0)
const usageRatio = totalInputTokens / actualLimit
if (usageRatio < PREEMPTIVE_COMPACTION_THRESHOLD) return
const modelID = cached.modelID
if (!modelID) return
compactionInProgress.add(sessionID)
try {
const response = await ctx.client.session.messages({
path: { id: sessionID },
})
const payload = response as { data?: MessageWrapper[] } | MessageWrapper[]
const messages = Array.isArray(payload) ? payload : (payload.data ?? [])
const assistantMessages = messages
.filter((m) => m.info.role === "assistant")
.map((m) => m.info as AssistantMessageInfo)
if (assistantMessages.length === 0) return
const lastAssistant = assistantMessages[assistantMessages.length - 1]
const actualLimit =
lastAssistant.providerID === "anthropic"
? ANTHROPIC_ACTUAL_LIMIT
: DEFAULT_ACTUAL_LIMIT
const lastTokens = lastAssistant.tokens
const totalInputTokens = (lastTokens?.input ?? 0) + (lastTokens?.cache?.read ?? 0)
const usageRatio = totalInputTokens / actualLimit
if (usageRatio < PREEMPTIVE_COMPACTION_THRESHOLD) return
const modelID = lastAssistant.modelID
if (!modelID) return
compactionInProgress.add(sessionID)
await ctx.client.session.summarize({
path: { id: sessionID },
body: { providerID: lastAssistant.providerID, modelID, auto: true } as never,
body: { providerID: cached.providerID, modelID, auto: true } as never,
query: { directory: ctx.directory },
})
@@ -92,12 +84,36 @@ export function createPreemptiveCompactionHook(ctx: PluginInput) {
}
const eventHandler = async ({ event }: { event: { type: string; properties?: unknown } }) => {
if (event.type !== "session.deleted") return
const props = event.properties as Record<string, unknown> | undefined
const sessionInfo = props?.info as { id?: string } | undefined
if (sessionInfo?.id) {
compactionInProgress.delete(sessionInfo.id)
compactedSessions.delete(sessionInfo.id)
if (event.type === "session.deleted") {
const sessionInfo = props?.info as { id?: string } | undefined
if (sessionInfo?.id) {
compactionInProgress.delete(sessionInfo.id)
compactedSessions.delete(sessionInfo.id)
tokenCache.delete(sessionInfo.id)
}
return
}
if (event.type === "message.updated") {
const info = props?.info as {
role?: string
sessionID?: string
providerID?: string
modelID?: string
finish?: boolean
tokens?: TokenInfo
} | undefined
if (!info || info.role !== "assistant" || !info.finish) return
if (!info.sessionID || !info.providerID || !info.tokens) return
tokenCache.set(info.sessionID, {
providerID: info.providerID,
modelID: info.modelID ?? "",
tokens: info.tokens,
})
}
}

View File

@@ -1,33 +1,69 @@
import type { SessionState } from "./types"
// TTL for idle session state entries (10 minutes)
const SESSION_STATE_TTL_MS = 10 * 60 * 1000
// Prune interval (every 2 minutes)
const SESSION_STATE_PRUNE_INTERVAL_MS = 2 * 60 * 1000
interface TrackedSessionState {
state: SessionState
lastAccessedAt: number
}
export interface SessionStateStore {
getState: (sessionID: string) => SessionState
getExistingState: (sessionID: string) => SessionState | undefined
cancelCountdown: (sessionID: string) => void
cleanup: (sessionID: string) => void
cancelAllCountdowns: () => void
shutdown: () => void
}
export function createSessionStateStore(): SessionStateStore {
const sessions = new Map<string, SessionState>()
const sessions = new Map<string, TrackedSessionState>()
// Periodic pruning of stale session states to prevent unbounded Map growth
let pruneInterval: ReturnType<typeof setInterval> | undefined
pruneInterval = setInterval(() => {
const now = Date.now()
for (const [sessionID, tracked] of sessions.entries()) {
if (now - tracked.lastAccessedAt > SESSION_STATE_TTL_MS) {
cancelCountdown(sessionID)
sessions.delete(sessionID)
}
}
}, SESSION_STATE_PRUNE_INTERVAL_MS)
// Allow process to exit naturally even if interval is running
if (typeof pruneInterval === "object" && "unref" in pruneInterval) {
pruneInterval.unref()
}
function getState(sessionID: string): SessionState {
const existingState = sessions.get(sessionID)
if (existingState) return existingState
const existing = sessions.get(sessionID)
if (existing) {
existing.lastAccessedAt = Date.now()
return existing.state
}
const state: SessionState = {}
sessions.set(sessionID, state)
sessions.set(sessionID, { state, lastAccessedAt: Date.now() })
return state
}
function getExistingState(sessionID: string): SessionState | undefined {
return sessions.get(sessionID)
const existing = sessions.get(sessionID)
if (existing) {
existing.lastAccessedAt = Date.now()
return existing.state
}
return undefined
}
function cancelCountdown(sessionID: string): void {
const state = sessions.get(sessionID)
if (!state) return
const tracked = sessions.get(sessionID)
if (!tracked) return
const state = tracked.state
if (state.countdownTimer) {
clearTimeout(state.countdownTimer)
state.countdownTimer = undefined
@@ -52,11 +88,18 @@ export function createSessionStateStore(): SessionStateStore {
}
}
function shutdown(): void {
clearInterval(pruneInterval)
cancelAllCountdowns()
sessions.clear()
}
return {
getState,
getExistingState,
cancelCountdown,
cleanup,
cancelAllCountdowns,
shutdown,
}
}