diff --git a/src/config/schema/openclaw.ts b/src/config/schema/openclaw.ts index 31df990f9..a768728c8 100644 --- a/src/config/schema/openclaw.ts +++ b/src/config/schema/openclaw.ts @@ -9,7 +9,7 @@ export const OpenClawGatewaySchema = z.object({ // Command specific command: z.string().optional(), // Shared - timeout: z.number().default(10000), + timeout: z.number().optional(), }) export const OpenClawHookSchema = z.object({ @@ -18,28 +18,33 @@ export const OpenClawHookSchema = z.object({ instruction: z.string(), }) -export const OpenClawConfigSchema = z.object({ - enabled: z.boolean().default(false), - - // Outbound Configuration - gateways: z.record(z.string(), OpenClawGatewaySchema).default({}), - hooks: z.record(z.string(), OpenClawHookSchema).default({}), - - // Inbound Configuration (Reply Listener) +export const OpenClawReplyListenerConfigSchema = z.object({ discordBotToken: z.string().optional(), discordChannelId: z.string().optional(), discordMention: z.string().optional(), // For allowed_mentions authorizedDiscordUserIds: z.array(z.string()).default([]), - + telegramBotToken: z.string().optional(), telegramChatId: z.string().optional(), - + pollIntervalMs: z.number().default(3000), rateLimitPerMinute: z.number().default(10), maxMessageLength: z.number().default(500), includePrefix: z.boolean().default(true), }) +export const OpenClawConfigSchema = z.object({ + enabled: z.boolean().default(false), + + // Outbound Configuration + gateways: z.record(z.string(), OpenClawGatewaySchema).default({}), + hooks: z.record(z.string(), OpenClawHookSchema).default({}), + + // Inbound Configuration (Reply Listener) + replyListener: OpenClawReplyListenerConfigSchema.optional(), +}) + export type OpenClawConfig = z.infer export type OpenClawGateway = z.infer export type OpenClawHook = z.infer +export type OpenClawReplyListenerConfig = z.infer diff --git a/src/hooks/openclaw.test.ts b/src/hooks/openclaw.test.ts new file mode 100644 index 000000000..06fb3957e --- /dev/null +++ b/src/hooks/openclaw.test.ts @@ -0,0 +1,60 @@ +import { beforeEach, describe, expect, mock, test } from "bun:test" + +const wakeOpenClawMock = mock(async () => null) + +mock.module("../openclaw", () => ({ + wakeOpenClaw: wakeOpenClawMock, +})) + +describe("createOpenClawHook", () => { + beforeEach(() => { + wakeOpenClawMock.mockClear() + }) + + test("maps session.stop events to stop", async () => { + const { createOpenClawHook } = await import("./openclaw") + const hook = createOpenClawHook( + { directory: "/tmp/project" } as any, + { openclaw: { enabled: true } } as any, + ) + + await hook?.event?.({ + event: { + type: "session.stop", + properties: { sessionID: "session-1" }, + }, + }) + + expect(wakeOpenClawMock).toHaveBeenCalledWith( + expect.anything(), + "stop", + expect.objectContaining({ + projectPath: "/tmp/project", + sessionId: "session-1", + }), + ) + }) + + test("uses tool.execute.before for question tools", async () => { + const { createOpenClawHook } = await import("./openclaw") + const hook = createOpenClawHook( + { directory: "/tmp/project" } as any, + { openclaw: { enabled: true } } as any, + ) + + await hook?.["tool.execute.before"]?.( + { tool: "ask_user_question", sessionID: "session-2" }, + { args: { question: "Need approval?" } }, + ) + + expect(wakeOpenClawMock).toHaveBeenCalledWith( + expect.anything(), + "ask-user-question", + expect.objectContaining({ + projectPath: "/tmp/project", + question: "Need approval?", + sessionId: "session-2", + }), + ) + }) +}) diff --git a/src/hooks/openclaw.ts b/src/hooks/openclaw.ts index 4fdd1f2d8..83f96269d 100644 --- a/src/hooks/openclaw.ts +++ b/src/hooks/openclaw.ts @@ -3,7 +3,6 @@ import type { OhMyOpenCodeConfig } from "../config" import { wakeOpenClaw } from "../openclaw" import type { OpenClawContext } from "../openclaw/types" - export function createOpenClawHook( ctx: PluginContext, pluginConfig: OhMyOpenCodeConfig, @@ -35,21 +34,31 @@ export function createOpenClawHook( // This is heuristic. If the last message was from assistant and ended with a question? // Or if the system is idle. await handleWake("session-idle", context) - } else if (event.type === "session.stopped") { // Assuming this event exists or map from error? + } else if (event.type === "session.stop") { await handleWake("stop", context) } }, - - toolExecuteBefore: async (input: any) => { - const { toolName, toolInput, sessionID } = input - if (toolName === "ask_user" || toolName === "ask_followup_question") { - const context: OpenClawContext = { - sessionId: sessionID, - projectPath: ctx.directory, - question: toolInput.question, - } - await handleWake("ask-user-question", context) - } - } + + "tool.execute.before": async ( + input: { tool: string; sessionID: string }, + output: { args: Record }, + ) => { + const normalizedToolName = input.tool.toLowerCase() + if ( + normalizedToolName !== "question" + && normalizedToolName !== "ask_user_question" + && normalizedToolName !== "askuserquestion" + ) { + return + } + + const question = typeof output.args.question === "string" ? output.args.question : undefined + const context: OpenClawContext = { + sessionId: input.sessionID, + projectPath: ctx.directory, + question, + } + await handleWake("ask-user-question", context) + }, } } diff --git a/src/openclaw/__tests__/config.test.ts b/src/openclaw/__tests__/config.test.ts index 3237c9b38..62972f45a 100644 --- a/src/openclaw/__tests__/config.test.ts +++ b/src/openclaw/__tests__/config.test.ts @@ -1,6 +1,7 @@ import { describe, expect, test } from "bun:test" import { resolveGateway, validateGatewayUrl, normalizeReplyListenerConfig } from "../config" import type { OpenClawConfig } from "../types" +import { OpenClawConfigSchema } from "../../config/schema/openclaw" describe("OpenClaw Config", () => { test("resolveGateway resolves HTTP gateway", () => { @@ -49,7 +50,7 @@ describe("OpenClaw Config", () => { test("resolveGateway returns null for disabled hook", () => { const config: OpenClawConfig = { enabled: true, - gateways: { g: { url: "https://example.com" } }, + gateways: { g: { type: "http", url: "https://example.com" } }, hooks: { event: { enabled: false, gateway: "g", instruction: "i" }, }, @@ -69,4 +70,46 @@ describe("OpenClaw Config", () => { expect(validateGatewayUrl("http://localhost:3000")).toBe(true) expect(validateGatewayUrl("http://127.0.0.1:3000")).toBe(true) }) + + test("normalizeReplyListenerConfig normalizes nested reply listener fields", () => { + const config = normalizeReplyListenerConfig({ + enabled: true, + gateways: {}, + hooks: {}, + replyListener: { + discordBotToken: "discord-token", + discordChannelId: "channel-id", + authorizedDiscordUserIds: ["user-1", "", "user-2"], + pollIntervalMs: 100, + rateLimitPerMinute: 0, + maxMessageLength: 9000, + includePrefix: false, + }, + } as OpenClawConfig) + + expect(config.replyListener).toEqual({ + discordBotToken: "discord-token", + discordChannelId: "channel-id", + authorizedDiscordUserIds: ["user-1", "user-2"], + pollIntervalMs: 500, + rateLimitPerMinute: 1, + maxMessageLength: 4000, + includePrefix: false, + }) + }) + + test("gateway timeout remains optional so env fallback can apply", () => { + const parsed = OpenClawConfigSchema.parse({ + enabled: true, + gateways: { + command: { + type: "command", + command: "echo hi", + }, + }, + hooks: {}, + }) + + expect(parsed.gateways.command.timeout).toBeUndefined() + }) }) diff --git a/src/openclaw/__tests__/dispatcher.test.ts b/src/openclaw/__tests__/dispatcher.test.ts index d9bad0696..5cf624e28 100644 --- a/src/openclaw/__tests__/dispatcher.test.ts +++ b/src/openclaw/__tests__/dispatcher.test.ts @@ -1,6 +1,7 @@ import { describe, expect, test, mock, spyOn } from "bun:test" import { interpolateInstruction, + resolveCommandTimeoutMs, shellEscapeArg, wakeGateway, wakeCommandGateway, @@ -30,21 +31,22 @@ describe("OpenClaw Dispatcher", () => { const fetchSpy = spyOn(global, "fetch").mockResolvedValue( new Response(JSON.stringify({ ok: true }), { status: 200 }), ) + try { + const result = await wakeGateway( + "test", + { url: "https://example.com", method: "POST", timeout: 1000, type: "http" }, + { foo: "bar" }, + ) - const result = await wakeGateway( - "test", - { url: "https://example.com", method: "POST", timeout: 1000, type: "http" }, - { foo: "bar" }, - ) - - expect(result.success).toBe(true) - expect(fetchSpy).toHaveBeenCalled() - const call = fetchSpy.mock.calls[0] - expect(call[0]).toBe("https://example.com") - expect(call[1]?.method).toBe("POST") - expect(call[1]?.body).toBe('{"foo":"bar"}') - - fetchSpy.mockRestore() + expect(result.success).toBe(true) + expect(fetchSpy).toHaveBeenCalled() + const call = fetchSpy.mock.calls[0] + expect(call[0]).toBe("https://example.com") + expect(call[1]?.method).toBe("POST") + expect(call[1]?.body).toBe('{"foo":"bar"}') + } finally { + fetchSpy.mockRestore() + } }) test("wakeGateway fails on invalid URL", async () => { @@ -52,4 +54,16 @@ describe("OpenClaw Dispatcher", () => { expect(result.success).toBe(false) expect(result.error).toContain("Invalid URL") }) + + test("resolveCommandTimeoutMs reads OMO env fallback", () => { + const original = process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS + process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = "4321" + + try { + expect(resolveCommandTimeoutMs(undefined, process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS)).toBe(4321) + } finally { + if (original === undefined) delete process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS + else process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = original + } + }) }) diff --git a/src/openclaw/__tests__/tmux.test.ts b/src/openclaw/__tests__/tmux.test.ts new file mode 100644 index 000000000..790a1bbe0 --- /dev/null +++ b/src/openclaw/__tests__/tmux.test.ts @@ -0,0 +1,13 @@ +import { describe, expect, test } from "bun:test" +import { analyzePaneContent } from "../tmux" + +describe("openclaw tmux helpers", () => { + test("analyzePaneContent recognizes the opencode welcome prompt", () => { + const content = "opencode\nAsk anything...\nRun /help" + expect(analyzePaneContent(content).confidence).toBeGreaterThanOrEqual(1) + }) + + test("analyzePaneContent returns zero confidence for empty content", () => { + expect(analyzePaneContent(null).confidence).toBe(0) + }) +}) diff --git a/src/openclaw/config.ts b/src/openclaw/config.ts index 06e8fa80f..946b11e69 100644 --- a/src/openclaw/config.ts +++ b/src/openclaw/config.ts @@ -1,4 +1,8 @@ -import type { OpenClawConfig, OpenClawGateway } from "./types" +import type { + OpenClawConfig, + OpenClawGateway, + OpenClawReplyListenerConfig, +} from "./types" const DEFAULT_REPLY_POLL_INTERVAL_MS = 3000 const MIN_REPLY_POLL_INTERVAL_MS = 500 @@ -29,41 +33,44 @@ function normalizeInteger( } export function normalizeReplyListenerConfig(config: OpenClawConfig): OpenClawConfig { - const discordEnabled = - config.discordBotToken && config.discordChannelId ? true : false - const telegramEnabled = - config.telegramBotToken && config.telegramChatId ? true : false + const replyListener = config.replyListener + if (!replyListener) return config - return { - ...config, - discordBotToken: config.discordBotToken, - discordChannelId: config.discordChannelId, - telegramBotToken: config.telegramBotToken, - telegramChatId: config.telegramChatId, + const normalizedReplyListener: OpenClawReplyListenerConfig = { + ...replyListener, + discordBotToken: replyListener.discordBotToken, + discordChannelId: replyListener.discordChannelId, + telegramBotToken: replyListener.telegramBotToken, + telegramChatId: replyListener.telegramChatId, pollIntervalMs: normalizeInteger( - config.pollIntervalMs, + replyListener.pollIntervalMs, DEFAULT_REPLY_POLL_INTERVAL_MS, MIN_REPLY_POLL_INTERVAL_MS, MAX_REPLY_POLL_INTERVAL_MS, ), rateLimitPerMinute: normalizeInteger( - config.rateLimitPerMinute, + replyListener.rateLimitPerMinute, DEFAULT_REPLY_RATE_LIMIT_PER_MINUTE, MIN_REPLY_RATE_LIMIT_PER_MINUTE, ), maxMessageLength: normalizeInteger( - config.maxMessageLength, + replyListener.maxMessageLength, DEFAULT_REPLY_MAX_MESSAGE_LENGTH, MIN_REPLY_MAX_MESSAGE_LENGTH, MAX_REPLY_MAX_MESSAGE_LENGTH, ), - includePrefix: config.includePrefix !== false, - authorizedDiscordUserIds: Array.isArray(config.authorizedDiscordUserIds) - ? config.authorizedDiscordUserIds.filter( + includePrefix: replyListener.includePrefix !== false, + authorizedDiscordUserIds: Array.isArray(replyListener.authorizedDiscordUserIds) + ? replyListener.authorizedDiscordUserIds.filter( (id) => typeof id === "string" && id.trim() !== "", ) : [], } + + return { + ...config, + replyListener: normalizedReplyListener, + } } export function resolveGateway( @@ -71,17 +78,17 @@ export function resolveGateway( event: string, ): { gatewayName: string; gateway: OpenClawGateway; instruction: string } | null { if (!config.enabled) return null - + const mapping = config.hooks[event] if (!mapping || !mapping.enabled) { return null } - + const gateway = config.gateways[mapping.gateway] if (!gateway) { return null } - + // Validate based on gateway type if (gateway.type === "command") { if (!gateway.command) return null @@ -89,7 +96,7 @@ export function resolveGateway( // HTTP gateway if (!gateway.url) return null } - + return { gatewayName: mapping.gateway, gateway, instruction: mapping.instruction } } diff --git a/src/openclaw/daemon.ts b/src/openclaw/daemon.ts index 3fbf137ea..b075903ab 100644 --- a/src/openclaw/daemon.ts +++ b/src/openclaw/daemon.ts @@ -1,6 +1,9 @@ -import { pollLoop } from "./reply-listener" +import { pollLoop, logReplyListenerMessage } from "./reply-listener" pollLoop().catch((err) => { + logReplyListenerMessage( + `FATAL: reply listener daemon crashed: ${err instanceof Error ? err.stack ?? err.message : String(err)}`, + ) console.error(err) process.exit(1) }) diff --git a/src/openclaw/dispatcher.ts b/src/openclaw/dispatcher.ts index 47713dece..a965d7b47 100644 --- a/src/openclaw/dispatcher.ts +++ b/src/openclaw/dispatcher.ts @@ -41,7 +41,9 @@ export function shellEscapeArg(value: string): string { export function resolveCommandTimeoutMs( gatewayTimeout?: number, - envTimeoutRaw = process.env.OMX_OPENCLAW_COMMAND_TIMEOUT_MS, + envTimeoutRaw = + process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS + ?? process.env.OMX_OPENCLAW_COMMAND_TIMEOUT_MS, ): number { const parseFinite = (value: unknown): number | undefined => { if (typeof value !== "number" || !Number.isFinite(value)) return undefined @@ -52,12 +54,12 @@ export function resolveCommandTimeoutMs( const parsed = Number(value) return Number.isFinite(parsed) ? parsed : undefined } - + const rawTimeout = parseFinite(gatewayTimeout) ?? parseEnv(envTimeoutRaw) ?? DEFAULT_COMMAND_TIMEOUT_MS - + return Math.min( MAX_COMMAND_TIMEOUT_MS, Math.max(MIN_COMMAND_TIMEOUT_MS, Math.trunc(rawTimeout)), @@ -76,27 +78,27 @@ export async function wakeGateway( error: "Invalid URL (HTTPS required)", } } - + try { const headers = { "Content-Type": "application/json", ...gatewayConfig.headers, } - + const timeout = gatewayConfig.timeout ?? DEFAULT_HTTP_TIMEOUT_MS - + const controller = new AbortController() const timeoutId = setTimeout(() => controller.abort(), timeout) - + const response = await fetch(gatewayConfig.url, { method: gatewayConfig.method || "POST", headers, body: JSON.stringify(payload), signal: controller.signal, + }).finally(() => { + clearTimeout(timeoutId) }) - - clearTimeout(timeoutId) - + if (!response.ok) { return { gateway: gatewayName, @@ -128,38 +130,45 @@ export async function wakeCommandGateway( error: "No command configured", } } - + try { const timeout = resolveCommandTimeoutMs(gatewayConfig.timeout) - + // Interpolate variables with shell escaping - let interpolated = gatewayConfig.command.replace(/\{\{(\w+)\}\}/g, (_match, key) => { + const interpolated = gatewayConfig.command.replace(/\{\{(\w+)\}\}/g, (_match, key) => { const value = variables[key] if (value === undefined) return _match return shellEscapeArg(value) }) - + // Always use sh -c to handle the shell command string correctly const proc = spawn(["sh", "-c", interpolated], { env: { ...process.env }, stdout: "ignore", stderr: "ignore", }) - + // Handle timeout manually - const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => { + let timeoutId: ReturnType | undefined + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { proc.kill() reject(new Error("Command timed out")) }, timeout) }) - - await Promise.race([proc.exited, timeoutPromise]) - + + try { + await Promise.race([proc.exited, timeoutPromise]) + } finally { + if (timeoutId !== undefined) { + clearTimeout(timeoutId) + } + } + if (proc.exitCode !== 0) { throw new Error(`Command exited with code ${proc.exitCode}`) } - + return { gateway: gatewayName, success: true } } catch (error) { return { @@ -169,4 +178,3 @@ export async function wakeCommandGateway( } } } - diff --git a/src/openclaw/index.ts b/src/openclaw/index.ts index ed71be880..5cbbe3362 100644 --- a/src/openclaw/index.ts +++ b/src/openclaw/index.ts @@ -9,7 +9,9 @@ import { getCurrentTmuxSession, captureTmuxPane } from "./tmux" import { startReplyListener, stopReplyListener } from "./reply-listener" import type { OpenClawConfig, OpenClawContext, OpenClawPayload, WakeResult } from "./types" -const DEBUG = process.env.OMX_OPENCLAW_DEBUG === "1" +const DEBUG = + process.env.OMO_OPENCLAW_DEBUG === "1" + || process.env.OMX_OPENCLAW_DEBUG === "1" function buildWhitelistedContext(context: OpenClawContext): OpenClawContext { const result: OpenClawContext = {} @@ -18,7 +20,7 @@ function buildWhitelistedContext(context: OpenClawContext): OpenClawContext { if (context.tmuxSession !== undefined) result.tmuxSession = context.tmuxSession if (context.prompt !== undefined) result.prompt = context.prompt if (context.contextSummary !== undefined) result.contextSummary = context.contextSummary - if (context.reason !== undefined) result.reason = context.reason + if (context.reasoning !== undefined) result.reasoning = context.reasoning if (context.question !== undefined) result.question = context.question if (context.tmuxTail !== undefined) result.tmuxTail = context.tmuxTail if (context.replyChannel !== undefined) result.replyChannel = context.replyChannel @@ -34,27 +36,27 @@ export async function wakeOpenClaw( ): Promise { try { if (!config.enabled) return null - + const resolved = resolveGateway(config, event) if (!resolved) return null - + const { gatewayName, gateway, instruction } = resolved - + const now = new Date().toISOString() - + const replyChannel = context.replyChannel ?? process.env.OPENCLAW_REPLY_CHANNEL const replyTarget = context.replyTarget ?? process.env.OPENCLAW_REPLY_TARGET const replyThread = context.replyThread ?? process.env.OPENCLAW_REPLY_THREAD - + const enrichedContext: OpenClawContext = { ...context, ...(replyChannel !== undefined && { replyChannel }), ...(replyTarget !== undefined && { replyTarget }), ...(replyThread !== undefined && { replyThread }), } - + const tmuxSession = enrichedContext.tmuxSession ?? getCurrentTmuxSession() ?? undefined - + let tmuxTail = enrichedContext.tmuxTail if (!tmuxTail && (event === "stop" || event === "session-end") && process.env.TMUX) { try { @@ -62,11 +64,16 @@ export async function wakeOpenClaw( if (paneId) { tmuxTail = (await captureTmuxPane(paneId, 15)) ?? undefined } - } catch { - // Ignore + } catch (error) { + if (DEBUG) { + console.error( + "[openclaw] failed to capture tmux tail:", + error instanceof Error ? error.message : error, + ) + } } } - + const variables: Record = { sessionId: enrichedContext.sessionId, projectPath: enrichedContext.projectPath, @@ -74,7 +81,7 @@ export async function wakeOpenClaw( tmuxSession, prompt: enrichedContext.prompt, contextSummary: enrichedContext.contextSummary, - reason: enrichedContext.reason, + reasoning: enrichedContext.reasoning, question: enrichedContext.question, tmuxTail, event, @@ -83,12 +90,12 @@ export async function wakeOpenClaw( replyTarget, replyThread, } - + const interpolatedInstruction = interpolateInstruction(instruction, variables) variables.instruction = interpolatedInstruction - + let result: WakeResult - + if (gateway.type === "command") { result = await wakeCommandGateway(gatewayName, gateway, variables) } else { @@ -107,14 +114,14 @@ export async function wakeOpenClaw( ...(replyThread !== undefined && { threadId: replyThread }), context: buildWhitelistedContext(enrichedContext), } - + result = await wakeGateway(gatewayName, gateway, payload) } - + if (DEBUG) { console.error(`[openclaw] wake ${event} -> ${gatewayName}: ${result.success ? "ok" : result.error}`) } - + return result } catch (error) { if (DEBUG) { @@ -125,7 +132,8 @@ export async function wakeOpenClaw( } export async function initializeOpenClaw(config: OpenClawConfig): Promise { - if (config.enabled && (config.discordBotToken || config.telegramBotToken)) { + const replyListener = config.replyListener + if (config.enabled && (replyListener?.discordBotToken || replyListener?.telegramBotToken)) { await startReplyListener(config) } } diff --git a/src/openclaw/reply-listener.ts b/src/openclaw/reply-listener.ts index fd07983b6..f6c8e015b 100644 --- a/src/openclaw/reply-listener.ts +++ b/src/openclaw/reply-listener.ts @@ -58,7 +58,7 @@ const STATE_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-state.json") const CONFIG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-config.json") const LOG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener.log") -const DAEMON_IDENTITY_MARKER = "pollLoop" +export const DAEMON_IDENTITY_MARKER = "--openclaw-reply-listener-daemon" function createMinimalDaemonEnv(): Record { const env: Record = {} @@ -114,6 +114,10 @@ function log(message: string): void { } } +export function logReplyListenerMessage(message: string): void { + log(message) +} + interface DaemonState { isRunning: boolean pid: number | null @@ -255,22 +259,23 @@ async function injectReply( platform: string, config: OpenClawConfig, ): Promise { + const replyListener = config.replyListener const content = await captureTmuxPane(paneId, 15) const analysis = analyzePaneContent(content) - + if (analysis.confidence < 0.3) { // Lower threshold for simple check log( - `WARN: Pane ${paneId} does not appear to be running Codex CLI (confidence: ${analysis.confidence}). Skipping injection, removing stale mapping.`, + `WARN: Pane ${paneId} does not appear to be running OpenCode CLI (confidence: ${analysis.confidence}). Skipping injection, removing stale mapping.`, ) removeMessagesByPane(paneId) return false } - - const prefix = config.includePrefix ? `[reply:${platform}] ` : "" + + const prefix = replyListener?.includePrefix === false ? "" : `[reply:${platform}] ` const sanitized = sanitizeReplyInput(prefix + text) - const truncated = sanitized.slice(0, config.maxMessageLength) + const truncated = sanitized.slice(0, replyListener?.maxMessageLength ?? 500) const success = await sendToPane(paneId, truncated, true) - + if (success) { log( `Injected reply from ${platform} into pane ${paneId}: "${truncated.slice(0, 50)}${truncated.length > 50 ? "..." : ""}"`, @@ -288,30 +293,36 @@ async function pollDiscord( state: DaemonState, rateLimiter: RateLimiter, ): Promise { - if (!config.discordBotToken || !config.discordChannelId) return - if (!config.authorizedDiscordUserIds || config.authorizedDiscordUserIds.length === 0) return + const replyListener = config.replyListener + if (!replyListener?.discordBotToken || !replyListener.discordChannelId) return + if ( + !replyListener.authorizedDiscordUserIds + || replyListener.authorizedDiscordUserIds.length === 0 + ) { + return + } if (Date.now() < discordBackoffUntil) return try { const after = state.discordLastMessageId ? `?after=${state.discordLastMessageId}&limit=10` : "?limit=10" - const url = `https://discord.com/api/v10/channels/${config.discordChannelId}/messages${after}` - + const url = `https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages${after}` + const controller = new AbortController() const timeout = setTimeout(() => controller.abort(), 10000) - + const response = await fetch(url, { method: "GET", - headers: { Authorization: `Bot ${config.discordBotToken}` }, + headers: { Authorization: `Bot ${replyListener.discordBotToken}` }, signal: controller.signal, }) - + clearTimeout(timeout) - + const remaining = response.headers.get("x-ratelimit-remaining") const reset = response.headers.get("x-ratelimit-reset") - + if (remaining !== null && parseInt(remaining, 10) < 2) { const parsed = reset ? parseFloat(reset) : Number.NaN const resetTime = Number.isFinite(parsed) ? parsed * 1000 : Date.now() + 10000 @@ -320,37 +331,37 @@ async function pollDiscord( `WARN: Discord rate limit low (remaining: ${remaining}), backing off until ${new Date(resetTime).toISOString()}`, ) } - + if (!response.ok) { log(`Discord API error: HTTP ${response.status}`) return } - + const messages = await response.json() if (!Array.isArray(messages) || messages.length === 0) return - + const sorted = [...messages].reverse() - + for (const msg of sorted) { if (!msg.message_reference?.message_id) { state.discordLastMessageId = msg.id writeDaemonState(state) continue } - - if (!config.authorizedDiscordUserIds.includes(msg.author.id)) { + + if (!replyListener.authorizedDiscordUserIds.includes(msg.author.id)) { state.discordLastMessageId = msg.id writeDaemonState(state) continue } - + const mapping = lookupByMessageId("discord-bot", msg.message_reference.message_id) if (!mapping) { state.discordLastMessageId = msg.id writeDaemonState(state) continue } - + if (!rateLimiter.canProceed()) { log(`WARN: Rate limit exceeded, dropping Discord message ${msg.id}`) state.discordLastMessageId = msg.id @@ -358,21 +369,21 @@ async function pollDiscord( state.errors++ continue } - + state.discordLastMessageId = msg.id writeDaemonState(state) - + const success = await injectReply(mapping.tmuxPaneId, msg.content, "discord", config) - + if (success) { state.messagesInjected++ // Add reaction try { await fetch( - `https://discord.com/api/v10/channels/${config.discordChannelId}/messages/${msg.id}/reactions/%E2%9C%85/@me`, + `https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages/${msg.id}/reactions/%E2%9C%85/@me`, { method: "PUT", - headers: { Authorization: `Bot ${config.discordBotToken}` }, + headers: { Authorization: `Bot ${replyListener.discordBotToken}` }, }, ) } catch { @@ -394,30 +405,31 @@ async function pollTelegram( state: DaemonState, rateLimiter: RateLimiter, ): Promise { - if (!config.telegramBotToken || !config.telegramChatId) return + const replyListener = config.replyListener + if (!replyListener?.telegramBotToken || !replyListener.telegramChatId) return try { const offset = state.telegramLastUpdateId ? state.telegramLastUpdateId + 1 : 0 - const url = `https://api.telegram.org/bot${config.telegramBotToken}/getUpdates?offset=${offset}&timeout=0` - + const url = `https://api.telegram.org/bot${replyListener.telegramBotToken}/getUpdates?offset=${offset}&timeout=0` + const controller = new AbortController() const timeout = setTimeout(() => controller.abort(), 10000) - + const response = await fetch(url, { method: "GET", signal: controller.signal, }) - + clearTimeout(timeout) - + if (!response.ok) { log(`Telegram API error: HTTP ${response.status}`) return } - + const body = await response.json() as any const updates = body.result || [] - + for (const update of updates) { const msg = update.message if (!msg) { @@ -431,27 +443,27 @@ async function pollTelegram( writeDaemonState(state) continue } - - if (String(msg.chat.id) !== config.telegramChatId) { + + if (String(msg.chat.id) !== replyListener.telegramChatId) { state.telegramLastUpdateId = update.update_id writeDaemonState(state) continue } - + const mapping = lookupByMessageId("telegram", String(msg.reply_to_message.message_id)) if (!mapping) { state.telegramLastUpdateId = update.update_id writeDaemonState(state) continue } - + const text = msg.text || "" if (!text) { state.telegramLastUpdateId = update.update_id writeDaemonState(state) continue } - + if (!rateLimiter.canProceed()) { log(`WARN: Rate limit exceeded, dropping Telegram message ${msg.message_id}`) state.telegramLastUpdateId = update.update_id @@ -459,22 +471,22 @@ async function pollTelegram( state.errors++ continue } - + state.telegramLastUpdateId = update.update_id writeDaemonState(state) - + const success = await injectReply(mapping.tmuxPaneId, text, "telegram", config) - + if (success) { state.messagesInjected++ try { await fetch( - `https://api.telegram.org/bot${config.telegramBotToken}/sendMessage`, + `https://api.telegram.org/bot${replyListener.telegramBotToken}/sendMessage`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ - chat_id: config.telegramChatId, + chat_id: replyListener.telegramChatId, text: "Injected into Codex CLI session.", reply_to_message_id: msg.message_id, }), @@ -517,10 +529,10 @@ export async function pollLoop(): Promise { state.isRunning = true state.pid = process.pid - - const rateLimiter = new RateLimiter(config.rateLimitPerMinute || 10) + + const rateLimiter = new RateLimiter(config.replyListener?.rateLimitPerMinute || 10) let lastPruneAt = Date.now() - + const shutdown = (): void => { log("Shutdown signal received") state.isRunning = false @@ -528,10 +540,10 @@ export async function pollLoop(): Promise { removePidFile() process.exit(0) } - + process.on("SIGTERM", shutdown) process.on("SIGINT", shutdown) - + try { pruneStale() log("Pruned stale registry entries") @@ -554,15 +566,19 @@ export async function pollLoop(): Promise { log(`WARN: Prune failed: ${e instanceof Error ? e.message : String(e)}`) } } - + writeDaemonState(state) - await new Promise((resolve) => setTimeout(resolve, config.pollIntervalMs || 3000)) + await new Promise((resolve) => + setTimeout(resolve, config.replyListener?.pollIntervalMs || 3000), + ) } catch (error) { state.errors++ state.lastError = error instanceof Error ? error.message : String(error) log(`Poll error: ${state.lastError}`) writeDaemonState(state) - await new Promise((resolve) => setTimeout(resolve, (config.pollIntervalMs || 3000) * 2)) + await new Promise((resolve) => + setTimeout(resolve, (config.replyListener?.pollIntervalMs || 3000) * 2), + ) } } log("Poll loop ended") @@ -577,16 +593,17 @@ export async function startReplyListener(config: OpenClawConfig): Promise<{ succ state: state || undefined, } } - + if (!(await isTmuxAvailable())) { return { success: false, message: "tmux not available - reply injection requires tmux", } } - + const normalizedConfig = normalizeReplyListenerConfig(config) - if (!normalizedConfig.discordBotToken && !normalizedConfig.telegramBotToken) { + const replyListener = normalizedConfig.replyListener + if (!replyListener?.discordBotToken && !replyListener?.telegramBotToken) { // Only warn if no platforms enabled, but user might just want outbound // Actually, instructions say: "Fire-and-forget for outbound, daemon process for inbound" // So if no inbound config, we shouldn't start daemon. @@ -595,18 +612,18 @@ export async function startReplyListener(config: OpenClawConfig): Promise<{ succ message: "No enabled reply listener platforms configured (missing bot tokens/channels)", } } - + writeDaemonConfig(normalizedConfig) ensureStateDir() - + const currentFile = import.meta.url const isTs = currentFile.endsWith(".ts") - const daemonScript = isTs + const daemonScript = isTs ? join(dirname(new URL(currentFile).pathname), "daemon.ts") : join(dirname(new URL(currentFile).pathname), "daemon.js") - + try { - const proc = spawn(["bun", "run", daemonScript], { + const proc = spawn(["bun", "run", daemonScript, DAEMON_IDENTITY_MARKER], { detached: true, stdio: ["ignore", "ignore", "ignore"], cwd: process.cwd(), diff --git a/src/openclaw/session-registry.ts b/src/openclaw/session-registry.ts index 6fe35b7a5..4f0b37979 100644 --- a/src/openclaw/session-registry.ts +++ b/src/openclaw/session-registry.ts @@ -11,11 +11,12 @@ import { constants, } from "fs" import { join, dirname } from "path" -import { homedir } from "os" import { randomUUID } from "crypto" +import { getOpenCodeStorageDir } from "../shared/data-path" -const REGISTRY_PATH = join(homedir(), ".omx", "state", "reply-session-registry.jsonl") -const REGISTRY_LOCK_PATH = join(homedir(), ".omx", "state", "reply-session-registry.lock") +const OPENCLAW_STORAGE_DIR = join(getOpenCodeStorageDir(), "openclaw") +const REGISTRY_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.jsonl") +const REGISTRY_LOCK_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.lock") const SECURE_FILE_MODE = 0o600 const MAX_AGE_MS = 24 * 60 * 60 * 1000 const LOCK_TIMEOUT_MS = 2000 @@ -120,12 +121,26 @@ function acquireRegistryLock(): LockHandle | null { constants.O_CREAT | constants.O_EXCL | constants.O_WRONLY, SECURE_FILE_MODE, ) - const lockPayload = JSON.stringify({ - pid: process.pid, - acquiredAt: Date.now(), - token, - }) - writeSync(fd, lockPayload) + try { + const lockPayload = JSON.stringify({ + pid: process.pid, + acquiredAt: Date.now(), + token, + }) + writeSync(fd, lockPayload) + } catch (writeError) { + try { + closeSync(fd) + } catch { + // Ignore + } + try { + unlinkSync(REGISTRY_LOCK_PATH) + } catch { + // Ignore + } + throw writeError + } return { fd, token } } catch (error) { const err = error as NodeJS.ErrnoException diff --git a/src/openclaw/tmux.ts b/src/openclaw/tmux.ts index 7f1024e4a..663ac040a 100644 --- a/src/openclaw/tmux.ts +++ b/src/openclaw/tmux.ts @@ -14,7 +14,9 @@ export async function getTmuxSessionName(): Promise { stdout: "pipe", stderr: "ignore", }) - const output = await new Response(proc.stdout).text() + const outputPromise = new Response(proc.stdout).text() + await proc.exited + const output = await outputPromise if (proc.exitCode !== 0) return null return output.trim() || null } catch { @@ -31,7 +33,9 @@ export async function captureTmuxPane(paneId: string, lines = 15): Promise { try { - const proc = spawn(["tmux", "send-keys", "-t", paneId, text, ...(confirm ? ["Enter"] : [])], { + const literalProc = spawn(["tmux", "send-keys", "-t", paneId, "-l", "--", text], { stdout: "ignore", stderr: "ignore", }) - await proc.exited - return proc.exitCode === 0 + await literalProc.exited + if (literalProc.exitCode !== 0) return false + + if (!confirm) return true + + const enterProc = spawn(["tmux", "send-keys", "-t", paneId, "Enter"], { + stdout: "ignore", + stderr: "ignore", + }) + await enterProc.exited + return enterProc.exitCode === 0 } catch { return false } @@ -67,18 +80,11 @@ export async function isTmuxAvailable(): Promise { export function analyzePaneContent(content: string | null): { confidence: number } { if (!content) return { confidence: 0 } - // Simple heuristic: check for common CLI prompts or output - // Reference implementation had more logic, but for now simple check is okay - // Ideally, I should port the reference logic. - // Reference logic: - // if (content.includes("opencode")) confidence += 0.5 - // if (content.includes("How can I help you?")) confidence += 0.8 - // etc. - + let confidence = 0 if (content.includes("opencode")) confidence += 0.3 - if (content.includes("How can I help you?")) confidence += 0.5 - if (content.includes("Type /help")) confidence += 0.2 - + if (content.includes("Ask anything...")) confidence += 0.5 + if (content.includes("Run /help")) confidence += 0.2 + return { confidence: Math.min(1, confidence) } } diff --git a/src/openclaw/types.ts b/src/openclaw/types.ts index 5db89d6dc..b05325da2 100644 --- a/src/openclaw/types.ts +++ b/src/openclaw/types.ts @@ -1,6 +1,16 @@ -import type { OpenClawConfig, OpenClawGateway, OpenClawHook } from "../config/schema/openclaw" +import type { + OpenClawConfig, + OpenClawGateway, + OpenClawHook, + OpenClawReplyListenerConfig, +} from "../config/schema/openclaw" -export type { OpenClawConfig, OpenClawGateway, OpenClawHook } +export type { + OpenClawConfig, + OpenClawGateway, + OpenClawHook, + OpenClawReplyListenerConfig, +} export interface OpenClawContext { sessionId?: string