fix: follow up cmux runtime and scheduler edge cases

This commit is contained in:
Kenny
2026-03-29 20:07:23 +08:00
parent 6ffadaaa51
commit 2b02e2c1a0
5 changed files with 253 additions and 24 deletions

View File

@@ -0,0 +1,88 @@
import { afterEach, describe, expect, jest, test } from "bun:test"
import type { PluginInput } from "@opencode-ai/plugin"
import { createIdleNotificationScheduler } from "./session-notification-scheduler"
async function flushMicrotasks(): Promise<void> {
await Promise.resolve()
await Promise.resolve()
}
function createDeferred<T>() {
let resolvePromise: (value: T | PromiseLike<T>) => void = () => {}
const promise = new Promise<T>((resolve) => {
resolvePromise = resolve
})
return {
promise,
resolve: resolvePromise,
}
}
describe("session-notification-scheduler", () => {
afterEach(() => {
jest.clearAllTimers()
jest.useRealTimers()
})
test("does not resend when notification version entry is evicted during delivery", async () => {
jest.useFakeTimers()
const firstSendGate = createDeferred<void>()
let firstSendStarted = false
const sendCalls: string[] = []
const scheduler = createIdleNotificationScheduler({
ctx: {} as PluginInput,
platform: "darwin",
config: {
playSound: false,
soundPath: "",
idleConfirmationDelay: 10,
skipIfIncompleteTodos: false,
maxTrackedSessions: 1,
activityGracePeriodMs: 0,
},
hasIncompleteTodos: async () => false,
send: async (_ctx, _platform, sessionID) => {
sendCalls.push(sessionID)
if (sessionID !== "session-a") {
return true
}
firstSendStarted = true
await firstSendGate.promise
return true
},
playSound: async () => {},
})
scheduler.scheduleIdleNotification("session-a")
jest.advanceTimersByTime(10)
await flushMicrotasks()
expect(sendCalls).toEqual(["session-a"])
scheduler.scheduleIdleNotification("session-b")
jest.advanceTimersByTime(10)
await flushMicrotasks()
expect(sendCalls).toEqual(["session-a", "session-b"])
if (!firstSendStarted) {
throw new Error("Expected the first send call to be in-flight")
}
firstSendGate.resolve()
await flushMicrotasks()
scheduler.scheduleIdleNotification("session-a")
jest.advanceTimersByTime(10)
await flushMicrotasks()
const sessionASendCount = sendCalls.filter(id => id === "session-a").length
expect(sessionASendCount).toBe(1)
})
})

View File

@@ -48,12 +48,6 @@ export function createIdleNotificationScheduler(options: {
notificationVersions.delete(id)
})
}
if (executingNotifications.size > maxSessions) {
const sessionsToRemove = Array.from(executingNotifications).slice(0, executingNotifications.size - maxSessions)
sessionsToRemove.forEach((id) => {
executingNotifications.delete(id)
})
}
if (scheduledAt.size > maxSessions) {
const sessionsToRemove = Array.from(scheduledAt.keys()).slice(0, scheduledAt.size - maxSessions)
sessionsToRemove.forEach((id) => {
@@ -89,6 +83,15 @@ export function createIdleNotificationScheduler(options: {
}
}
function hasStaleNotificationVersion(sessionID: string, version: number): boolean {
const latestVersion = notificationVersions.get(sessionID)
if (latestVersion === undefined) {
return !executingNotifications.has(sessionID)
}
return latestVersion !== version
}
async function executeNotification(sessionID: string, version: number): Promise<void> {
if (executingNotifications.has(sessionID)) {
pendingTimers.delete(sessionID)
@@ -96,7 +99,7 @@ export function createIdleNotificationScheduler(options: {
return
}
if (notificationVersions.get(sessionID) !== version) {
if (hasStaleNotificationVersion(sessionID, version)) {
pendingTimers.delete(sessionID)
scheduledAt.delete(sessionID)
return
@@ -119,13 +122,13 @@ export function createIdleNotificationScheduler(options: {
try {
if (options.config.skipIfIncompleteTodos) {
const hasPendingWork = await options.hasIncompleteTodos(options.ctx, sessionID)
if (notificationVersions.get(sessionID) !== version) {
if (hasStaleNotificationVersion(sessionID, version)) {
return
}
if (hasPendingWork) return
}
if (notificationVersions.get(sessionID) !== version) {
if (hasStaleNotificationVersion(sessionID, version)) {
return
}
@@ -139,7 +142,7 @@ export function createIdleNotificationScheduler(options: {
return
}
if (notificationVersions.get(sessionID) !== version) {
if (hasStaleNotificationVersion(sessionID, version)) {
return
}

View File

@@ -6,6 +6,7 @@ import {
type ResolvedMultiplexer,
} from "./multiplexer-runtime"
import {
probeCmuxReachability,
resetMultiplexerPathCacheForTesting,
type CmuxRuntimeProbe,
type TmuxRuntimeProbe,
@@ -170,7 +171,14 @@ describe("multiplexer runtime resolution", () => {
expect(runtime.notificationBackend).toBe("desktop")
})
test("treats relay endpoint addresses as valid cmux socket targets", () => {
test("treats relay endpoint addresses as valid cmux socket targets", async () => {
const derivedProbe = await probeCmuxReachability({
environment: {
CMUX_SOCKET_PATH: "127.0.0.1:7777",
OH_MY_OPENCODE_DISABLE_CMUX: "1",
},
})
const runtime = resolveRuntime({
environment: {
TMUX: undefined,
@@ -178,16 +186,25 @@ describe("multiplexer runtime resolution", () => {
CMUX_SOCKET_PATH: "127.0.0.1:7777",
},
cmuxProbe: {
endpointType: "relay",
socketPath: "127.0.0.1:7777",
endpointType: derivedProbe.endpointType,
socketPath: derivedProbe.socketPath,
},
})
expect(derivedProbe.endpointType).toBe("relay")
expect(runtime.mode).toBe("cmux-notify-only")
expect(runtime.cmux.endpointType).toBe("relay")
})
test("keeps weak ghostty hint as non-authoritative on non-mac platforms", () => {
test("keeps weak ghostty hint as non-authoritative on non-mac platforms", async () => {
const derivedProbe = await probeCmuxReachability({
environment: {
TERM_PROGRAM: "ghostty",
CMUX_SOCKET_PATH: undefined,
OH_MY_OPENCODE_DISABLE_CMUX: "1",
},
})
const runtime = resolveRuntime({
platform: "linux",
environment: {
@@ -204,14 +221,15 @@ describe("multiplexer runtime resolution", () => {
cmuxProbe: {
reachable: false,
path: "/usr/local/bin/cmux",
socketPath: undefined,
endpointType: "missing",
hintStrength: "weak",
socketPath: derivedProbe.socketPath,
endpointType: derivedProbe.endpointType,
hintStrength: derivedProbe.hintStrength,
notifyCapable: false,
failureKind: "missing-socket",
},
})
expect(derivedProbe.hintStrength).toBe("weak")
expect(runtime.mode).toBe("none")
expect(runtime.cmux.hintStrength).toBe("weak")
expect(runtime.platform).toBe("linux")

View File

@@ -41,6 +41,33 @@ function createTmuxEnabledRuntime(): ResolvedMultiplexer {
}
}
function createPaneUnavailableRuntime(): ResolvedMultiplexer {
return {
platform: process.platform,
mode: "cmux-notify-only",
paneBackend: "none",
notificationBackend: "cmux",
tmux: {
path: "/usr/bin/tmux",
reachable: false,
insideEnvironment: false,
paneId: undefined,
explicitDisable: false,
},
cmux: {
path: "/usr/local/bin/cmux",
reachable: true,
notifyCapable: true,
socketPath: "/tmp/cmux.sock",
endpointType: "unix",
workspaceId: "workspace-1",
surfaceId: "surface-1",
hintStrength: "strong",
explicitDisable: false,
},
}
}
describe("interactive_bash runtime resolution", () => {
afterEach(() => {
resetResolvedMultiplexerRuntimeForTesting()
@@ -76,4 +103,60 @@ describe("interactive_bash runtime resolution", () => {
getTmuxPathSpy.mockRestore()
}
})
test("allows detached new-session commands when pane control is unavailable", async () => {
const getTmuxPathSpy = spyOn(tmuxPathResolver, "getTmuxPath").mockResolvedValue(null)
try {
const tool = createInteractiveBashTool(createPaneUnavailableRuntime())
const result = await tool.execute(
{ tmux_command: "new-session -d -s omo-dev" },
mockToolContext,
)
expect(result).toBe("Error: tmux executable is not reachable")
expect(getTmuxPathSpy).toHaveBeenCalledTimes(1)
} finally {
getTmuxPathSpy.mockRestore()
}
})
test("allows targeted tmux commands when pane control is unavailable", async () => {
const getTmuxPathSpy = spyOn(tmuxPathResolver, "getTmuxPath").mockResolvedValue(null)
try {
const tool = createInteractiveBashTool(createPaneUnavailableRuntime())
const result = await tool.execute(
{ tmux_command: "send-keys -t omo-dev \"vim\" Enter" },
mockToolContext,
)
expect(result).toBe("Error: tmux executable is not reachable")
expect(getTmuxPathSpy).toHaveBeenCalledTimes(1)
} finally {
getTmuxPathSpy.mockRestore()
}
})
test("blocks untargeted pane-control commands when pane backend is unavailable", async () => {
const getTmuxPathSpy = spyOn(tmuxPathResolver, "getTmuxPath").mockResolvedValue(null)
try {
const tool = createInteractiveBashTool(createPaneUnavailableRuntime())
const result = await tool.execute(
{ tmux_command: "send-keys \"vim\" Enter" },
mockToolContext,
)
expect(result).toBe(
"Error: interactive_bash is TMUX-only and pane control is unavailable in 'cmux-notify-only' runtime.",
)
expect(getTmuxPathSpy).toHaveBeenCalledTimes(0)
} finally {
getTmuxPathSpy.mockRestore()
}
})
})

View File

@@ -53,6 +53,43 @@ export function tokenizeCommand(cmd: string): string[] {
return tokens
}
function hasTmuxTargetFlag(tokens: string[]): boolean {
return tokens.some((token, index) => {
if (token === "-t") {
return typeof tokens[index + 1] === "string" && tokens[index + 1].length > 0
}
return token.startsWith("-t") && token.length > 2
})
}
function hasDetachedFlag(tokens: string[]): boolean {
return tokens.some((token) => {
if (token === "-d") {
return true
}
if (!token.startsWith("-") || token.startsWith("--")) {
return false
}
return token.slice(1).includes("d")
})
}
function canRunWithoutPaneControl(tokens: string[]): boolean {
const subcommand = tokens[0]?.toLowerCase()
if (!subcommand) {
return false
}
const isDetachedNewSession =
(subcommand === "new-session" || subcommand === "new")
&& hasDetachedFlag(tokens)
return isDetachedNewSession || hasTmuxTargetFlag(tokens)
}
export function createInteractiveBashTool(
runtime?: ResolvedMultiplexer,
): ToolDefinition {
@@ -68,7 +105,13 @@ export function createInteractiveBashTool(
?? getResolvedMultiplexerRuntime()
?? createDisabledMultiplexerRuntime()
if (resolvedRuntime.paneBackend !== "tmux") {
const parts = tokenizeCommand(args.tmux_command)
if (parts.length === 0) {
return "Error: Empty tmux command"
}
if (resolvedRuntime.paneBackend !== "tmux" && !canRunWithoutPaneControl(parts)) {
return `Error: interactive_bash is TMUX-only and pane control is unavailable in '${resolvedRuntime.mode}' runtime.`
}
@@ -77,12 +120,6 @@ export function createInteractiveBashTool(
return "Error: tmux executable is not reachable"
}
const parts = tokenizeCommand(args.tmux_command)
if (parts.length === 0) {
return "Error: Empty tmux command"
}
const subcommand = parts[0].toLowerCase()
if (BLOCKED_TMUX_SUBCOMMANDS.includes(subcommand)) {
const sessionIdx = parts.findIndex(p => p === "-t" || p.startsWith("-t"))