diff --git a/.gitignore b/.gitignore index 06aa8147b..2eb885215 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,4 @@ test-injection/ notepad.md oauth-success.html *.bun-build +.omx/ diff --git a/src/config/schema/oh-my-opencode-config.ts b/src/config/schema/oh-my-opencode-config.ts index 9f4d70c99..ea7e479c5 100644 --- a/src/config/schema/oh-my-opencode-config.ts +++ b/src/config/schema/oh-my-opencode-config.ts @@ -12,6 +12,7 @@ import { BuiltinCommandNameSchema } from "./commands" import { ExperimentalConfigSchema } from "./experimental" import { GitMasterConfigSchema } from "./git-master" import { NotificationConfigSchema } from "./notification" +import { OpenClawConfigSchema } from "./openclaw" import { RalphLoopConfigSchema } from "./ralph-loop" import { RuntimeFallbackConfigSchema } from "./runtime-fallback" import { SkillsConfigSchema } from "./skills" @@ -55,6 +56,7 @@ export const OhMyOpenCodeConfigSchema = z.object({ runtime_fallback: z.union([z.boolean(), RuntimeFallbackConfigSchema]).optional(), background_task: BackgroundTaskConfigSchema.optional(), notification: NotificationConfigSchema.optional(), + openclaw: OpenClawConfigSchema.optional(), babysitting: BabysittingConfigSchema.optional(), git_master: GitMasterConfigSchema.optional(), browser_automation_engine: BrowserAutomationConfigSchema.optional(), diff --git a/src/config/schema/openclaw.ts b/src/config/schema/openclaw.ts new file mode 100644 index 000000000..a768728c8 --- /dev/null +++ b/src/config/schema/openclaw.ts @@ -0,0 +1,50 @@ +import { z } from "zod" + +export const OpenClawGatewaySchema = z.object({ + type: z.enum(["http", "command"]).default("http"), + // HTTP specific + url: z.string().optional(), + method: z.string().default("POST"), + headers: z.record(z.string(), z.string()).optional(), + // Command specific + command: z.string().optional(), + // Shared + timeout: z.number().optional(), +}) + +export const OpenClawHookSchema = z.object({ + enabled: z.boolean().default(true), + gateway: z.string(), + instruction: z.string(), +}) + +export const OpenClawReplyListenerConfigSchema = z.object({ + discordBotToken: z.string().optional(), + discordChannelId: z.string().optional(), + discordMention: z.string().optional(), // For allowed_mentions + authorizedDiscordUserIds: z.array(z.string()).default([]), + + telegramBotToken: z.string().optional(), + telegramChatId: z.string().optional(), + + pollIntervalMs: z.number().default(3000), + rateLimitPerMinute: z.number().default(10), + maxMessageLength: z.number().default(500), + includePrefix: z.boolean().default(true), +}) + +export const OpenClawConfigSchema = z.object({ + enabled: z.boolean().default(false), + + // Outbound Configuration + gateways: z.record(z.string(), OpenClawGatewaySchema).default({}), + hooks: z.record(z.string(), OpenClawHookSchema).default({}), + + // Inbound Configuration (Reply Listener) + replyListener: OpenClawReplyListenerConfigSchema.optional(), +}) + +export type OpenClawConfig = z.infer +export type OpenClawGateway = z.infer +export type OpenClawHook = z.infer +export type OpenClawReplyListenerConfig = z.infer diff --git a/src/hooks/openclaw.test.ts b/src/hooks/openclaw.test.ts new file mode 100644 index 000000000..db3b69a91 --- /dev/null +++ b/src/hooks/openclaw.test.ts @@ -0,0 +1,82 @@ +import { beforeEach, describe, expect, mock, test } from "bun:test" + +const wakeOpenClawMock = mock(async () => null) + +mock.module("../openclaw", () => ({ + wakeOpenClaw: wakeOpenClawMock, +})) + +describe("createOpenClawHook", () => { + beforeEach(() => { + wakeOpenClawMock.mockClear() + }) + + test("maps session.created to session-start", async () => { + const { createOpenClawHook } = await import("./openclaw") + const hook = createOpenClawHook( + { directory: "/tmp/project" } as any, + { openclaw: { enabled: true } } as any, + ) + + await hook?.event?.({ + event: { + type: "session.created", + properties: { sessionID: "session-1" }, + }, + }) + + expect(wakeOpenClawMock).toHaveBeenCalledWith( + expect.anything(), + "session-start", + expect.objectContaining({ + projectPath: "/tmp/project", + sessionId: "session-1", + }), + ) + }) + + test("uses tool.execute.before for question tools", async () => { + const { createOpenClawHook } = await import("./openclaw") + const hook = createOpenClawHook( + { directory: "/tmp/project" } as any, + { openclaw: { enabled: true } } as any, + ) + + await hook?.["tool.execute.before"]?.( + { tool: "ask_user_question", sessionID: "session-2" }, + { args: { questions: [{ question: "Need approval?", options: [{ label: "Yes" }] }] } }, + ) + + expect(wakeOpenClawMock).toHaveBeenCalledWith( + expect.anything(), + "ask-user-question", + expect.objectContaining({ + projectPath: "/tmp/project", + question: "Need approval?", + sessionId: "session-2", + }), + ) + }) + + test("falls back to args.question string when questions array absent", async () => { + const { createOpenClawHook } = await import("./openclaw") + const hook = createOpenClawHook( + { directory: "/tmp/project" } as any, + { openclaw: { enabled: true } } as any, + ) + + await hook?.["tool.execute.before"]?.( + { tool: "question", sessionID: "session-3" }, + { args: { question: "Fallback?" } }, + ) + + expect(wakeOpenClawMock).toHaveBeenCalledWith( + expect.anything(), + "ask-user-question", + expect.objectContaining({ + question: "Fallback?", + sessionId: "session-3", + }), + ) + }) +}) diff --git a/src/hooks/openclaw.ts b/src/hooks/openclaw.ts new file mode 100644 index 000000000..00bce217e --- /dev/null +++ b/src/hooks/openclaw.ts @@ -0,0 +1,66 @@ +import type { PluginContext } from "../plugin/types" +import type { OhMyOpenCodeConfig } from "../config" +import { wakeOpenClaw } from "../openclaw" +import type { OpenClawContext } from "../openclaw/types" + +export function createOpenClawHook( + ctx: PluginContext, + pluginConfig: OhMyOpenCodeConfig, +) { + const config = pluginConfig.openclaw + if (!config?.enabled) return null + + const handleWake = async (event: string, context: OpenClawContext) => { + await wakeOpenClaw(config, event, context) + } + + return { + event: async (input: any) => { + const { event } = input + const props = event.properties || {} + const sessionID = props.sessionID || props.info?.id + + const context: OpenClawContext = { + sessionId: sessionID, + projectPath: ctx.directory, + } + + if (event.type === "session.created") { + await handleWake("session-start", context) + } else if (event.type === "session.deleted") { + await handleWake("session-end", context) + } else if (event.type === "session.idle") { + // Check if we are waiting for user input (ask-user-question) + // This is heuristic. If the last message was from assistant and ended with a question? + // Or if the system is idle. + await handleWake("session-idle", context) + } + }, + + "tool.execute.before": async ( + input: { tool: string; sessionID: string }, + output: { args: Record }, + ) => { + const normalizedToolName = input.tool.toLowerCase() + if ( + normalizedToolName !== "question" + && normalizedToolName !== "ask_user_question" + && normalizedToolName !== "askuserquestion" + ) { + return + } + + // question tool uses args.questions array, not args.question + const questions = Array.isArray(output.args.questions) ? output.args.questions : [] + const question = questions.length > 0 && typeof questions[0]?.question === "string" + ? questions[0].question + : typeof output.args.question === "string" ? output.args.question : undefined + const context: OpenClawContext = { + sessionId: input.sessionID, + projectPath: ctx.directory, + question, + } + await handleWake("ask-user-question", context) + }, + } +} diff --git a/src/openclaw/__tests__/config.test.ts b/src/openclaw/__tests__/config.test.ts new file mode 100644 index 000000000..62972f45a --- /dev/null +++ b/src/openclaw/__tests__/config.test.ts @@ -0,0 +1,115 @@ +import { describe, expect, test } from "bun:test" +import { resolveGateway, validateGatewayUrl, normalizeReplyListenerConfig } from "../config" +import type { OpenClawConfig } from "../types" +import { OpenClawConfigSchema } from "../../config/schema/openclaw" + +describe("OpenClaw Config", () => { + test("resolveGateway resolves HTTP gateway", () => { + const config: OpenClawConfig = { + enabled: true, + gateways: { + discord: { + type: "http", + url: "https://discord.com/api/webhooks/123", + }, + }, + hooks: { + "session-start": { + enabled: true, + gateway: "discord", + instruction: "Started session {{sessionId}}", + }, + }, + } as any + + const resolved = resolveGateway(config, "session-start") + expect(resolved).not.toBeNull() + expect(resolved?.gatewayName).toBe("discord") + expect(resolved?.gateway.url).toBe("https://discord.com/api/webhooks/123") + expect(resolved?.instruction).toBe("Started session {{sessionId}}") + }) + + test("resolveGateway returns null for disabled config", () => { + const config: OpenClawConfig = { + enabled: false, + gateways: {}, + hooks: {}, + } as any + expect(resolveGateway(config, "session-start")).toBeNull() + }) + + test("resolveGateway returns null for unknown hook", () => { + const config: OpenClawConfig = { + enabled: true, + gateways: {}, + hooks: {}, + } as any + expect(resolveGateway(config, "unknown")).toBeNull() + }) + + test("resolveGateway returns null for disabled hook", () => { + const config: OpenClawConfig = { + enabled: true, + gateways: { g: { type: "http", url: "https://example.com" } }, + hooks: { + event: { enabled: false, gateway: "g", instruction: "i" }, + }, + } as any + expect(resolveGateway(config, "event")).toBeNull() + }) + + test("validateGatewayUrl allows HTTPS", () => { + expect(validateGatewayUrl("https://example.com")).toBe(true) + }) + + test("validateGatewayUrl rejects HTTP remote", () => { + expect(validateGatewayUrl("http://example.com")).toBe(false) + }) + + test("validateGatewayUrl allows HTTP localhost", () => { + expect(validateGatewayUrl("http://localhost:3000")).toBe(true) + expect(validateGatewayUrl("http://127.0.0.1:3000")).toBe(true) + }) + + test("normalizeReplyListenerConfig normalizes nested reply listener fields", () => { + const config = normalizeReplyListenerConfig({ + enabled: true, + gateways: {}, + hooks: {}, + replyListener: { + discordBotToken: "discord-token", + discordChannelId: "channel-id", + authorizedDiscordUserIds: ["user-1", "", "user-2"], + pollIntervalMs: 100, + rateLimitPerMinute: 0, + maxMessageLength: 9000, + includePrefix: false, + }, + } as OpenClawConfig) + + expect(config.replyListener).toEqual({ + discordBotToken: "discord-token", + discordChannelId: "channel-id", + authorizedDiscordUserIds: ["user-1", "user-2"], + pollIntervalMs: 500, + rateLimitPerMinute: 1, + maxMessageLength: 4000, + includePrefix: false, + }) + }) + + test("gateway timeout remains optional so env fallback can apply", () => { + const parsed = OpenClawConfigSchema.parse({ + enabled: true, + gateways: { + command: { + type: "command", + command: "echo hi", + }, + }, + hooks: {}, + }) + + expect(parsed.gateways.command.timeout).toBeUndefined() + }) +}) diff --git a/src/openclaw/__tests__/dispatcher.test.ts b/src/openclaw/__tests__/dispatcher.test.ts new file mode 100644 index 000000000..95fbd320b --- /dev/null +++ b/src/openclaw/__tests__/dispatcher.test.ts @@ -0,0 +1,70 @@ +import { describe, expect, test, mock, spyOn } from "bun:test" +import { + interpolateInstruction, + resolveCommandTimeoutMs, + shellEscapeArg, + wakeGateway, + wakeCommandGateway, +} from "../dispatcher" + +describe("OpenClaw Dispatcher", () => { + test("interpolateInstruction replaces variables", () => { + const template = "Hello {{name}}, welcome to {{place}}!" + const variables = { name: "World", place: "Bun" } + expect(interpolateInstruction(template, variables)).toBe( + "Hello World, welcome to Bun!", + ) + }) + + test("interpolateInstruction handles missing variables", () => { + const template = "Hello {{name}}!" + const variables = {} + expect(interpolateInstruction(template, variables)).toBe("Hello !") + }) + + test("shellEscapeArg escapes single quotes", () => { + expect(shellEscapeArg("foo'bar")).toBe("'foo'\\''bar'") + expect(shellEscapeArg("simple")).toBe("'simple'") + }) + + test("wakeGateway sends POST request", async () => { + const fetchSpy = spyOn(global, "fetch").mockResolvedValue( + new Response(JSON.stringify({ ok: true }), { status: 200 }), + ) + try { + const result = await wakeGateway( + "test", + { url: "https://example.com", method: "POST", timeout: 1000, type: "http" }, + { foo: "bar" }, + ) + + expect(result.success).toBe(true) + expect(fetchSpy).toHaveBeenCalled() + const call = fetchSpy.mock.calls[0] + expect(call[0]).toBe("https://example.com") + expect(call[1]?.method).toBe("POST") + expect(call[1]?.body).toBe('{"foo":"bar"}') + } finally { + fetchSpy.mockRestore() + } + }) + + test("wakeGateway fails on invalid URL", async () => { + const result = await wakeGateway("test", { url: "http://example.com", method: "POST", timeout: 1000, type: "http" }, {}) + expect(result.success).toBe(false) + expect(result.error).toContain("Invalid URL") + }) + + test("resolveCommandTimeoutMs reads OMO env fallback", () => { + const original = process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS + process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = "4321" + + try { + // Call without explicit envTimeoutRaw so the function reads from process.env itself + expect(resolveCommandTimeoutMs(undefined)).toBe(4321) + } finally { + if (original === undefined) delete process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS + else process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = original + } + }) +}) diff --git a/src/openclaw/__tests__/tmux.test.ts b/src/openclaw/__tests__/tmux.test.ts new file mode 100644 index 000000000..790a1bbe0 --- /dev/null +++ b/src/openclaw/__tests__/tmux.test.ts @@ -0,0 +1,13 @@ +import { describe, expect, test } from "bun:test" +import { analyzePaneContent } from "../tmux" + +describe("openclaw tmux helpers", () => { + test("analyzePaneContent recognizes the opencode welcome prompt", () => { + const content = "opencode\nAsk anything...\nRun /help" + expect(analyzePaneContent(content).confidence).toBeGreaterThanOrEqual(1) + }) + + test("analyzePaneContent returns zero confidence for empty content", () => { + expect(analyzePaneContent(null).confidence).toBe(0) + }) +}) diff --git a/src/openclaw/config.ts b/src/openclaw/config.ts new file mode 100644 index 000000000..946b11e69 --- /dev/null +++ b/src/openclaw/config.ts @@ -0,0 +1,120 @@ +import type { + OpenClawConfig, + OpenClawGateway, + OpenClawReplyListenerConfig, +} from "./types" + +const DEFAULT_REPLY_POLL_INTERVAL_MS = 3000 +const MIN_REPLY_POLL_INTERVAL_MS = 500 +const MAX_REPLY_POLL_INTERVAL_MS = 60000 +const DEFAULT_REPLY_RATE_LIMIT_PER_MINUTE = 10 +const MIN_REPLY_RATE_LIMIT_PER_MINUTE = 1 +const DEFAULT_REPLY_MAX_MESSAGE_LENGTH = 500 +const MIN_REPLY_MAX_MESSAGE_LENGTH = 1 +const MAX_REPLY_MAX_MESSAGE_LENGTH = 4000 + +function normalizeInteger( + value: unknown, + fallback: number, + min: number, + max?: number, +): number { + const numeric = + typeof value === "number" + ? Math.trunc(value) + : typeof value === "string" && value.trim() + ? Number.parseInt(value, 10) + : Number.NaN + + if (!Number.isFinite(numeric)) return fallback + if (numeric < min) return min + if (max !== undefined && numeric > max) return max + return numeric +} + +export function normalizeReplyListenerConfig(config: OpenClawConfig): OpenClawConfig { + const replyListener = config.replyListener + if (!replyListener) return config + + const normalizedReplyListener: OpenClawReplyListenerConfig = { + ...replyListener, + discordBotToken: replyListener.discordBotToken, + discordChannelId: replyListener.discordChannelId, + telegramBotToken: replyListener.telegramBotToken, + telegramChatId: replyListener.telegramChatId, + pollIntervalMs: normalizeInteger( + replyListener.pollIntervalMs, + DEFAULT_REPLY_POLL_INTERVAL_MS, + MIN_REPLY_POLL_INTERVAL_MS, + MAX_REPLY_POLL_INTERVAL_MS, + ), + rateLimitPerMinute: normalizeInteger( + replyListener.rateLimitPerMinute, + DEFAULT_REPLY_RATE_LIMIT_PER_MINUTE, + MIN_REPLY_RATE_LIMIT_PER_MINUTE, + ), + maxMessageLength: normalizeInteger( + replyListener.maxMessageLength, + DEFAULT_REPLY_MAX_MESSAGE_LENGTH, + MIN_REPLY_MAX_MESSAGE_LENGTH, + MAX_REPLY_MAX_MESSAGE_LENGTH, + ), + includePrefix: replyListener.includePrefix !== false, + authorizedDiscordUserIds: Array.isArray(replyListener.authorizedDiscordUserIds) + ? replyListener.authorizedDiscordUserIds.filter( + (id) => typeof id === "string" && id.trim() !== "", + ) + : [], + } + + return { + ...config, + replyListener: normalizedReplyListener, + } +} + +export function resolveGateway( + config: OpenClawConfig, + event: string, +): { gatewayName: string; gateway: OpenClawGateway; instruction: string } | null { + if (!config.enabled) return null + + const mapping = config.hooks[event] + if (!mapping || !mapping.enabled) { + return null + } + + const gateway = config.gateways[mapping.gateway] + if (!gateway) { + return null + } + + // Validate based on gateway type + if (gateway.type === "command") { + if (!gateway.command) return null + } else { + // HTTP gateway + if (!gateway.url) return null + } + + return { gatewayName: mapping.gateway, gateway, instruction: mapping.instruction } +} + +export function validateGatewayUrl(url: string): boolean { + try { + const parsed = new URL(url) + if (parsed.protocol === "https:") return true + if ( + parsed.protocol === "http:" && + (parsed.hostname === "localhost" || + parsed.hostname === "127.0.0.1" || + parsed.hostname === "::1" || + parsed.hostname === "[::1]") + ) { + return true + } + return false + } catch { + return false + } +} diff --git a/src/openclaw/daemon.ts b/src/openclaw/daemon.ts new file mode 100644 index 000000000..b075903ab --- /dev/null +++ b/src/openclaw/daemon.ts @@ -0,0 +1,9 @@ +import { pollLoop, logReplyListenerMessage } from "./reply-listener" + +pollLoop().catch((err) => { + logReplyListenerMessage( + `FATAL: reply listener daemon crashed: ${err instanceof Error ? err.stack ?? err.message : String(err)}`, + ) + console.error(err) + process.exit(1) +}) diff --git a/src/openclaw/dispatcher.ts b/src/openclaw/dispatcher.ts new file mode 100644 index 000000000..a965d7b47 --- /dev/null +++ b/src/openclaw/dispatcher.ts @@ -0,0 +1,180 @@ +import { spawn } from "bun" +import type { OpenClawGateway } from "./types" + +const DEFAULT_HTTP_TIMEOUT_MS = 10_000 +const DEFAULT_COMMAND_TIMEOUT_MS = 5_000 +const MIN_COMMAND_TIMEOUT_MS = 100 +const MAX_COMMAND_TIMEOUT_MS = 300_000 +const SHELL_METACHAR_RE = /[|&;><`$()]/ + +export function validateGatewayUrl(url: string): boolean { + try { + const parsed = new URL(url) + if (parsed.protocol === "https:") return true + if ( + parsed.protocol === "http:" && + (parsed.hostname === "localhost" || + parsed.hostname === "127.0.0.1" || + parsed.hostname === "::1" || + parsed.hostname === "[::1]") + ) { + return true + } + return false + } catch { + return false + } +} + +export function interpolateInstruction( + template: string, + variables: Record, +): string { + return template.replace(/\{\{(\w+)\}\}/g, (_match, key) => { + return variables[key] ?? "" + }) +} + +export function shellEscapeArg(value: string): string { + return "'" + value.replace(/'/g, "'\\''") + "'" +} + +export function resolveCommandTimeoutMs( + gatewayTimeout?: number, + envTimeoutRaw = + process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS + ?? process.env.OMX_OPENCLAW_COMMAND_TIMEOUT_MS, +): number { + const parseFinite = (value: unknown): number | undefined => { + if (typeof value !== "number" || !Number.isFinite(value)) return undefined + return value + } + const parseEnv = (value?: string): number | undefined => { + if (!value) return undefined + const parsed = Number(value) + return Number.isFinite(parsed) ? parsed : undefined + } + + const rawTimeout = + parseFinite(gatewayTimeout) ?? + parseEnv(envTimeoutRaw) ?? + DEFAULT_COMMAND_TIMEOUT_MS + + return Math.min( + MAX_COMMAND_TIMEOUT_MS, + Math.max(MIN_COMMAND_TIMEOUT_MS, Math.trunc(rawTimeout)), + ) +} + +export async function wakeGateway( + gatewayName: string, + gatewayConfig: OpenClawGateway, + payload: unknown, +): Promise<{ gateway: string; success: boolean; error?: string; statusCode?: number }> { + if (!gatewayConfig.url || !validateGatewayUrl(gatewayConfig.url)) { + return { + gateway: gatewayName, + success: false, + error: "Invalid URL (HTTPS required)", + } + } + + try { + const headers = { + "Content-Type": "application/json", + ...gatewayConfig.headers, + } + + const timeout = gatewayConfig.timeout ?? DEFAULT_HTTP_TIMEOUT_MS + + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), timeout) + + const response = await fetch(gatewayConfig.url, { + method: gatewayConfig.method || "POST", + headers, + body: JSON.stringify(payload), + signal: controller.signal, + }).finally(() => { + clearTimeout(timeoutId) + }) + + if (!response.ok) { + return { + gateway: gatewayName, + success: false, + error: `HTTP ${response.status}`, + statusCode: response.status, + } + } + + return { gateway: gatewayName, success: true, statusCode: response.status } + } catch (error) { + return { + gateway: gatewayName, + success: false, + error: error instanceof Error ? error.message : "Unknown error", + } + } +} + +export async function wakeCommandGateway( + gatewayName: string, + gatewayConfig: OpenClawGateway, + variables: Record, +): Promise<{ gateway: string; success: boolean; error?: string }> { + if (!gatewayConfig.command) { + return { + gateway: gatewayName, + success: false, + error: "No command configured", + } + } + + try { + const timeout = resolveCommandTimeoutMs(gatewayConfig.timeout) + + // Interpolate variables with shell escaping + const interpolated = gatewayConfig.command.replace(/\{\{(\w+)\}\}/g, (_match, key) => { + const value = variables[key] + if (value === undefined) return _match + return shellEscapeArg(value) + }) + + // Always use sh -c to handle the shell command string correctly + const proc = spawn(["sh", "-c", interpolated], { + env: { ...process.env }, + stdout: "ignore", + stderr: "ignore", + }) + + // Handle timeout manually + let timeoutId: ReturnType | undefined + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + proc.kill() + reject(new Error("Command timed out")) + }, timeout) + }) + + try { + await Promise.race([proc.exited, timeoutPromise]) + } finally { + if (timeoutId !== undefined) { + clearTimeout(timeoutId) + } + } + + if (proc.exitCode !== 0) { + throw new Error(`Command exited with code ${proc.exitCode}`) + } + + return { gateway: gatewayName, success: true } + } catch (error) { + return { + gateway: gatewayName, + success: false, + error: error instanceof Error ? error.message : "Unknown error", + } + } +} diff --git a/src/openclaw/index.ts b/src/openclaw/index.ts new file mode 100644 index 000000000..5cbbe3362 --- /dev/null +++ b/src/openclaw/index.ts @@ -0,0 +1,141 @@ +import { basename } from "path" +import { resolveGateway } from "./config" +import { + wakeGateway, + wakeCommandGateway, + interpolateInstruction, +} from "./dispatcher" +import { getCurrentTmuxSession, captureTmuxPane } from "./tmux" +import { startReplyListener, stopReplyListener } from "./reply-listener" +import type { OpenClawConfig, OpenClawContext, OpenClawPayload, WakeResult } from "./types" + +const DEBUG = + process.env.OMO_OPENCLAW_DEBUG === "1" + || process.env.OMX_OPENCLAW_DEBUG === "1" + +function buildWhitelistedContext(context: OpenClawContext): OpenClawContext { + const result: OpenClawContext = {} + if (context.sessionId !== undefined) result.sessionId = context.sessionId + if (context.projectPath !== undefined) result.projectPath = context.projectPath + if (context.tmuxSession !== undefined) result.tmuxSession = context.tmuxSession + if (context.prompt !== undefined) result.prompt = context.prompt + if (context.contextSummary !== undefined) result.contextSummary = context.contextSummary + if (context.reasoning !== undefined) result.reasoning = context.reasoning + if (context.question !== undefined) result.question = context.question + if (context.tmuxTail !== undefined) result.tmuxTail = context.tmuxTail + if (context.replyChannel !== undefined) result.replyChannel = context.replyChannel + if (context.replyTarget !== undefined) result.replyTarget = context.replyTarget + if (context.replyThread !== undefined) result.replyThread = context.replyThread + return result +} + +export async function wakeOpenClaw( + config: OpenClawConfig, + event: string, + context: OpenClawContext, +): Promise { + try { + if (!config.enabled) return null + + const resolved = resolveGateway(config, event) + if (!resolved) return null + + const { gatewayName, gateway, instruction } = resolved + + const now = new Date().toISOString() + + const replyChannel = context.replyChannel ?? process.env.OPENCLAW_REPLY_CHANNEL + const replyTarget = context.replyTarget ?? process.env.OPENCLAW_REPLY_TARGET + const replyThread = context.replyThread ?? process.env.OPENCLAW_REPLY_THREAD + + const enrichedContext: OpenClawContext = { + ...context, + ...(replyChannel !== undefined && { replyChannel }), + ...(replyTarget !== undefined && { replyTarget }), + ...(replyThread !== undefined && { replyThread }), + } + + const tmuxSession = enrichedContext.tmuxSession ?? getCurrentTmuxSession() ?? undefined + + let tmuxTail = enrichedContext.tmuxTail + if (!tmuxTail && (event === "stop" || event === "session-end") && process.env.TMUX) { + try { + const paneId = process.env.TMUX_PANE + if (paneId) { + tmuxTail = (await captureTmuxPane(paneId, 15)) ?? undefined + } + } catch (error) { + if (DEBUG) { + console.error( + "[openclaw] failed to capture tmux tail:", + error instanceof Error ? error.message : error, + ) + } + } + } + + const variables: Record = { + sessionId: enrichedContext.sessionId, + projectPath: enrichedContext.projectPath, + projectName: enrichedContext.projectPath ? basename(enrichedContext.projectPath) : undefined, + tmuxSession, + prompt: enrichedContext.prompt, + contextSummary: enrichedContext.contextSummary, + reasoning: enrichedContext.reasoning, + question: enrichedContext.question, + tmuxTail, + event, + timestamp: now, + replyChannel, + replyTarget, + replyThread, + } + + const interpolatedInstruction = interpolateInstruction(instruction, variables) + variables.instruction = interpolatedInstruction + + let result: WakeResult + + if (gateway.type === "command") { + result = await wakeCommandGateway(gatewayName, gateway, variables) + } else { + const payload: OpenClawPayload = { + event, + instruction: interpolatedInstruction, + text: interpolatedInstruction, + timestamp: now, + sessionId: enrichedContext.sessionId, + projectPath: enrichedContext.projectPath, + projectName: enrichedContext.projectPath ? basename(enrichedContext.projectPath) : undefined, + tmuxSession, + tmuxTail, + ...(replyChannel !== undefined && { channel: replyChannel }), + ...(replyTarget !== undefined && { to: replyTarget }), + ...(replyThread !== undefined && { threadId: replyThread }), + context: buildWhitelistedContext(enrichedContext), + } + + result = await wakeGateway(gatewayName, gateway, payload) + } + + if (DEBUG) { + console.error(`[openclaw] wake ${event} -> ${gatewayName}: ${result.success ? "ok" : result.error}`) + } + + return result + } catch (error) { + if (DEBUG) { + console.error(`[openclaw] wakeOpenClaw error:`, error instanceof Error ? error.message : error) + } + return null + } +} + +export async function initializeOpenClaw(config: OpenClawConfig): Promise { + const replyListener = config.replyListener + if (config.enabled && (replyListener?.discordBotToken || replyListener?.telegramBotToken)) { + await startReplyListener(config) + } +} + +export { startReplyListener, stopReplyListener } diff --git a/src/openclaw/reply-listener.ts b/src/openclaw/reply-listener.ts new file mode 100644 index 000000000..f6c8e015b --- /dev/null +++ b/src/openclaw/reply-listener.ts @@ -0,0 +1,717 @@ +import { + existsSync, + mkdirSync, + readFileSync, + writeFileSync, + unlinkSync, + chmodSync, + statSync, + appendFileSync, + renameSync, +} from "fs" +import { join, dirname } from "path" +import { homedir } from "os" +import { spawn } from "bun" // Use bun spawn +import { captureTmuxPane, analyzePaneContent, sendToPane, isTmuxAvailable } from "./tmux" +import { lookupByMessageId, removeMessagesByPane, pruneStale } from "./session-registry" +import type { OpenClawConfig } from "./types" +import { normalizeReplyListenerConfig } from "./config" + +const SECURE_FILE_MODE = 0o600 +const MAX_LOG_SIZE_BYTES = 1 * 1024 * 1024 +const DAEMON_ENV_ALLOWLIST = [ + "PATH", + "HOME", + "USERPROFILE", + "USER", + "USERNAME", + "LOGNAME", + "LANG", + "LC_ALL", + "LC_CTYPE", + "TERM", + "TMUX", + "TMUX_PANE", + "TMPDIR", + "TMP", + "TEMP", + "XDG_RUNTIME_DIR", + "XDG_DATA_HOME", + "XDG_CONFIG_HOME", + "SHELL", + "NODE_ENV", + "HTTP_PROXY", + "HTTPS_PROXY", + "http_proxy", + "https_proxy", + "NO_PROXY", + "no_proxy", + "SystemRoot", + "SYSTEMROOT", + "windir", + "COMSPEC", +] + +const DEFAULT_STATE_DIR = join(homedir(), ".omx", "state") +const PID_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener.pid") +const STATE_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-state.json") +const CONFIG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-config.json") +const LOG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener.log") + +export const DAEMON_IDENTITY_MARKER = "--openclaw-reply-listener-daemon" + +function createMinimalDaemonEnv(): Record { + const env: Record = {} + for (const key of DAEMON_ENV_ALLOWLIST) { + if (process.env[key] !== undefined) { + env[key] = process.env[key] as string + } + } + return env +} + +function ensureStateDir(): void { + if (!existsSync(DEFAULT_STATE_DIR)) { + mkdirSync(DEFAULT_STATE_DIR, { recursive: true, mode: 0o700 }) + } +} + +function writeSecureFile(filePath: string, content: string): void { + ensureStateDir() + writeFileSync(filePath, content, { mode: SECURE_FILE_MODE }) + try { + chmodSync(filePath, SECURE_FILE_MODE) + } catch { + // Ignore + } +} + +function rotateLogIfNeeded(logPath: string): void { + try { + if (!existsSync(logPath)) return + const stats = statSync(logPath) + if (stats.size > MAX_LOG_SIZE_BYTES) { + const backupPath = `${logPath}.old` + if (existsSync(backupPath)) { + unlinkSync(backupPath) + } + renameSync(logPath, backupPath) + } + } catch { + // Ignore + } +} + +function log(message: string): void { + try { + ensureStateDir() + rotateLogIfNeeded(LOG_FILE_PATH) + const timestamp = new Date().toISOString() + const logLine = `[${timestamp}] ${message}\n` + appendFileSync(LOG_FILE_PATH, logLine, { mode: SECURE_FILE_MODE }) + } catch { + // Ignore + } +} + +export function logReplyListenerMessage(message: string): void { + log(message) +} + +interface DaemonState { + isRunning: boolean + pid: number | null + startedAt: string + lastPollAt: string | null + telegramLastUpdateId: number | null + discordLastMessageId: string | null + messagesInjected: number + errors: number + lastError?: string +} + +function readDaemonState(): DaemonState | null { + try { + if (!existsSync(STATE_FILE_PATH)) return null + const content = readFileSync(STATE_FILE_PATH, "utf-8") + return JSON.parse(content) + } catch { + return null + } +} + +function writeDaemonState(state: DaemonState): void { + writeSecureFile(STATE_FILE_PATH, JSON.stringify(state, null, 2)) +} + +function readDaemonConfig(): OpenClawConfig | null { + try { + if (!existsSync(CONFIG_FILE_PATH)) return null + const content = readFileSync(CONFIG_FILE_PATH, "utf-8") + return JSON.parse(content) + } catch { + return null + } +} + +function writeDaemonConfig(config: OpenClawConfig): void { + writeSecureFile(CONFIG_FILE_PATH, JSON.stringify(config, null, 2)) +} + +function readPidFile(): number | null { + try { + if (!existsSync(PID_FILE_PATH)) return null + const content = readFileSync(PID_FILE_PATH, "utf-8") + const pid = parseInt(content.trim(), 10) + if (Number.isNaN(pid)) return null + return pid + } catch { + return null + } +} + +function writePidFile(pid: number): void { + writeSecureFile(PID_FILE_PATH, String(pid)) +} + +function removePidFile(): void { + if (existsSync(PID_FILE_PATH)) { + unlinkSync(PID_FILE_PATH) + } +} + +function isProcessRunning(pid: number): boolean { + try { + process.kill(pid, 0) + return true + } catch { + return false + } +} + +export async function isReplyListenerProcess(pid: number): Promise { + try { + if (process.platform === "linux") { + const cmdline = readFileSync(`/proc/${pid}/cmdline`, "utf-8") + return cmdline.includes(DAEMON_IDENTITY_MARKER) + } + // macOS + const proc = spawn(["ps", "-p", String(pid), "-o", "args="], { + stdout: "pipe", + stderr: "ignore", + }) + const stdout = await new Response(proc.stdout).text() + if (proc.exitCode !== 0) return false + return stdout.includes(DAEMON_IDENTITY_MARKER) + } catch { + return false + } +} + +export async function isDaemonRunning(): Promise { + const pid = readPidFile() + if (pid === null) return false + if (!isProcessRunning(pid)) { + removePidFile() + return false + } + if (!(await isReplyListenerProcess(pid))) { + removePidFile() + return false + } + return true +} + +// Input Sanitization +export function sanitizeReplyInput(text: string): string { + return text + .replace(/[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]/g, "") + .replace(/[\u200e\u200f\u202a-\u202e\u2066-\u2069]/g, "") + .replace(/\r?\n/g, " ") + .replace(/\\/g, "\\\\") + .replace(/`/g, "\\`") + .replace(/\$\(/g, "\\$(") + .replace(/\$\{/g, "\\${") + .trim() +} + +class RateLimiter { + maxPerMinute: number + timestamps: number[] = [] + windowMs = 60 * 1000 + + constructor(maxPerMinute: number) { + this.maxPerMinute = maxPerMinute + } + + canProceed(): boolean { + const now = Date.now() + this.timestamps = this.timestamps.filter((t) => now - t < this.windowMs) + if (this.timestamps.length >= this.maxPerMinute) return false + this.timestamps.push(now) + return true + } +} + +async function injectReply( + paneId: string, + text: string, + platform: string, + config: OpenClawConfig, +): Promise { + const replyListener = config.replyListener + const content = await captureTmuxPane(paneId, 15) + const analysis = analyzePaneContent(content) + + if (analysis.confidence < 0.3) { // Lower threshold for simple check + log( + `WARN: Pane ${paneId} does not appear to be running OpenCode CLI (confidence: ${analysis.confidence}). Skipping injection, removing stale mapping.`, + ) + removeMessagesByPane(paneId) + return false + } + + const prefix = replyListener?.includePrefix === false ? "" : `[reply:${platform}] ` + const sanitized = sanitizeReplyInput(prefix + text) + const truncated = sanitized.slice(0, replyListener?.maxMessageLength ?? 500) + const success = await sendToPane(paneId, truncated, true) + + if (success) { + log( + `Injected reply from ${platform} into pane ${paneId}: "${truncated.slice(0, 50)}${truncated.length > 50 ? "..." : ""}"`, + ) + } else { + log(`ERROR: Failed to inject reply into pane ${paneId}`) + } + return success +} + +let discordBackoffUntil = 0 + +async function pollDiscord( + config: OpenClawConfig, + state: DaemonState, + rateLimiter: RateLimiter, +): Promise { + const replyListener = config.replyListener + if (!replyListener?.discordBotToken || !replyListener.discordChannelId) return + if ( + !replyListener.authorizedDiscordUserIds + || replyListener.authorizedDiscordUserIds.length === 0 + ) { + return + } + if (Date.now() < discordBackoffUntil) return + + try { + const after = state.discordLastMessageId + ? `?after=${state.discordLastMessageId}&limit=10` + : "?limit=10" + const url = `https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages${after}` + + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), 10000) + + const response = await fetch(url, { + method: "GET", + headers: { Authorization: `Bot ${replyListener.discordBotToken}` }, + signal: controller.signal, + }) + + clearTimeout(timeout) + + const remaining = response.headers.get("x-ratelimit-remaining") + const reset = response.headers.get("x-ratelimit-reset") + + if (remaining !== null && parseInt(remaining, 10) < 2) { + const parsed = reset ? parseFloat(reset) : Number.NaN + const resetTime = Number.isFinite(parsed) ? parsed * 1000 : Date.now() + 10000 + discordBackoffUntil = resetTime + log( + `WARN: Discord rate limit low (remaining: ${remaining}), backing off until ${new Date(resetTime).toISOString()}`, + ) + } + + if (!response.ok) { + log(`Discord API error: HTTP ${response.status}`) + return + } + + const messages = await response.json() + if (!Array.isArray(messages) || messages.length === 0) return + + const sorted = [...messages].reverse() + + for (const msg of sorted) { + if (!msg.message_reference?.message_id) { + state.discordLastMessageId = msg.id + writeDaemonState(state) + continue + } + + if (!replyListener.authorizedDiscordUserIds.includes(msg.author.id)) { + state.discordLastMessageId = msg.id + writeDaemonState(state) + continue + } + + const mapping = lookupByMessageId("discord-bot", msg.message_reference.message_id) + if (!mapping) { + state.discordLastMessageId = msg.id + writeDaemonState(state) + continue + } + + if (!rateLimiter.canProceed()) { + log(`WARN: Rate limit exceeded, dropping Discord message ${msg.id}`) + state.discordLastMessageId = msg.id + writeDaemonState(state) + state.errors++ + continue + } + + state.discordLastMessageId = msg.id + writeDaemonState(state) + + const success = await injectReply(mapping.tmuxPaneId, msg.content, "discord", config) + + if (success) { + state.messagesInjected++ + // Add reaction + try { + await fetch( + `https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages/${msg.id}/reactions/%E2%9C%85/@me`, + { + method: "PUT", + headers: { Authorization: `Bot ${replyListener.discordBotToken}` }, + }, + ) + } catch { + // Ignore + } + } else { + state.errors++ + } + } + } catch (error) { + state.errors++ + state.lastError = error instanceof Error ? error.message : String(error) + log(`Discord polling error: ${state.lastError}`) + } +} + +async function pollTelegram( + config: OpenClawConfig, + state: DaemonState, + rateLimiter: RateLimiter, +): Promise { + const replyListener = config.replyListener + if (!replyListener?.telegramBotToken || !replyListener.telegramChatId) return + + try { + const offset = state.telegramLastUpdateId ? state.telegramLastUpdateId + 1 : 0 + const url = `https://api.telegram.org/bot${replyListener.telegramBotToken}/getUpdates?offset=${offset}&timeout=0` + + const controller = new AbortController() + const timeout = setTimeout(() => controller.abort(), 10000) + + const response = await fetch(url, { + method: "GET", + signal: controller.signal, + }) + + clearTimeout(timeout) + + if (!response.ok) { + log(`Telegram API error: HTTP ${response.status}`) + return + } + + const body = await response.json() as any + const updates = body.result || [] + + for (const update of updates) { + const msg = update.message + if (!msg) { + state.telegramLastUpdateId = update.update_id + writeDaemonState(state) + continue + } + + if (!msg.reply_to_message?.message_id) { + state.telegramLastUpdateId = update.update_id + writeDaemonState(state) + continue + } + + if (String(msg.chat.id) !== replyListener.telegramChatId) { + state.telegramLastUpdateId = update.update_id + writeDaemonState(state) + continue + } + + const mapping = lookupByMessageId("telegram", String(msg.reply_to_message.message_id)) + if (!mapping) { + state.telegramLastUpdateId = update.update_id + writeDaemonState(state) + continue + } + + const text = msg.text || "" + if (!text) { + state.telegramLastUpdateId = update.update_id + writeDaemonState(state) + continue + } + + if (!rateLimiter.canProceed()) { + log(`WARN: Rate limit exceeded, dropping Telegram message ${msg.message_id}`) + state.telegramLastUpdateId = update.update_id + writeDaemonState(state) + state.errors++ + continue + } + + state.telegramLastUpdateId = update.update_id + writeDaemonState(state) + + const success = await injectReply(mapping.tmuxPaneId, text, "telegram", config) + + if (success) { + state.messagesInjected++ + try { + await fetch( + `https://api.telegram.org/bot${replyListener.telegramBotToken}/sendMessage`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + chat_id: replyListener.telegramChatId, + text: "Injected into Codex CLI session.", + reply_to_message_id: msg.message_id, + }), + }, + ) + } catch { + // Ignore + } + } else { + state.errors++ + } + } + } catch (error) { + state.errors++ + state.lastError = error instanceof Error ? error.message : String(error) + log(`Telegram polling error: ${state.lastError}`) + } +} + +const PRUNE_INTERVAL_MS = 60 * 60 * 1000 + +export async function pollLoop(): Promise { + log("Reply listener daemon starting poll loop") + const config = readDaemonConfig() + if (!config) { + log("ERROR: No daemon config found, exiting") + process.exit(1) + } + + const state = readDaemonState() || { + isRunning: true, + pid: process.pid, + startedAt: new Date().toISOString(), + lastPollAt: null, + telegramLastUpdateId: null, + discordLastMessageId: null, + messagesInjected: 0, + errors: 0, + } + + state.isRunning = true + state.pid = process.pid + + const rateLimiter = new RateLimiter(config.replyListener?.rateLimitPerMinute || 10) + let lastPruneAt = Date.now() + + const shutdown = (): void => { + log("Shutdown signal received") + state.isRunning = false + writeDaemonState(state) + removePidFile() + process.exit(0) + } + + process.on("SIGTERM", shutdown) + process.on("SIGINT", shutdown) + + try { + pruneStale() + log("Pruned stale registry entries") + } catch (e) { + log(`WARN: Failed to prune stale entries: ${e}`) + } + + while (state.isRunning) { + try { + state.lastPollAt = new Date().toISOString() + await pollDiscord(config, state, rateLimiter) + await pollTelegram(config, state, rateLimiter) + + if (Date.now() - lastPruneAt > PRUNE_INTERVAL_MS) { + try { + pruneStale() + lastPruneAt = Date.now() + log("Pruned stale registry entries") + } catch (e) { + log(`WARN: Prune failed: ${e instanceof Error ? e.message : String(e)}`) + } + } + + writeDaemonState(state) + await new Promise((resolve) => + setTimeout(resolve, config.replyListener?.pollIntervalMs || 3000), + ) + } catch (error) { + state.errors++ + state.lastError = error instanceof Error ? error.message : String(error) + log(`Poll error: ${state.lastError}`) + writeDaemonState(state) + await new Promise((resolve) => + setTimeout(resolve, (config.replyListener?.pollIntervalMs || 3000) * 2), + ) + } + } + log("Poll loop ended") +} + +export async function startReplyListener(config: OpenClawConfig): Promise<{ success: boolean; message: string; state?: DaemonState; error?: string }> { + if (await isDaemonRunning()) { + const state = readDaemonState() + return { + success: true, + message: "Reply listener daemon is already running", + state: state || undefined, + } + } + + if (!(await isTmuxAvailable())) { + return { + success: false, + message: "tmux not available - reply injection requires tmux", + } + } + + const normalizedConfig = normalizeReplyListenerConfig(config) + const replyListener = normalizedConfig.replyListener + if (!replyListener?.discordBotToken && !replyListener?.telegramBotToken) { + // Only warn if no platforms enabled, but user might just want outbound + // Actually, instructions say: "Fire-and-forget for outbound, daemon process for inbound" + // So if no inbound config, we shouldn't start daemon. + return { + success: false, + message: "No enabled reply listener platforms configured (missing bot tokens/channels)", + } + } + + writeDaemonConfig(normalizedConfig) + ensureStateDir() + + const currentFile = import.meta.url + const isTs = currentFile.endsWith(".ts") + const daemonScript = isTs + ? join(dirname(new URL(currentFile).pathname), "daemon.ts") + : join(dirname(new URL(currentFile).pathname), "daemon.js") + + try { + const proc = spawn(["bun", "run", daemonScript, DAEMON_IDENTITY_MARKER], { + detached: true, + stdio: ["ignore", "ignore", "ignore"], + cwd: process.cwd(), + env: createMinimalDaemonEnv(), + }) + + proc.unref() + const pid = proc.pid + + if (pid) { + writePidFile(pid) + const state: DaemonState = { + isRunning: true, + pid, + startedAt: new Date().toISOString(), + lastPollAt: null, + telegramLastUpdateId: null, + discordLastMessageId: null, + messagesInjected: 0, + errors: 0, + } + writeDaemonState(state) + log(`Reply listener daemon started with PID ${pid}`) + return { + success: true, + message: `Reply listener daemon started with PID ${pid}`, + state, + } + } + + return { + success: false, + message: "Failed to start daemon process", + } + } catch (error) { + return { + success: false, + message: "Failed to start daemon", + error: error instanceof Error ? error.message : String(error), + } + } +} + +export async function stopReplyListener(): Promise<{ success: boolean; message: string; state?: DaemonState; error?: string }> { + const pid = readPidFile() + if (pid === null) { + return { + success: true, + message: "Reply listener daemon is not running", + } + } + + if (!isProcessRunning(pid)) { + removePidFile() + return { + success: true, + message: "Reply listener daemon was not running (cleaned up stale PID file)", + } + } + + if (!(await isReplyListenerProcess(pid))) { + removePidFile() + return { + success: false, + message: `Refusing to kill PID ${pid}: process identity does not match the reply listener daemon (stale or reused PID - removed PID file)`, + } + } + + try { + process.kill(pid, "SIGTERM") + removePidFile() + const state = readDaemonState() + if (state) { + state.isRunning = false + state.pid = null + writeDaemonState(state) + } + log(`Reply listener daemon stopped (PID ${pid})`) + return { + success: true, + message: `Reply listener daemon stopped (PID ${pid})`, + state: state || undefined, + } + } catch (error) { + return { + success: false, + message: "Failed to stop daemon", + error: error instanceof Error ? error.message : String(error), + } + } +} diff --git a/src/openclaw/session-registry.ts b/src/openclaw/session-registry.ts new file mode 100644 index 000000000..4f0b37979 --- /dev/null +++ b/src/openclaw/session-registry.ts @@ -0,0 +1,340 @@ +import { + existsSync, + mkdirSync, + readFileSync, + writeFileSync, + openSync, + closeSync, + writeSync, + unlinkSync, + statSync, + constants, +} from "fs" +import { join, dirname } from "path" +import { randomUUID } from "crypto" +import { getOpenCodeStorageDir } from "../shared/data-path" + +const OPENCLAW_STORAGE_DIR = join(getOpenCodeStorageDir(), "openclaw") +const REGISTRY_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.jsonl") +const REGISTRY_LOCK_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.lock") +const SECURE_FILE_MODE = 0o600 +const MAX_AGE_MS = 24 * 60 * 60 * 1000 +const LOCK_TIMEOUT_MS = 2000 +const LOCK_WAIT_TIMEOUT_MS = 4000 +const LOCK_RETRY_MS = 20 +const LOCK_STALE_MS = 10000 + +export interface SessionMapping { + sessionId: string + tmuxSession: string + tmuxPaneId: string + projectPath: string + platform: string + messageId: string + channelId?: string + threadId?: string + createdAt: string +} + +function ensureRegistryDir(): void { + const registryDir = dirname(REGISTRY_PATH) + if (!existsSync(registryDir)) { + mkdirSync(registryDir, { recursive: true, mode: 0o700 }) + } +} + +function sleepMs(ms: number): void { + // Use Atomics.wait for synchronous sleep + Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms) +} + +function isPidAlive(pid: number): boolean { + if (!Number.isFinite(pid) || pid <= 0) return false + try { + process.kill(pid, 0) + return true + } catch (error) { + return (error as NodeJS.ErrnoException).code === "EPERM" + } +} + +interface LockSnapshot { + raw: string + pid: number | null + token: string | null +} + +function readLockSnapshot(): LockSnapshot | null { + try { + if (!existsSync(REGISTRY_LOCK_PATH)) return null + const raw = readFileSync(REGISTRY_LOCK_PATH, "utf-8") + const trimmed = raw.trim() + if (!trimmed) return { raw, pid: null, token: null } + + try { + const parsed = JSON.parse(trimmed) + const pid = + typeof parsed.pid === "number" && Number.isFinite(parsed.pid) ? parsed.pid : null + const token = + typeof parsed.token === "string" && parsed.token.length > 0 ? parsed.token : null + return { raw, pid, token } + } catch { + // Legacy format or plain PID + const [pidStr] = trimmed.split(":") + const parsedPid = Number.parseInt(pidStr ?? "", 10) + return { + raw, + pid: Number.isFinite(parsedPid) && parsedPid > 0 ? parsedPid : null, + token: null, + } + } + } catch { + return null + } +} + +function removeLockIfUnchanged(snapshot: LockSnapshot): boolean { + try { + if (!existsSync(REGISTRY_LOCK_PATH)) return false + const currentRaw = readFileSync(REGISTRY_LOCK_PATH, "utf-8") + if (currentRaw !== snapshot.raw) return false + unlinkSync(REGISTRY_LOCK_PATH) + return true + } catch { + return false + } +} + +interface LockHandle { + fd: number + token: string +} + +function acquireRegistryLock(): LockHandle | null { + ensureRegistryDir() + const started = Date.now() + while (Date.now() - started < LOCK_TIMEOUT_MS) { + try { + const token = randomUUID() + const fd = openSync( + REGISTRY_LOCK_PATH, + constants.O_CREAT | constants.O_EXCL | constants.O_WRONLY, + SECURE_FILE_MODE, + ) + try { + const lockPayload = JSON.stringify({ + pid: process.pid, + acquiredAt: Date.now(), + token, + }) + writeSync(fd, lockPayload) + } catch (writeError) { + try { + closeSync(fd) + } catch { + // Ignore + } + try { + unlinkSync(REGISTRY_LOCK_PATH) + } catch { + // Ignore + } + throw writeError + } + return { fd, token } + } catch (error) { + const err = error as NodeJS.ErrnoException + if (err.code !== "EEXIST") throw error + + try { + const stats = statSync(REGISTRY_LOCK_PATH) + const lockAgeMs = Date.now() - stats.mtimeMs + if (lockAgeMs > LOCK_STALE_MS) { + const snapshot = readLockSnapshot() + if (!snapshot) { + sleepMs(LOCK_RETRY_MS) + continue + } + if (snapshot.pid !== null && isPidAlive(snapshot.pid)) { + sleepMs(LOCK_RETRY_MS) + continue + } + if (removeLockIfUnchanged(snapshot)) { + continue + } + } + } catch { + // Ignore errors + } + sleepMs(LOCK_RETRY_MS) + } + } + return null +} + +function acquireRegistryLockOrWait(maxWaitMs = LOCK_WAIT_TIMEOUT_MS): LockHandle | null { + const started = Date.now() + while (Date.now() - started < maxWaitMs) { + const lock = acquireRegistryLock() + if (lock !== null) return lock + if (Date.now() - started < maxWaitMs) { + sleepMs(LOCK_RETRY_MS) + } + } + return null +} + +function releaseRegistryLock(lock: LockHandle): void { + try { + closeSync(lock.fd) + } catch { + // Ignore + } + const snapshot = readLockSnapshot() + if (!snapshot || snapshot.token !== lock.token) return + removeLockIfUnchanged(snapshot) +} + +function withRegistryLockOrWait( + onLocked: () => T, + onLockUnavailable: () => T, +): T { + const lock = acquireRegistryLockOrWait() + if (lock === null) return onLockUnavailable() + try { + return onLocked() + } finally { + releaseRegistryLock(lock) + } +} + +function withRegistryLock(onLocked: () => void, onLockUnavailable: () => void): void { + const lock = acquireRegistryLock() + if (lock === null) { + onLockUnavailable() + return + } + try { + onLocked() + } finally { + releaseRegistryLock(lock) + } +} + +function readAllMappingsUnsafe(): SessionMapping[] { + if (!existsSync(REGISTRY_PATH)) return [] + try { + const content = readFileSync(REGISTRY_PATH, "utf-8") + return content + .split("\n") + .filter((line) => line.trim()) + .map((line) => { + try { + return JSON.parse(line) as SessionMapping + } catch { + return null + } + }) + .filter((m): m is SessionMapping => m !== null) + } catch { + return [] + } +} + +function rewriteRegistryUnsafe(mappings: SessionMapping[]): void { + ensureRegistryDir() + if (mappings.length === 0) { + writeFileSync(REGISTRY_PATH, "", { mode: SECURE_FILE_MODE }) + return + } + const content = mappings.map((m) => JSON.stringify(m)).join("\n") + "\n" + writeFileSync(REGISTRY_PATH, content, { mode: SECURE_FILE_MODE }) +} + +export function registerMessage(mapping: SessionMapping): boolean { + return withRegistryLockOrWait( + () => { + ensureRegistryDir() + const line = JSON.stringify(mapping) + "\n" + const fd = openSync( + REGISTRY_PATH, + constants.O_WRONLY | constants.O_APPEND | constants.O_CREAT, + SECURE_FILE_MODE, + ) + try { + writeSync(fd, line) + } finally { + closeSync(fd) + } + return true + }, + () => { + console.warn( + "[notifications] session registry lock unavailable; skipping reply correlation write", + ) + return false + }, + ) +} + +export function loadAllMappings(): SessionMapping[] { + return withRegistryLockOrWait( + () => readAllMappingsUnsafe(), + () => [], + ) +} + +export function lookupByMessageId(platform: string, messageId: string): SessionMapping | null { + const mappings = loadAllMappings() + return mappings.find((m) => m.platform === platform && m.messageId === messageId) || null +} + +export function removeSession(sessionId: string): void { + withRegistryLock( + () => { + const mappings = readAllMappingsUnsafe() + const filtered = mappings.filter((m) => m.sessionId !== sessionId) + if (filtered.length === mappings.length) return + rewriteRegistryUnsafe(filtered) + }, + () => { + // Best-effort + }, + ) +} + +export function removeMessagesByPane(paneId: string): void { + withRegistryLock( + () => { + const mappings = readAllMappingsUnsafe() + const filtered = mappings.filter((m) => m.tmuxPaneId !== paneId) + if (filtered.length === mappings.length) return + rewriteRegistryUnsafe(filtered) + }, + () => { + // Best-effort + }, + ) +} + +export function pruneStale(): void { + withRegistryLock( + () => { + const now = Date.now() + const mappings = readAllMappingsUnsafe() + const filtered = mappings.filter((m) => { + try { + const age = now - new Date(m.createdAt).getTime() + return age < MAX_AGE_MS + } catch { + return false + } + }) + if (filtered.length === mappings.length) return + rewriteRegistryUnsafe(filtered) + }, + () => { + // Best-effort + }, + ) +} diff --git a/src/openclaw/tmux.ts b/src/openclaw/tmux.ts new file mode 100644 index 000000000..6b575e662 --- /dev/null +++ b/src/openclaw/tmux.ts @@ -0,0 +1,91 @@ +import { spawn } from "bun" + +export function getCurrentTmuxSession(): string | null { + const env = process.env.TMUX + if (!env) return null + const match = env.match(/(\d+)$/) + return match ? `session-${match[1]}` : null // Wait, TMUX env is /tmp/tmux-501/default,1234,0 + // Reference tmux.js gets session name via `tmux display-message -p '#S'` +} + +export async function getTmuxSessionName(): Promise { + try { + const proc = spawn(["tmux", "display-message", "-p", "#S"], { + stdout: "pipe", + stderr: "ignore", + }) + const outputPromise = new Response(proc.stdout).text() + await proc.exited + const output = await outputPromise + // Await proc.exited ensures exitCode is set; avoid race condition + if (proc.exitCode !== 0) return null + return output.trim() || null + } catch { + return null + } +} + +export async function captureTmuxPane(paneId: string, lines = 15): Promise { + try { + const proc = spawn( + ["tmux", "capture-pane", "-p", "-t", paneId, "-S", `-${lines}`], + { + stdout: "pipe", + stderr: "ignore", + }, + ) + const outputPromise = new Response(proc.stdout).text() + await proc.exited + const output = await outputPromise + if (proc.exitCode !== 0) return null + return output.trim() || null + } catch { + return null + } +} + +export async function sendToPane(paneId: string, text: string, confirm = true): Promise { + try { + const literalProc = spawn(["tmux", "send-keys", "-t", paneId, "-l", "--", text], { + stdout: "ignore", + stderr: "ignore", + }) + await literalProc.exited + if (literalProc.exitCode !== 0) return false + + if (!confirm) return true + + const enterProc = spawn(["tmux", "send-keys", "-t", paneId, "Enter"], { + stdout: "ignore", + stderr: "ignore", + }) + await enterProc.exited + return enterProc.exitCode === 0 + } catch { + return false + } +} + +export async function isTmuxAvailable(): Promise { + try { + const proc = spawn(["tmux", "-V"], { + stdout: "ignore", + stderr: "ignore", + }) + await proc.exited + return proc.exitCode === 0 + } catch { + return false + } +} + +export function analyzePaneContent(content: string | null): { confidence: number } { + if (!content) return { confidence: 0 } + + let confidence = 0 + if (content.includes("opencode")) confidence += 0.3 + if (content.includes("Ask anything...")) confidence += 0.5 + if (content.includes("Run /help")) confidence += 0.2 + + return { confidence: Math.min(1, confidence) } +} diff --git a/src/openclaw/types.ts b/src/openclaw/types.ts new file mode 100644 index 000000000..b05325da2 --- /dev/null +++ b/src/openclaw/types.ts @@ -0,0 +1,52 @@ +import type { + OpenClawConfig, + OpenClawGateway, + OpenClawHook, + OpenClawReplyListenerConfig, +} from "../config/schema/openclaw" + +export type { + OpenClawConfig, + OpenClawGateway, + OpenClawHook, + OpenClawReplyListenerConfig, +} + +export interface OpenClawContext { + sessionId?: string + projectPath?: string + projectName?: string + tmuxSession?: string + prompt?: string + contextSummary?: string + reasoning?: string + question?: string + tmuxTail?: string + replyChannel?: string + replyTarget?: string + replyThread?: string + [key: string]: string | undefined +} + +export interface OpenClawPayload { + event: string + instruction: string + text: string + timestamp: string + sessionId?: string + projectPath?: string + projectName?: string + tmuxSession?: string + tmuxTail?: string + channel?: string + to?: string + threadId?: string + context: OpenClawContext +} + +export interface WakeResult { + gateway: string + success: boolean + error?: string + statusCode?: number +}