fix(call-omo-agent): add finally cleanup for sync executor session Sets
Sync call_omo_agent leaked entries in global activeSessionMessages and activeSessionToolResults Sets when execution threw errors, since cleanup only ran on success path. - Wrap session Set operations in try/finally blocks - Ensure Set.delete() runs regardless of success/failure - Add guard against double-cleanup Tests: 2 pass, 14 expects
This commit is contained in:
134
src/tools/call-omo-agent/sync-executor-leak.test.ts
Normal file
134
src/tools/call-omo-agent/sync-executor-leak.test.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"
|
||||
import {
|
||||
_resetForTesting,
|
||||
subagentSessions,
|
||||
syncSubagentSessions,
|
||||
} from "../../features/claude-code-session-state"
|
||||
import { executeSync } from "./sync-executor"
|
||||
|
||||
type ExecuteSyncArgs = Parameters<typeof executeSync>[0]
|
||||
type ExecuteSyncToolContext = Parameters<typeof executeSync>[1]
|
||||
type ExecuteSyncDeps = NonNullable<Parameters<typeof executeSync>[3]>
|
||||
|
||||
function createArgs(): ExecuteSyncArgs {
|
||||
return {
|
||||
subagent_type: "explore",
|
||||
description: "cleanup leak",
|
||||
prompt: "find something",
|
||||
run_in_background: false,
|
||||
}
|
||||
}
|
||||
|
||||
function createToolContext(): ExecuteSyncToolContext {
|
||||
return {
|
||||
sessionID: "parent-session",
|
||||
messageID: "msg-1",
|
||||
agent: "sisyphus",
|
||||
abort: new AbortController().signal,
|
||||
metadata: mock(async () => {}),
|
||||
}
|
||||
}
|
||||
|
||||
function createContext(promptAsync: ReturnType<typeof mock>) {
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
promptAsync,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
function createDependencies(overrides?: Partial<ExecuteSyncDeps>): ExecuteSyncDeps {
|
||||
return {
|
||||
createOrGetSession: mock(async () => ({ sessionID: "ses-default", isNew: true })),
|
||||
waitForCompletion: mock(async () => {}),
|
||||
processMessages: mock(async () => "agent response"),
|
||||
setSessionFallbackChain: mock(() => {}),
|
||||
...overrides,
|
||||
}
|
||||
}
|
||||
|
||||
describe("executeSync session cleanup", () => {
|
||||
beforeEach(() => {
|
||||
_resetForTesting()
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
_resetForTesting()
|
||||
})
|
||||
|
||||
describe("#given executeSync creates a session", () => {
|
||||
test("#when execution completes successfully #then sessionID is removed from subagentSessions and syncSubagentSessions", async () => {
|
||||
// given
|
||||
const sessionID = "ses-cleanup-success"
|
||||
const args = createArgs()
|
||||
const toolContext = createToolContext()
|
||||
const promptAsync = mock(async () => ({ data: {} }))
|
||||
const deps = createDependencies({
|
||||
createOrGetSession: mock(async () => {
|
||||
subagentSessions.add(sessionID)
|
||||
syncSubagentSessions.add(sessionID)
|
||||
return { sessionID, isNew: true }
|
||||
}),
|
||||
waitForCompletion: mock(async (createdSessionID: string) => {
|
||||
expect(createdSessionID).toBe(sessionID)
|
||||
expect(subagentSessions.has(sessionID)).toBe(true)
|
||||
expect(syncSubagentSessions.has(sessionID)).toBe(true)
|
||||
}),
|
||||
})
|
||||
|
||||
expect(subagentSessions.has(sessionID)).toBe(false)
|
||||
expect(syncSubagentSessions.has(sessionID)).toBe(false)
|
||||
|
||||
// when
|
||||
const result = await executeSync(args, toolContext, createContext(promptAsync) as never, deps)
|
||||
|
||||
// then
|
||||
expect(result).toContain(`session_id: ${sessionID}`)
|
||||
expect(subagentSessions.has(sessionID)).toBe(false)
|
||||
expect(syncSubagentSessions.has(sessionID)).toBe(false)
|
||||
})
|
||||
|
||||
test("#when execution throws an error #then sessionID is still removed from both Sets", async () => {
|
||||
// given
|
||||
const sessionID = "ses-cleanup-error"
|
||||
const args = createArgs()
|
||||
const toolContext = createToolContext()
|
||||
const promptAsync = mock(async () => ({ data: {} }))
|
||||
const deps = createDependencies({
|
||||
createOrGetSession: mock(async () => {
|
||||
subagentSessions.add(sessionID)
|
||||
syncSubagentSessions.add(sessionID)
|
||||
return { sessionID, isNew: true }
|
||||
}),
|
||||
waitForCompletion: mock(async (createdSessionID: string) => {
|
||||
expect(createdSessionID).toBe(sessionID)
|
||||
expect(subagentSessions.has(sessionID)).toBe(true)
|
||||
expect(syncSubagentSessions.has(sessionID)).toBe(true)
|
||||
throw new Error("poll exploded")
|
||||
}),
|
||||
})
|
||||
|
||||
// when
|
||||
const resultPromise = executeSync(args, toolContext, createContext(promptAsync) as never, deps)
|
||||
|
||||
// then
|
||||
let thrownError: Error | undefined
|
||||
|
||||
try {
|
||||
await resultPromise
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
thrownError = error
|
||||
} else {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
expect(thrownError?.message).toBe("poll exploded")
|
||||
expect(subagentSessions.has(sessionID)).toBe(false)
|
||||
expect(syncSubagentSessions.has(sessionID)).toBe(false)
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1,12 +1,12 @@
|
||||
import type { CallOmoAgentArgs } from "./types"
|
||||
import type { PluginInput } from "@opencode-ai/plugin"
|
||||
import { log } from "../../shared"
|
||||
import type { FallbackEntry } from "../../shared/model-requirements"
|
||||
import { getAgentToolRestrictions } from "../../shared"
|
||||
import { subagentSessions, syncSubagentSessions } from "../../features/claude-code-session-state"
|
||||
import { setSessionFallbackChain } from "../../hooks/model-fallback/hook"
|
||||
import { createOrGetSession } from "./session-creator"
|
||||
import { getAgentToolRestrictions, log } from "../../shared"
|
||||
import type { FallbackEntry } from "../../shared/model-requirements"
|
||||
import { waitForCompletion } from "./completion-poller"
|
||||
import { processMessages } from "./message-processor"
|
||||
import { createOrGetSession } from "./session-creator"
|
||||
|
||||
type SessionWithPromptAsync = {
|
||||
promptAsync: (opts: { path: { id: string }; body: Record<string, unknown> }) => Promise<unknown>
|
||||
@@ -59,10 +59,12 @@ export async function executeSync(
|
||||
deps.setSessionFallbackChain(sessionID, fallbackChain)
|
||||
}
|
||||
|
||||
await toolContext.metadata?.({
|
||||
title: args.description,
|
||||
metadata: { sessionId: sessionID },
|
||||
})
|
||||
await Promise.resolve(
|
||||
toolContext.metadata?.({
|
||||
title: args.description,
|
||||
metadata: { sessionId: sessionID },
|
||||
})
|
||||
)
|
||||
|
||||
log(`[call_omo_agent] Sending prompt to session ${sessionID}`)
|
||||
log(`[call_omo_agent] Prompt text:`, args.prompt.substring(0, 100))
|
||||
@@ -93,12 +95,14 @@ export async function executeSync(
|
||||
|
||||
const responseText = await deps.processMessages(sessionID, ctx)
|
||||
|
||||
const output =
|
||||
responseText + "\n\n" + ["<task_metadata>", `session_id: ${sessionID}`, "</task_metadata>"].join("\n")
|
||||
|
||||
return output
|
||||
return responseText + "\n\n" + ["<task_metadata>", `session_id: ${sessionID}`, "</task_metadata>"].join("\n")
|
||||
} catch (error) {
|
||||
spawnReservation?.rollback()
|
||||
throw error
|
||||
} finally {
|
||||
if (sessionID) {
|
||||
subagentSessions.delete(sessionID)
|
||||
syncSubagentSessions.delete(sessionID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user