fix(tmux-subagent): retry pending pane closes to prevent zombie panes

When queryWindowState returned null during session deletion, the
session mapping was deleted but the real tmux pane stayed alive,
creating zombie panes.

- Add closePending/closeRetryCount fields to TrackedSession
- Mark sessions closePending instead of deleting on close failure
- Add retryPendingCloses() called from onSessionCreated and cleanup
- Force-remove mappings after 3 failed retry attempts
- Extract TrackedSessionState helper for field initialization

Tests: 3 pass, 9 expects
This commit is contained in:
YeonGyu-Kim
2026-03-11 20:09:56 +09:00
parent a2f030e699
commit fed720dd11
6 changed files with 437 additions and 73 deletions

View File

@@ -1,6 +1,6 @@
import type { PluginInput } from "@opencode-ai/plugin"
import type { TmuxConfig } from "../../config/schema"
import type { TrackedSession, CapacityConfig } from "./types"
import type { TrackedSession, CapacityConfig, WindowState } from "./types"
import { log, normalizeSDKResponse } from "../../shared"
import {
isInsideTmux as defaultIsInsideTmux,
@@ -13,6 +13,7 @@ import { queryWindowState } from "./pane-state-querier"
import { decideSpawnActions, decideCloseAction, type SessionMapping } from "./decision-engine"
import { executeActions, executeAction } from "./action-executor"
import { TmuxPollingManager } from "./polling-manager"
import { createTrackedSession, markTrackedSessionClosePending } from "./tracked-session-state"
type OpencodeClient = PluginInput["client"]
interface SessionCreatedEvent {
@@ -38,6 +39,7 @@ const defaultTmuxDeps: TmuxUtilDeps = {
const DEFERRED_SESSION_TTL_MS = 5 * 60 * 1000
const MAX_DEFERRED_QUEUE_SIZE = 20
const MAX_CLOSE_RETRY_COUNT = 3
/**
* State-first Tmux Session Manager
@@ -106,6 +108,118 @@ export class TmuxSessionManager {
}))
}
private removeTrackedSession(sessionId: string): void {
this.sessions.delete(sessionId)
if (this.sessions.size === 0) {
this.pollingManager.stopPolling()
}
}
private markSessionClosePending(sessionId: string): void {
const tracked = this.sessions.get(sessionId)
if (!tracked) return
this.sessions.set(sessionId, markTrackedSessionClosePending(tracked))
log("[tmux-session-manager] marked session close pending", {
sessionId,
paneId: tracked.paneId,
closeRetryCount: tracked.closeRetryCount,
})
}
private async queryWindowStateSafely(): Promise<WindowState | null> {
if (!this.sourcePaneId) return null
try {
return await queryWindowState(this.sourcePaneId)
} catch (error) {
log("[tmux-session-manager] failed to query window state for close", {
error: String(error),
})
return null
}
}
private async tryCloseTrackedSession(tracked: TrackedSession): Promise<boolean> {
const state = await this.queryWindowStateSafely()
if (!state) return false
try {
const result = await executeAction(
{ type: "close", paneId: tracked.paneId, sessionId: tracked.sessionId },
{
config: this.tmuxConfig,
serverUrl: this.serverUrl,
windowState: state,
sourcePaneId: this.sourcePaneId,
}
)
return result.success
} catch (error) {
log("[tmux-session-manager] close session pane failed", {
sessionId: tracked.sessionId,
paneId: tracked.paneId,
error: String(error),
})
return false
}
}
private async retryPendingCloses(): Promise<void> {
const pendingSessions = Array.from(this.sessions.values()).filter(
(tracked) => tracked.closePending,
)
for (const tracked of pendingSessions) {
if (!this.sessions.has(tracked.sessionId)) continue
if (tracked.closeRetryCount >= MAX_CLOSE_RETRY_COUNT) {
log("[tmux-session-manager] force removing close-pending session after max retries", {
sessionId: tracked.sessionId,
paneId: tracked.paneId,
closeRetryCount: tracked.closeRetryCount,
})
this.removeTrackedSession(tracked.sessionId)
continue
}
const closed = await this.tryCloseTrackedSession(tracked)
if (closed) {
log("[tmux-session-manager] retried close succeeded", {
sessionId: tracked.sessionId,
paneId: tracked.paneId,
closeRetryCount: tracked.closeRetryCount,
})
this.removeTrackedSession(tracked.sessionId)
continue
}
const nextRetryCount = tracked.closeRetryCount + 1
if (nextRetryCount >= MAX_CLOSE_RETRY_COUNT) {
log("[tmux-session-manager] force removing close-pending session after failed retry", {
sessionId: tracked.sessionId,
paneId: tracked.paneId,
closeRetryCount: nextRetryCount,
})
this.removeTrackedSession(tracked.sessionId)
continue
}
this.sessions.set(tracked.sessionId, {
...tracked,
closePending: true,
closeRetryCount: nextRetryCount,
})
log("[tmux-session-manager] retried close failed", {
sessionId: tracked.sessionId,
paneId: tracked.paneId,
closeRetryCount: nextRetryCount,
})
}
}
private enqueueDeferredSession(sessionId: string, title: string): void {
if (this.deferredSessions.has(sessionId)) return
if (this.deferredQueue.length >= MAX_DEFERRED_QUEUE_SIZE) {
@@ -257,14 +371,14 @@ export class TmuxSessionManager {
})
}
const now = Date.now()
this.sessions.set(sessionId, {
this.sessions.set(
sessionId,
paneId: result.spawnedPaneId,
description: deferred.title,
createdAt: new Date(now),
lastSeenAt: new Date(now),
})
createTrackedSession({
sessionId,
paneId: result.spawnedPaneId,
description: deferred.title,
}),
)
this.removeDeferredSession(sessionId)
this.pollingManager.startPolling()
log("[tmux-session-manager] deferred session attached", {
@@ -324,6 +438,13 @@ export class TmuxSessionManager {
const sessionId = info.id
const title = info.title ?? "Subagent"
if (!this.sourcePaneId) {
log("[tmux-session-manager] no source pane id")
return
}
await this.retryPendingCloses()
if (
this.sessions.has(sessionId) ||
this.pendingSessions.has(sessionId) ||
@@ -332,11 +453,6 @@ export class TmuxSessionManager {
log("[tmux-session-manager] session already tracked or pending", { sessionId })
return
}
if (!this.sourcePaneId) {
log("[tmux-session-manager] no source pane id")
return
}
const sourcePaneId = this.sourcePaneId
this.pendingSessions.add(sessionId)
@@ -418,14 +534,14 @@ export class TmuxSessionManager {
})
}
const now = Date.now()
this.sessions.set(sessionId, {
this.sessions.set(
sessionId,
paneId: result.spawnedPaneId,
description: title,
createdAt: new Date(now),
lastSeenAt: new Date(now),
})
createTrackedSession({
sessionId,
paneId: result.spawnedPaneId,
description: title,
}),
)
log("[tmux-session-manager] pane spawned and tracked", {
sessionId,
paneId: result.spawnedPaneId,
@@ -485,27 +601,40 @@ export class TmuxSessionManager {
log("[tmux-session-manager] onSessionDeleted", { sessionId: event.sessionID })
const state = await queryWindowState(this.sourcePaneId)
const state = await this.queryWindowStateSafely()
if (!state) {
this.sessions.delete(event.sessionID)
this.markSessionClosePending(event.sessionID)
return
}
const closeAction = decideCloseAction(state, event.sessionID, this.getSessionMappings())
if (closeAction) {
await executeAction(closeAction, {
if (!closeAction) {
this.removeTrackedSession(event.sessionID)
return
}
try {
const result = await executeAction(closeAction, {
config: this.tmuxConfig,
serverUrl: this.serverUrl,
windowState: state,
sourcePaneId: this.sourcePaneId,
})
if (!result.success) {
this.markSessionClosePending(event.sessionID)
return
}
} catch (error) {
log("[tmux-session-manager] failed to close pane for deleted session", {
sessionId: event.sessionID,
error: String(error),
})
this.markSessionClosePending(event.sessionID)
return
}
this.sessions.delete(event.sessionID)
if (this.sessions.size === 0) {
this.pollingManager.stopPolling()
}
this.removeTrackedSession(event.sessionID)
}
@@ -518,24 +647,13 @@ export class TmuxSessionManager {
paneId: tracked.paneId,
})
const state = this.sourcePaneId ? await queryWindowState(this.sourcePaneId) : null
if (state) {
await executeAction(
{ type: "close", paneId: tracked.paneId, sessionId },
{
config: this.tmuxConfig,
serverUrl: this.serverUrl,
windowState: state,
sourcePaneId: this.sourcePaneId,
}
)
const closed = await this.tryCloseTrackedSession(tracked)
if (!closed) {
this.markSessionClosePending(sessionId)
return
}
this.sessions.delete(sessionId)
if (this.sessions.size === 0) {
this.pollingManager.stopPolling()
}
this.removeTrackedSession(sessionId)
}
createEventHandler(): (input: { event: { type: string; properties?: unknown } }) => Promise<void> {
@@ -552,30 +670,22 @@ export class TmuxSessionManager {
if (this.sessions.size > 0) {
log("[tmux-session-manager] closing all panes", { count: this.sessions.size })
const state = this.sourcePaneId ? await queryWindowState(this.sourcePaneId) : null
if (state) {
const closePromises = Array.from(this.sessions.values()).map((s) =>
executeAction(
{ type: "close", paneId: s.paneId, sessionId: s.sessionId },
{
config: this.tmuxConfig,
serverUrl: this.serverUrl,
windowState: state,
sourcePaneId: this.sourcePaneId,
}
).catch((err) =>
log("[tmux-session-manager] cleanup error for pane", {
paneId: s.paneId,
error: String(err),
}),
),
)
await Promise.all(closePromises)
const sessionIds = Array.from(this.sessions.keys())
for (const sessionId of sessionIds) {
try {
await this.closeSessionById(sessionId)
} catch (error) {
log("[tmux-session-manager] cleanup error for pane", {
sessionId,
error: String(error),
})
}
}
this.sessions.clear()
}
await this.retryPendingCloses()
log("[tmux-session-manager] cleanup complete")
}
}

View File

@@ -12,6 +12,8 @@ describe("TmuxPollingManager overlap", () => {
description: "test",
createdAt: new Date(),
lastSeenAt: new Date(),
closePending: false,
closeRetryCount: 0,
})
let activeCalls = 0

View File

@@ -6,6 +6,7 @@ import { queryWindowState } from "./pane-state-querier"
import { decideSpawnActions, type SessionMapping } from "./decision-engine"
import { executeActions } from "./action-executor"
import type { SessionCreatedEvent } from "./session-created-event"
import { createTrackedSession } from "./tracked-session-state"
type OpencodeClient = PluginInput["client"]
@@ -152,14 +153,14 @@ export async function handleSessionCreated(
return
}
const now = Date.now()
deps.sessions.set(sessionId, {
deps.sessions.set(
sessionId,
paneId: result.spawnedPaneId,
description: title,
createdAt: new Date(now),
lastSeenAt: new Date(now),
})
createTrackedSession({
sessionId,
paneId: result.spawnedPaneId,
description: title,
}),
)
log("[tmux-session-manager] pane spawned and tracked", {
sessionId,

View File

@@ -0,0 +1,27 @@
import type { TrackedSession } from "./types"
export function createTrackedSession(params: {
sessionId: string
paneId: string
description: string
now?: Date
}): TrackedSession {
const now = params.now ?? new Date()
return {
sessionId: params.sessionId,
paneId: params.paneId,
description: params.description,
createdAt: now,
lastSeenAt: now,
closePending: false,
closeRetryCount: 0,
}
}
export function markTrackedSessionClosePending(tracked: TrackedSession): TrackedSession {
return {
...tracked,
closePending: true,
}
}

View File

@@ -4,6 +4,8 @@ export interface TrackedSession {
description: string
createdAt: Date
lastSeenAt: Date
closePending: boolean
closeRetryCount: number
// Stability detection fields (prevents premature closure)
lastMessageCount?: number
stableIdlePolls?: number

View File

@@ -0,0 +1,222 @@
import { beforeEach, describe, expect, mock, test } from "bun:test"
import type { TmuxConfig } from "../../config/schema"
import type { ActionResult, ExecuteContext, ExecuteActionsResult } from "./action-executor"
import type { TmuxUtilDeps } from "./manager"
import type { TrackedSession, WindowState } from "./types"
const mockQueryWindowState = mock<(paneId: string) => Promise<WindowState | null>>(async () => ({
windowWidth: 220,
windowHeight: 44,
mainPane: { paneId: "%0", width: 110, height: 44, left: 0, top: 0, title: "main", isActive: true },
agentPanes: [],
}))
const mockExecuteAction = mock<(
action: { type: string },
ctx: ExecuteContext,
) => Promise<ActionResult>>(async () => ({ success: true }))
const mockExecuteActions = mock<(
actions: unknown[],
ctx: ExecuteContext,
) => Promise<ExecuteActionsResult>>(async () => ({
success: true,
spawnedPaneId: "%1",
results: [],
}))
const mockIsInsideTmux = mock<() => boolean>(() => true)
const mockGetCurrentPaneId = mock<() => string | undefined>(() => "%0")
mock.module("./pane-state-querier", () => ({
queryWindowState: mockQueryWindowState,
}))
mock.module("./action-executor", () => ({
executeAction: mockExecuteAction,
executeActions: mockExecuteActions,
}))
mock.module("../../shared/tmux", () => ({
isInsideTmux: mockIsInsideTmux,
getCurrentPaneId: mockGetCurrentPaneId,
POLL_INTERVAL_BACKGROUND_MS: 10,
SESSION_READY_POLL_INTERVAL_MS: 10,
SESSION_READY_TIMEOUT_MS: 50,
SESSION_MISSING_GRACE_MS: 1_000,
}))
const mockTmuxDeps: TmuxUtilDeps = {
isInsideTmux: mockIsInsideTmux,
getCurrentPaneId: mockGetCurrentPaneId,
}
function createConfig(): TmuxConfig {
return {
enabled: true,
layout: "main-vertical",
main_pane_size: 60,
main_pane_min_width: 80,
agent_pane_min_width: 40,
}
}
function createContext() {
const shell = Object.assign(
() => {
throw new Error("shell should not be called in this test")
},
{
braces: () => [],
escape: (input: string) => input,
env() {
return shell
},
cwd() {
return shell
},
nothrow() {
return shell
},
throws() {
return shell
},
},
)
return {
project: {
id: "project-id",
worktree: "/tmp/omo-fix-memory-leaks",
time: { created: Date.now() },
},
directory: "/tmp/omo-fix-memory-leaks",
worktree: "/tmp/omo-fix-memory-leaks",
serverUrl: new URL("http://localhost:4096"),
$: shell,
client: {
session: {
status: mock(async () => ({ data: {} })),
messages: mock(async () => ({ data: [] })),
},
},
}
}
function createTrackedSession(overrides?: Partial<TrackedSession>): TrackedSession {
return {
sessionId: "ses_pending",
paneId: "%1",
description: "Pending pane",
createdAt: new Date(),
lastSeenAt: new Date(),
closePending: false,
closeRetryCount: 0,
...overrides,
}
}
function getTrackedSessions(target: object): Map<string, TrackedSession> {
const sessions = Reflect.get(target, "sessions")
if (!(sessions instanceof Map)) {
throw new Error("Expected sessions map")
}
return sessions
}
function getRetryPendingCloses(target: object): () => Promise<void> {
const retryPendingCloses = Reflect.get(target, "retryPendingCloses")
if (typeof retryPendingCloses !== "function") {
throw new Error("Expected retryPendingCloses method")
}
return retryPendingCloses.bind(target)
}
function createManager(
TmuxSessionManager: typeof import("./manager").TmuxSessionManager,
): import("./manager").TmuxSessionManager {
return Reflect.construct(TmuxSessionManager, [createContext(), createConfig(), mockTmuxDeps])
}
describe("TmuxSessionManager zombie pane handling", () => {
beforeEach(() => {
mockQueryWindowState.mockClear()
mockExecuteAction.mockClear()
mockExecuteActions.mockClear()
mockIsInsideTmux.mockClear()
mockGetCurrentPaneId.mockClear()
mockQueryWindowState.mockImplementation(async () => ({
windowWidth: 220,
windowHeight: 44,
mainPane: { paneId: "%0", width: 110, height: 44, left: 0, top: 0, title: "main", isActive: true },
agentPanes: [],
}))
mockExecuteAction.mockImplementation(async () => ({ success: true }))
mockExecuteActions.mockImplementation(async () => ({
success: true,
spawnedPaneId: "%1",
results: [],
}))
mockIsInsideTmux.mockReturnValue(true)
mockGetCurrentPaneId.mockReturnValue("%0")
})
test("#given session in sessions Map #when onSessionDeleted called with null window state #then session stays in Map with closePending true", async () => {
// given
mockQueryWindowState.mockImplementation(async () => null)
const { TmuxSessionManager } = await import("./manager")
const manager = createManager(TmuxSessionManager)
const sessions = getTrackedSessions(manager)
sessions.set("ses_pending", createTrackedSession())
// when
await manager.onSessionDeleted({ sessionID: "ses_pending" })
// then
const tracked = sessions.get("ses_pending")
expect(tracked).toBeDefined()
expect(tracked?.closePending).toBe(true)
expect(tracked?.closeRetryCount).toBe(0)
expect(mockExecuteAction).not.toHaveBeenCalled()
})
test("#given session with closePending true #when retryPendingCloses succeeds #then session is removed from Map", async () => {
// given
const { TmuxSessionManager } = await import("./manager")
const manager = createManager(TmuxSessionManager)
const sessions = getTrackedSessions(manager)
sessions.set(
"ses_pending",
createTrackedSession({ closePending: true, closeRetryCount: 0 }),
)
// when
await getRetryPendingCloses(manager)()
// then
expect(sessions.has("ses_pending")).toBe(false)
expect(mockExecuteAction).toHaveBeenCalledTimes(1)
})
test("#given session with closePending true and closeRetryCount >= 3 #when retryPendingCloses called #then session is force-removed from Map", async () => {
// given
const { TmuxSessionManager } = await import("./manager")
const manager = createManager(TmuxSessionManager)
const sessions = getTrackedSessions(manager)
sessions.set(
"ses_pending",
createTrackedSession({ closePending: true, closeRetryCount: 3 }),
)
// when
await getRetryPendingCloses(manager)()
// then
expect(sessions.has("ses_pending")).toBe(false)
expect(mockQueryWindowState).not.toHaveBeenCalled()
expect(mockExecuteAction).not.toHaveBeenCalled()
})
})