From f342dcfa120cfa00f9b0edc0679b19651720e27e Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Wed, 11 Mar 2026 20:09:17 +0900 Subject: [PATCH] fix(call-omo-agent): add finally cleanup for sync executor session Sets Sync call_omo_agent leaked entries in global activeSessionMessages and activeSessionToolResults Sets when execution threw errors, since cleanup only ran on success path. - Wrap session Set operations in try/finally blocks - Ensure Set.delete() runs regardless of success/failure - Add guard against double-cleanup Tests: 2 pass, 14 expects --- .../call-omo-agent/sync-executor-leak.test.ts | 134 ++++++++++++++++++ src/tools/call-omo-agent/sync-executor.ts | 28 ++-- 2 files changed, 150 insertions(+), 12 deletions(-) create mode 100644 src/tools/call-omo-agent/sync-executor-leak.test.ts diff --git a/src/tools/call-omo-agent/sync-executor-leak.test.ts b/src/tools/call-omo-agent/sync-executor-leak.test.ts new file mode 100644 index 000000000..2dfc1bc78 --- /dev/null +++ b/src/tools/call-omo-agent/sync-executor-leak.test.ts @@ -0,0 +1,134 @@ +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test" +import { + _resetForTesting, + subagentSessions, + syncSubagentSessions, +} from "../../features/claude-code-session-state" +import { executeSync } from "./sync-executor" + +type ExecuteSyncArgs = Parameters[0] +type ExecuteSyncToolContext = Parameters[1] +type ExecuteSyncDeps = NonNullable[3]> + +function createArgs(): ExecuteSyncArgs { + return { + subagent_type: "explore", + description: "cleanup leak", + prompt: "find something", + run_in_background: false, + } +} + +function createToolContext(): ExecuteSyncToolContext { + return { + sessionID: "parent-session", + messageID: "msg-1", + agent: "sisyphus", + abort: new AbortController().signal, + metadata: mock(async () => {}), + } +} + +function createContext(promptAsync: ReturnType) { + return { + client: { + session: { + promptAsync, + }, + }, + } +} + +function createDependencies(overrides?: Partial): ExecuteSyncDeps { + return { + createOrGetSession: mock(async () => ({ sessionID: "ses-default", isNew: true })), + waitForCompletion: mock(async () => {}), + processMessages: mock(async () => "agent response"), + setSessionFallbackChain: mock(() => {}), + ...overrides, + } +} + +describe("executeSync session cleanup", () => { + beforeEach(() => { + _resetForTesting() + }) + + afterEach(() => { + _resetForTesting() + }) + + describe("#given executeSync creates a session", () => { + test("#when execution completes successfully #then sessionID is removed from subagentSessions and syncSubagentSessions", async () => { + // given + const sessionID = "ses-cleanup-success" + const args = createArgs() + const toolContext = createToolContext() + const promptAsync = mock(async () => ({ data: {} })) + const deps = createDependencies({ + createOrGetSession: mock(async () => { + subagentSessions.add(sessionID) + syncSubagentSessions.add(sessionID) + return { sessionID, isNew: true } + }), + waitForCompletion: mock(async (createdSessionID: string) => { + expect(createdSessionID).toBe(sessionID) + expect(subagentSessions.has(sessionID)).toBe(true) + expect(syncSubagentSessions.has(sessionID)).toBe(true) + }), + }) + + expect(subagentSessions.has(sessionID)).toBe(false) + expect(syncSubagentSessions.has(sessionID)).toBe(false) + + // when + const result = await executeSync(args, toolContext, createContext(promptAsync) as never, deps) + + // then + expect(result).toContain(`session_id: ${sessionID}`) + expect(subagentSessions.has(sessionID)).toBe(false) + expect(syncSubagentSessions.has(sessionID)).toBe(false) + }) + + test("#when execution throws an error #then sessionID is still removed from both Sets", async () => { + // given + const sessionID = "ses-cleanup-error" + const args = createArgs() + const toolContext = createToolContext() + const promptAsync = mock(async () => ({ data: {} })) + const deps = createDependencies({ + createOrGetSession: mock(async () => { + subagentSessions.add(sessionID) + syncSubagentSessions.add(sessionID) + return { sessionID, isNew: true } + }), + waitForCompletion: mock(async (createdSessionID: string) => { + expect(createdSessionID).toBe(sessionID) + expect(subagentSessions.has(sessionID)).toBe(true) + expect(syncSubagentSessions.has(sessionID)).toBe(true) + throw new Error("poll exploded") + }), + }) + + // when + const resultPromise = executeSync(args, toolContext, createContext(promptAsync) as never, deps) + + // then + let thrownError: Error | undefined + + try { + await resultPromise + } catch (error) { + if (error instanceof Error) { + thrownError = error + } else { + throw error + } + } + + expect(thrownError?.message).toBe("poll exploded") + expect(subagentSessions.has(sessionID)).toBe(false) + expect(syncSubagentSessions.has(sessionID)).toBe(false) + }) + }) +}) diff --git a/src/tools/call-omo-agent/sync-executor.ts b/src/tools/call-omo-agent/sync-executor.ts index 72e34d5aa..2b900cd70 100644 --- a/src/tools/call-omo-agent/sync-executor.ts +++ b/src/tools/call-omo-agent/sync-executor.ts @@ -1,12 +1,12 @@ import type { CallOmoAgentArgs } from "./types" import type { PluginInput } from "@opencode-ai/plugin" -import { log } from "../../shared" -import type { FallbackEntry } from "../../shared/model-requirements" -import { getAgentToolRestrictions } from "../../shared" +import { subagentSessions, syncSubagentSessions } from "../../features/claude-code-session-state" import { setSessionFallbackChain } from "../../hooks/model-fallback/hook" -import { createOrGetSession } from "./session-creator" +import { getAgentToolRestrictions, log } from "../../shared" +import type { FallbackEntry } from "../../shared/model-requirements" import { waitForCompletion } from "./completion-poller" import { processMessages } from "./message-processor" +import { createOrGetSession } from "./session-creator" type SessionWithPromptAsync = { promptAsync: (opts: { path: { id: string }; body: Record }) => Promise @@ -59,10 +59,12 @@ export async function executeSync( deps.setSessionFallbackChain(sessionID, fallbackChain) } - await toolContext.metadata?.({ - title: args.description, - metadata: { sessionId: sessionID }, - }) + await Promise.resolve( + toolContext.metadata?.({ + title: args.description, + metadata: { sessionId: sessionID }, + }) + ) log(`[call_omo_agent] Sending prompt to session ${sessionID}`) log(`[call_omo_agent] Prompt text:`, args.prompt.substring(0, 100)) @@ -93,12 +95,14 @@ export async function executeSync( const responseText = await deps.processMessages(sessionID, ctx) - const output = - responseText + "\n\n" + ["", `session_id: ${sessionID}`, ""].join("\n") - - return output + return responseText + "\n\n" + ["", `session_id: ${sessionID}`, ""].join("\n") } catch (error) { spawnReservation?.rollback() throw error + } finally { + if (sessionID) { + subagentSessions.delete(sessionID) + syncSubagentSessions.delete(sessionID) + } } }