Merge pull request #2620 from code-yeongyu/feat/openclaw-bidirectional

feat: port OpenClaw bidirectional integration from omx
This commit is contained in:
YeonGyu-Kim
2026-03-19 10:47:07 +09:00
committed by GitHub
16 changed files with 2049 additions and 0 deletions

1
.gitignore vendored
View File

@@ -36,3 +36,4 @@ test-injection/
notepad.md
oauth-success.html
*.bun-build
.omx/

View File

@@ -12,6 +12,7 @@ import { BuiltinCommandNameSchema } from "./commands"
import { ExperimentalConfigSchema } from "./experimental"
import { GitMasterConfigSchema } from "./git-master"
import { NotificationConfigSchema } from "./notification"
import { OpenClawConfigSchema } from "./openclaw"
import { RalphLoopConfigSchema } from "./ralph-loop"
import { RuntimeFallbackConfigSchema } from "./runtime-fallback"
import { SkillsConfigSchema } from "./skills"
@@ -55,6 +56,7 @@ export const OhMyOpenCodeConfigSchema = z.object({
runtime_fallback: z.union([z.boolean(), RuntimeFallbackConfigSchema]).optional(),
background_task: BackgroundTaskConfigSchema.optional(),
notification: NotificationConfigSchema.optional(),
openclaw: OpenClawConfigSchema.optional(),
babysitting: BabysittingConfigSchema.optional(),
git_master: GitMasterConfigSchema.optional(),
browser_automation_engine: BrowserAutomationConfigSchema.optional(),

View File

@@ -0,0 +1,50 @@
import { z } from "zod"
export const OpenClawGatewaySchema = z.object({
type: z.enum(["http", "command"]).default("http"),
// HTTP specific
url: z.string().optional(),
method: z.string().default("POST"),
headers: z.record(z.string(), z.string()).optional(),
// Command specific
command: z.string().optional(),
// Shared
timeout: z.number().optional(),
})
export const OpenClawHookSchema = z.object({
enabled: z.boolean().default(true),
gateway: z.string(),
instruction: z.string(),
})
export const OpenClawReplyListenerConfigSchema = z.object({
discordBotToken: z.string().optional(),
discordChannelId: z.string().optional(),
discordMention: z.string().optional(), // For allowed_mentions
authorizedDiscordUserIds: z.array(z.string()).default([]),
telegramBotToken: z.string().optional(),
telegramChatId: z.string().optional(),
pollIntervalMs: z.number().default(3000),
rateLimitPerMinute: z.number().default(10),
maxMessageLength: z.number().default(500),
includePrefix: z.boolean().default(true),
})
export const OpenClawConfigSchema = z.object({
enabled: z.boolean().default(false),
// Outbound Configuration
gateways: z.record(z.string(), OpenClawGatewaySchema).default({}),
hooks: z.record(z.string(), OpenClawHookSchema).default({}),
// Inbound Configuration (Reply Listener)
replyListener: OpenClawReplyListenerConfigSchema.optional(),
})
export type OpenClawConfig = z.infer<typeof OpenClawConfigSchema>
export type OpenClawGateway = z.infer<typeof OpenClawGatewaySchema>
export type OpenClawHook = z.infer<typeof OpenClawHookSchema>
export type OpenClawReplyListenerConfig = z.infer<typeof OpenClawReplyListenerConfigSchema>

View File

@@ -0,0 +1,82 @@
import { beforeEach, describe, expect, mock, test } from "bun:test"
const wakeOpenClawMock = mock(async () => null)
mock.module("../openclaw", () => ({
wakeOpenClaw: wakeOpenClawMock,
}))
describe("createOpenClawHook", () => {
beforeEach(() => {
wakeOpenClawMock.mockClear()
})
test("maps session.created to session-start", async () => {
const { createOpenClawHook } = await import("./openclaw")
const hook = createOpenClawHook(
{ directory: "/tmp/project" } as any,
{ openclaw: { enabled: true } } as any,
)
await hook?.event?.({
event: {
type: "session.created",
properties: { sessionID: "session-1" },
},
})
expect(wakeOpenClawMock).toHaveBeenCalledWith(
expect.anything(),
"session-start",
expect.objectContaining({
projectPath: "/tmp/project",
sessionId: "session-1",
}),
)
})
test("uses tool.execute.before for question tools", async () => {
const { createOpenClawHook } = await import("./openclaw")
const hook = createOpenClawHook(
{ directory: "/tmp/project" } as any,
{ openclaw: { enabled: true } } as any,
)
await hook?.["tool.execute.before"]?.(
{ tool: "ask_user_question", sessionID: "session-2" },
{ args: { questions: [{ question: "Need approval?", options: [{ label: "Yes" }] }] } },
)
expect(wakeOpenClawMock).toHaveBeenCalledWith(
expect.anything(),
"ask-user-question",
expect.objectContaining({
projectPath: "/tmp/project",
question: "Need approval?",
sessionId: "session-2",
}),
)
})
test("falls back to args.question string when questions array absent", async () => {
const { createOpenClawHook } = await import("./openclaw")
const hook = createOpenClawHook(
{ directory: "/tmp/project" } as any,
{ openclaw: { enabled: true } } as any,
)
await hook?.["tool.execute.before"]?.(
{ tool: "question", sessionID: "session-3" },
{ args: { question: "Fallback?" } },
)
expect(wakeOpenClawMock).toHaveBeenCalledWith(
expect.anything(),
"ask-user-question",
expect.objectContaining({
question: "Fallback?",
sessionId: "session-3",
}),
)
})
})

66
src/hooks/openclaw.ts Normal file
View File

@@ -0,0 +1,66 @@
import type { PluginContext } from "../plugin/types"
import type { OhMyOpenCodeConfig } from "../config"
import { wakeOpenClaw } from "../openclaw"
import type { OpenClawContext } from "../openclaw/types"
export function createOpenClawHook(
ctx: PluginContext,
pluginConfig: OhMyOpenCodeConfig,
) {
const config = pluginConfig.openclaw
if (!config?.enabled) return null
const handleWake = async (event: string, context: OpenClawContext) => {
await wakeOpenClaw(config, event, context)
}
return {
event: async (input: any) => {
const { event } = input
const props = event.properties || {}
const sessionID = props.sessionID || props.info?.id
const context: OpenClawContext = {
sessionId: sessionID,
projectPath: ctx.directory,
}
if (event.type === "session.created") {
await handleWake("session-start", context)
} else if (event.type === "session.deleted") {
await handleWake("session-end", context)
} else if (event.type === "session.idle") {
// Check if we are waiting for user input (ask-user-question)
// This is heuristic. If the last message was from assistant and ended with a question?
// Or if the system is idle.
await handleWake("session-idle", context)
}
},
"tool.execute.before": async (
input: { tool: string; sessionID: string },
output: { args: Record<string, unknown> },
) => {
const normalizedToolName = input.tool.toLowerCase()
if (
normalizedToolName !== "question"
&& normalizedToolName !== "ask_user_question"
&& normalizedToolName !== "askuserquestion"
) {
return
}
// question tool uses args.questions array, not args.question
const questions = Array.isArray(output.args.questions) ? output.args.questions : []
const question = questions.length > 0 && typeof questions[0]?.question === "string"
? questions[0].question
: typeof output.args.question === "string" ? output.args.question : undefined
const context: OpenClawContext = {
sessionId: input.sessionID,
projectPath: ctx.directory,
question,
}
await handleWake("ask-user-question", context)
},
}
}

View File

@@ -0,0 +1,115 @@
import { describe, expect, test } from "bun:test"
import { resolveGateway, validateGatewayUrl, normalizeReplyListenerConfig } from "../config"
import type { OpenClawConfig } from "../types"
import { OpenClawConfigSchema } from "../../config/schema/openclaw"
describe("OpenClaw Config", () => {
test("resolveGateway resolves HTTP gateway", () => {
const config: OpenClawConfig = {
enabled: true,
gateways: {
discord: {
type: "http",
url: "https://discord.com/api/webhooks/123",
},
},
hooks: {
"session-start": {
enabled: true,
gateway: "discord",
instruction: "Started session {{sessionId}}",
},
},
} as any
const resolved = resolveGateway(config, "session-start")
expect(resolved).not.toBeNull()
expect(resolved?.gatewayName).toBe("discord")
expect(resolved?.gateway.url).toBe("https://discord.com/api/webhooks/123")
expect(resolved?.instruction).toBe("Started session {{sessionId}}")
})
test("resolveGateway returns null for disabled config", () => {
const config: OpenClawConfig = {
enabled: false,
gateways: {},
hooks: {},
} as any
expect(resolveGateway(config, "session-start")).toBeNull()
})
test("resolveGateway returns null for unknown hook", () => {
const config: OpenClawConfig = {
enabled: true,
gateways: {},
hooks: {},
} as any
expect(resolveGateway(config, "unknown")).toBeNull()
})
test("resolveGateway returns null for disabled hook", () => {
const config: OpenClawConfig = {
enabled: true,
gateways: { g: { type: "http", url: "https://example.com" } },
hooks: {
event: { enabled: false, gateway: "g", instruction: "i" },
},
} as any
expect(resolveGateway(config, "event")).toBeNull()
})
test("validateGatewayUrl allows HTTPS", () => {
expect(validateGatewayUrl("https://example.com")).toBe(true)
})
test("validateGatewayUrl rejects HTTP remote", () => {
expect(validateGatewayUrl("http://example.com")).toBe(false)
})
test("validateGatewayUrl allows HTTP localhost", () => {
expect(validateGatewayUrl("http://localhost:3000")).toBe(true)
expect(validateGatewayUrl("http://127.0.0.1:3000")).toBe(true)
})
test("normalizeReplyListenerConfig normalizes nested reply listener fields", () => {
const config = normalizeReplyListenerConfig({
enabled: true,
gateways: {},
hooks: {},
replyListener: {
discordBotToken: "discord-token",
discordChannelId: "channel-id",
authorizedDiscordUserIds: ["user-1", "", "user-2"],
pollIntervalMs: 100,
rateLimitPerMinute: 0,
maxMessageLength: 9000,
includePrefix: false,
},
} as OpenClawConfig)
expect(config.replyListener).toEqual({
discordBotToken: "discord-token",
discordChannelId: "channel-id",
authorizedDiscordUserIds: ["user-1", "user-2"],
pollIntervalMs: 500,
rateLimitPerMinute: 1,
maxMessageLength: 4000,
includePrefix: false,
})
})
test("gateway timeout remains optional so env fallback can apply", () => {
const parsed = OpenClawConfigSchema.parse({
enabled: true,
gateways: {
command: {
type: "command",
command: "echo hi",
},
},
hooks: {},
})
expect(parsed.gateways.command.timeout).toBeUndefined()
})
})

View File

@@ -0,0 +1,70 @@
import { describe, expect, test, mock, spyOn } from "bun:test"
import {
interpolateInstruction,
resolveCommandTimeoutMs,
shellEscapeArg,
wakeGateway,
wakeCommandGateway,
} from "../dispatcher"
describe("OpenClaw Dispatcher", () => {
test("interpolateInstruction replaces variables", () => {
const template = "Hello {{name}}, welcome to {{place}}!"
const variables = { name: "World", place: "Bun" }
expect(interpolateInstruction(template, variables)).toBe(
"Hello World, welcome to Bun!",
)
})
test("interpolateInstruction handles missing variables", () => {
const template = "Hello {{name}}!"
const variables = {}
expect(interpolateInstruction(template, variables)).toBe("Hello !")
})
test("shellEscapeArg escapes single quotes", () => {
expect(shellEscapeArg("foo'bar")).toBe("'foo'\\''bar'")
expect(shellEscapeArg("simple")).toBe("'simple'")
})
test("wakeGateway sends POST request", async () => {
const fetchSpy = spyOn(global, "fetch").mockResolvedValue(
new Response(JSON.stringify({ ok: true }), { status: 200 }),
)
try {
const result = await wakeGateway(
"test",
{ url: "https://example.com", method: "POST", timeout: 1000, type: "http" },
{ foo: "bar" },
)
expect(result.success).toBe(true)
expect(fetchSpy).toHaveBeenCalled()
const call = fetchSpy.mock.calls[0]
expect(call[0]).toBe("https://example.com")
expect(call[1]?.method).toBe("POST")
expect(call[1]?.body).toBe('{"foo":"bar"}')
} finally {
fetchSpy.mockRestore()
}
})
test("wakeGateway fails on invalid URL", async () => {
const result = await wakeGateway("test", { url: "http://example.com", method: "POST", timeout: 1000, type: "http" }, {})
expect(result.success).toBe(false)
expect(result.error).toContain("Invalid URL")
})
test("resolveCommandTimeoutMs reads OMO env fallback", () => {
const original = process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS
process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = "4321"
try {
// Call without explicit envTimeoutRaw so the function reads from process.env itself
expect(resolveCommandTimeoutMs(undefined)).toBe(4321)
} finally {
if (original === undefined) delete process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS
else process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = original
}
})
})

View File

@@ -0,0 +1,13 @@
import { describe, expect, test } from "bun:test"
import { analyzePaneContent } from "../tmux"
describe("openclaw tmux helpers", () => {
test("analyzePaneContent recognizes the opencode welcome prompt", () => {
const content = "opencode\nAsk anything...\nRun /help"
expect(analyzePaneContent(content).confidence).toBeGreaterThanOrEqual(1)
})
test("analyzePaneContent returns zero confidence for empty content", () => {
expect(analyzePaneContent(null).confidence).toBe(0)
})
})

120
src/openclaw/config.ts Normal file
View File

@@ -0,0 +1,120 @@
import type {
OpenClawConfig,
OpenClawGateway,
OpenClawReplyListenerConfig,
} from "./types"
const DEFAULT_REPLY_POLL_INTERVAL_MS = 3000
const MIN_REPLY_POLL_INTERVAL_MS = 500
const MAX_REPLY_POLL_INTERVAL_MS = 60000
const DEFAULT_REPLY_RATE_LIMIT_PER_MINUTE = 10
const MIN_REPLY_RATE_LIMIT_PER_MINUTE = 1
const DEFAULT_REPLY_MAX_MESSAGE_LENGTH = 500
const MIN_REPLY_MAX_MESSAGE_LENGTH = 1
const MAX_REPLY_MAX_MESSAGE_LENGTH = 4000
function normalizeInteger(
value: unknown,
fallback: number,
min: number,
max?: number,
): number {
const numeric =
typeof value === "number"
? Math.trunc(value)
: typeof value === "string" && value.trim()
? Number.parseInt(value, 10)
: Number.NaN
if (!Number.isFinite(numeric)) return fallback
if (numeric < min) return min
if (max !== undefined && numeric > max) return max
return numeric
}
export function normalizeReplyListenerConfig(config: OpenClawConfig): OpenClawConfig {
const replyListener = config.replyListener
if (!replyListener) return config
const normalizedReplyListener: OpenClawReplyListenerConfig = {
...replyListener,
discordBotToken: replyListener.discordBotToken,
discordChannelId: replyListener.discordChannelId,
telegramBotToken: replyListener.telegramBotToken,
telegramChatId: replyListener.telegramChatId,
pollIntervalMs: normalizeInteger(
replyListener.pollIntervalMs,
DEFAULT_REPLY_POLL_INTERVAL_MS,
MIN_REPLY_POLL_INTERVAL_MS,
MAX_REPLY_POLL_INTERVAL_MS,
),
rateLimitPerMinute: normalizeInteger(
replyListener.rateLimitPerMinute,
DEFAULT_REPLY_RATE_LIMIT_PER_MINUTE,
MIN_REPLY_RATE_LIMIT_PER_MINUTE,
),
maxMessageLength: normalizeInteger(
replyListener.maxMessageLength,
DEFAULT_REPLY_MAX_MESSAGE_LENGTH,
MIN_REPLY_MAX_MESSAGE_LENGTH,
MAX_REPLY_MAX_MESSAGE_LENGTH,
),
includePrefix: replyListener.includePrefix !== false,
authorizedDiscordUserIds: Array.isArray(replyListener.authorizedDiscordUserIds)
? replyListener.authorizedDiscordUserIds.filter(
(id) => typeof id === "string" && id.trim() !== "",
)
: [],
}
return {
...config,
replyListener: normalizedReplyListener,
}
}
export function resolveGateway(
config: OpenClawConfig,
event: string,
): { gatewayName: string; gateway: OpenClawGateway; instruction: string } | null {
if (!config.enabled) return null
const mapping = config.hooks[event]
if (!mapping || !mapping.enabled) {
return null
}
const gateway = config.gateways[mapping.gateway]
if (!gateway) {
return null
}
// Validate based on gateway type
if (gateway.type === "command") {
if (!gateway.command) return null
} else {
// HTTP gateway
if (!gateway.url) return null
}
return { gatewayName: mapping.gateway, gateway, instruction: mapping.instruction }
}
export function validateGatewayUrl(url: string): boolean {
try {
const parsed = new URL(url)
if (parsed.protocol === "https:") return true
if (
parsed.protocol === "http:" &&
(parsed.hostname === "localhost" ||
parsed.hostname === "127.0.0.1" ||
parsed.hostname === "::1" ||
parsed.hostname === "[::1]")
) {
return true
}
return false
} catch {
return false
}
}

9
src/openclaw/daemon.ts Normal file
View File

@@ -0,0 +1,9 @@
import { pollLoop, logReplyListenerMessage } from "./reply-listener"
pollLoop().catch((err) => {
logReplyListenerMessage(
`FATAL: reply listener daemon crashed: ${err instanceof Error ? err.stack ?? err.message : String(err)}`,
)
console.error(err)
process.exit(1)
})

180
src/openclaw/dispatcher.ts Normal file
View File

@@ -0,0 +1,180 @@
import { spawn } from "bun"
import type { OpenClawGateway } from "./types"
const DEFAULT_HTTP_TIMEOUT_MS = 10_000
const DEFAULT_COMMAND_TIMEOUT_MS = 5_000
const MIN_COMMAND_TIMEOUT_MS = 100
const MAX_COMMAND_TIMEOUT_MS = 300_000
const SHELL_METACHAR_RE = /[|&;><`$()]/
export function validateGatewayUrl(url: string): boolean {
try {
const parsed = new URL(url)
if (parsed.protocol === "https:") return true
if (
parsed.protocol === "http:" &&
(parsed.hostname === "localhost" ||
parsed.hostname === "127.0.0.1" ||
parsed.hostname === "::1" ||
parsed.hostname === "[::1]")
) {
return true
}
return false
} catch {
return false
}
}
export function interpolateInstruction(
template: string,
variables: Record<string, string | undefined>,
): string {
return template.replace(/\{\{(\w+)\}\}/g, (_match, key) => {
return variables[key] ?? ""
})
}
export function shellEscapeArg(value: string): string {
return "'" + value.replace(/'/g, "'\\''") + "'"
}
export function resolveCommandTimeoutMs(
gatewayTimeout?: number,
envTimeoutRaw =
process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS
?? process.env.OMX_OPENCLAW_COMMAND_TIMEOUT_MS,
): number {
const parseFinite = (value: unknown): number | undefined => {
if (typeof value !== "number" || !Number.isFinite(value)) return undefined
return value
}
const parseEnv = (value?: string): number | undefined => {
if (!value) return undefined
const parsed = Number(value)
return Number.isFinite(parsed) ? parsed : undefined
}
const rawTimeout =
parseFinite(gatewayTimeout) ??
parseEnv(envTimeoutRaw) ??
DEFAULT_COMMAND_TIMEOUT_MS
return Math.min(
MAX_COMMAND_TIMEOUT_MS,
Math.max(MIN_COMMAND_TIMEOUT_MS, Math.trunc(rawTimeout)),
)
}
export async function wakeGateway(
gatewayName: string,
gatewayConfig: OpenClawGateway,
payload: unknown,
): Promise<{ gateway: string; success: boolean; error?: string; statusCode?: number }> {
if (!gatewayConfig.url || !validateGatewayUrl(gatewayConfig.url)) {
return {
gateway: gatewayName,
success: false,
error: "Invalid URL (HTTPS required)",
}
}
try {
const headers = {
"Content-Type": "application/json",
...gatewayConfig.headers,
}
const timeout = gatewayConfig.timeout ?? DEFAULT_HTTP_TIMEOUT_MS
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), timeout)
const response = await fetch(gatewayConfig.url, {
method: gatewayConfig.method || "POST",
headers,
body: JSON.stringify(payload),
signal: controller.signal,
}).finally(() => {
clearTimeout(timeoutId)
})
if (!response.ok) {
return {
gateway: gatewayName,
success: false,
error: `HTTP ${response.status}`,
statusCode: response.status,
}
}
return { gateway: gatewayName, success: true, statusCode: response.status }
} catch (error) {
return {
gateway: gatewayName,
success: false,
error: error instanceof Error ? error.message : "Unknown error",
}
}
}
export async function wakeCommandGateway(
gatewayName: string,
gatewayConfig: OpenClawGateway,
variables: Record<string, string | undefined>,
): Promise<{ gateway: string; success: boolean; error?: string }> {
if (!gatewayConfig.command) {
return {
gateway: gatewayName,
success: false,
error: "No command configured",
}
}
try {
const timeout = resolveCommandTimeoutMs(gatewayConfig.timeout)
// Interpolate variables with shell escaping
const interpolated = gatewayConfig.command.replace(/\{\{(\w+)\}\}/g, (_match, key) => {
const value = variables[key]
if (value === undefined) return _match
return shellEscapeArg(value)
})
// Always use sh -c to handle the shell command string correctly
const proc = spawn(["sh", "-c", interpolated], {
env: { ...process.env },
stdout: "ignore",
stderr: "ignore",
})
// Handle timeout manually
let timeoutId: ReturnType<typeof setTimeout> | undefined
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
proc.kill()
reject(new Error("Command timed out"))
}, timeout)
})
try {
await Promise.race([proc.exited, timeoutPromise])
} finally {
if (timeoutId !== undefined) {
clearTimeout(timeoutId)
}
}
if (proc.exitCode !== 0) {
throw new Error(`Command exited with code ${proc.exitCode}`)
}
return { gateway: gatewayName, success: true }
} catch (error) {
return {
gateway: gatewayName,
success: false,
error: error instanceof Error ? error.message : "Unknown error",
}
}
}

141
src/openclaw/index.ts Normal file
View File

@@ -0,0 +1,141 @@
import { basename } from "path"
import { resolveGateway } from "./config"
import {
wakeGateway,
wakeCommandGateway,
interpolateInstruction,
} from "./dispatcher"
import { getCurrentTmuxSession, captureTmuxPane } from "./tmux"
import { startReplyListener, stopReplyListener } from "./reply-listener"
import type { OpenClawConfig, OpenClawContext, OpenClawPayload, WakeResult } from "./types"
const DEBUG =
process.env.OMO_OPENCLAW_DEBUG === "1"
|| process.env.OMX_OPENCLAW_DEBUG === "1"
function buildWhitelistedContext(context: OpenClawContext): OpenClawContext {
const result: OpenClawContext = {}
if (context.sessionId !== undefined) result.sessionId = context.sessionId
if (context.projectPath !== undefined) result.projectPath = context.projectPath
if (context.tmuxSession !== undefined) result.tmuxSession = context.tmuxSession
if (context.prompt !== undefined) result.prompt = context.prompt
if (context.contextSummary !== undefined) result.contextSummary = context.contextSummary
if (context.reasoning !== undefined) result.reasoning = context.reasoning
if (context.question !== undefined) result.question = context.question
if (context.tmuxTail !== undefined) result.tmuxTail = context.tmuxTail
if (context.replyChannel !== undefined) result.replyChannel = context.replyChannel
if (context.replyTarget !== undefined) result.replyTarget = context.replyTarget
if (context.replyThread !== undefined) result.replyThread = context.replyThread
return result
}
export async function wakeOpenClaw(
config: OpenClawConfig,
event: string,
context: OpenClawContext,
): Promise<WakeResult | null> {
try {
if (!config.enabled) return null
const resolved = resolveGateway(config, event)
if (!resolved) return null
const { gatewayName, gateway, instruction } = resolved
const now = new Date().toISOString()
const replyChannel = context.replyChannel ?? process.env.OPENCLAW_REPLY_CHANNEL
const replyTarget = context.replyTarget ?? process.env.OPENCLAW_REPLY_TARGET
const replyThread = context.replyThread ?? process.env.OPENCLAW_REPLY_THREAD
const enrichedContext: OpenClawContext = {
...context,
...(replyChannel !== undefined && { replyChannel }),
...(replyTarget !== undefined && { replyTarget }),
...(replyThread !== undefined && { replyThread }),
}
const tmuxSession = enrichedContext.tmuxSession ?? getCurrentTmuxSession() ?? undefined
let tmuxTail = enrichedContext.tmuxTail
if (!tmuxTail && (event === "stop" || event === "session-end") && process.env.TMUX) {
try {
const paneId = process.env.TMUX_PANE
if (paneId) {
tmuxTail = (await captureTmuxPane(paneId, 15)) ?? undefined
}
} catch (error) {
if (DEBUG) {
console.error(
"[openclaw] failed to capture tmux tail:",
error instanceof Error ? error.message : error,
)
}
}
}
const variables: Record<string, string | undefined> = {
sessionId: enrichedContext.sessionId,
projectPath: enrichedContext.projectPath,
projectName: enrichedContext.projectPath ? basename(enrichedContext.projectPath) : undefined,
tmuxSession,
prompt: enrichedContext.prompt,
contextSummary: enrichedContext.contextSummary,
reasoning: enrichedContext.reasoning,
question: enrichedContext.question,
tmuxTail,
event,
timestamp: now,
replyChannel,
replyTarget,
replyThread,
}
const interpolatedInstruction = interpolateInstruction(instruction, variables)
variables.instruction = interpolatedInstruction
let result: WakeResult
if (gateway.type === "command") {
result = await wakeCommandGateway(gatewayName, gateway, variables)
} else {
const payload: OpenClawPayload = {
event,
instruction: interpolatedInstruction,
text: interpolatedInstruction,
timestamp: now,
sessionId: enrichedContext.sessionId,
projectPath: enrichedContext.projectPath,
projectName: enrichedContext.projectPath ? basename(enrichedContext.projectPath) : undefined,
tmuxSession,
tmuxTail,
...(replyChannel !== undefined && { channel: replyChannel }),
...(replyTarget !== undefined && { to: replyTarget }),
...(replyThread !== undefined && { threadId: replyThread }),
context: buildWhitelistedContext(enrichedContext),
}
result = await wakeGateway(gatewayName, gateway, payload)
}
if (DEBUG) {
console.error(`[openclaw] wake ${event} -> ${gatewayName}: ${result.success ? "ok" : result.error}`)
}
return result
} catch (error) {
if (DEBUG) {
console.error(`[openclaw] wakeOpenClaw error:`, error instanceof Error ? error.message : error)
}
return null
}
}
export async function initializeOpenClaw(config: OpenClawConfig): Promise<void> {
const replyListener = config.replyListener
if (config.enabled && (replyListener?.discordBotToken || replyListener?.telegramBotToken)) {
await startReplyListener(config)
}
}
export { startReplyListener, stopReplyListener }

View File

@@ -0,0 +1,717 @@
import {
existsSync,
mkdirSync,
readFileSync,
writeFileSync,
unlinkSync,
chmodSync,
statSync,
appendFileSync,
renameSync,
} from "fs"
import { join, dirname } from "path"
import { homedir } from "os"
import { spawn } from "bun" // Use bun spawn
import { captureTmuxPane, analyzePaneContent, sendToPane, isTmuxAvailable } from "./tmux"
import { lookupByMessageId, removeMessagesByPane, pruneStale } from "./session-registry"
import type { OpenClawConfig } from "./types"
import { normalizeReplyListenerConfig } from "./config"
const SECURE_FILE_MODE = 0o600
const MAX_LOG_SIZE_BYTES = 1 * 1024 * 1024
const DAEMON_ENV_ALLOWLIST = [
"PATH",
"HOME",
"USERPROFILE",
"USER",
"USERNAME",
"LOGNAME",
"LANG",
"LC_ALL",
"LC_CTYPE",
"TERM",
"TMUX",
"TMUX_PANE",
"TMPDIR",
"TMP",
"TEMP",
"XDG_RUNTIME_DIR",
"XDG_DATA_HOME",
"XDG_CONFIG_HOME",
"SHELL",
"NODE_ENV",
"HTTP_PROXY",
"HTTPS_PROXY",
"http_proxy",
"https_proxy",
"NO_PROXY",
"no_proxy",
"SystemRoot",
"SYSTEMROOT",
"windir",
"COMSPEC",
]
const DEFAULT_STATE_DIR = join(homedir(), ".omx", "state")
const PID_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener.pid")
const STATE_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-state.json")
const CONFIG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-config.json")
const LOG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener.log")
export const DAEMON_IDENTITY_MARKER = "--openclaw-reply-listener-daemon"
function createMinimalDaemonEnv(): Record<string, string> {
const env: Record<string, string> = {}
for (const key of DAEMON_ENV_ALLOWLIST) {
if (process.env[key] !== undefined) {
env[key] = process.env[key] as string
}
}
return env
}
function ensureStateDir(): void {
if (!existsSync(DEFAULT_STATE_DIR)) {
mkdirSync(DEFAULT_STATE_DIR, { recursive: true, mode: 0o700 })
}
}
function writeSecureFile(filePath: string, content: string): void {
ensureStateDir()
writeFileSync(filePath, content, { mode: SECURE_FILE_MODE })
try {
chmodSync(filePath, SECURE_FILE_MODE)
} catch {
// Ignore
}
}
function rotateLogIfNeeded(logPath: string): void {
try {
if (!existsSync(logPath)) return
const stats = statSync(logPath)
if (stats.size > MAX_LOG_SIZE_BYTES) {
const backupPath = `${logPath}.old`
if (existsSync(backupPath)) {
unlinkSync(backupPath)
}
renameSync(logPath, backupPath)
}
} catch {
// Ignore
}
}
function log(message: string): void {
try {
ensureStateDir()
rotateLogIfNeeded(LOG_FILE_PATH)
const timestamp = new Date().toISOString()
const logLine = `[${timestamp}] ${message}\n`
appendFileSync(LOG_FILE_PATH, logLine, { mode: SECURE_FILE_MODE })
} catch {
// Ignore
}
}
export function logReplyListenerMessage(message: string): void {
log(message)
}
interface DaemonState {
isRunning: boolean
pid: number | null
startedAt: string
lastPollAt: string | null
telegramLastUpdateId: number | null
discordLastMessageId: string | null
messagesInjected: number
errors: number
lastError?: string
}
function readDaemonState(): DaemonState | null {
try {
if (!existsSync(STATE_FILE_PATH)) return null
const content = readFileSync(STATE_FILE_PATH, "utf-8")
return JSON.parse(content)
} catch {
return null
}
}
function writeDaemonState(state: DaemonState): void {
writeSecureFile(STATE_FILE_PATH, JSON.stringify(state, null, 2))
}
function readDaemonConfig(): OpenClawConfig | null {
try {
if (!existsSync(CONFIG_FILE_PATH)) return null
const content = readFileSync(CONFIG_FILE_PATH, "utf-8")
return JSON.parse(content)
} catch {
return null
}
}
function writeDaemonConfig(config: OpenClawConfig): void {
writeSecureFile(CONFIG_FILE_PATH, JSON.stringify(config, null, 2))
}
function readPidFile(): number | null {
try {
if (!existsSync(PID_FILE_PATH)) return null
const content = readFileSync(PID_FILE_PATH, "utf-8")
const pid = parseInt(content.trim(), 10)
if (Number.isNaN(pid)) return null
return pid
} catch {
return null
}
}
function writePidFile(pid: number): void {
writeSecureFile(PID_FILE_PATH, String(pid))
}
function removePidFile(): void {
if (existsSync(PID_FILE_PATH)) {
unlinkSync(PID_FILE_PATH)
}
}
function isProcessRunning(pid: number): boolean {
try {
process.kill(pid, 0)
return true
} catch {
return false
}
}
export async function isReplyListenerProcess(pid: number): Promise<boolean> {
try {
if (process.platform === "linux") {
const cmdline = readFileSync(`/proc/${pid}/cmdline`, "utf-8")
return cmdline.includes(DAEMON_IDENTITY_MARKER)
}
// macOS
const proc = spawn(["ps", "-p", String(pid), "-o", "args="], {
stdout: "pipe",
stderr: "ignore",
})
const stdout = await new Response(proc.stdout).text()
if (proc.exitCode !== 0) return false
return stdout.includes(DAEMON_IDENTITY_MARKER)
} catch {
return false
}
}
export async function isDaemonRunning(): Promise<boolean> {
const pid = readPidFile()
if (pid === null) return false
if (!isProcessRunning(pid)) {
removePidFile()
return false
}
if (!(await isReplyListenerProcess(pid))) {
removePidFile()
return false
}
return true
}
// Input Sanitization
export function sanitizeReplyInput(text: string): string {
return text
.replace(/[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]/g, "")
.replace(/[\u200e\u200f\u202a-\u202e\u2066-\u2069]/g, "")
.replace(/\r?\n/g, " ")
.replace(/\\/g, "\\\\")
.replace(/`/g, "\\`")
.replace(/\$\(/g, "\\$(")
.replace(/\$\{/g, "\\${")
.trim()
}
class RateLimiter {
maxPerMinute: number
timestamps: number[] = []
windowMs = 60 * 1000
constructor(maxPerMinute: number) {
this.maxPerMinute = maxPerMinute
}
canProceed(): boolean {
const now = Date.now()
this.timestamps = this.timestamps.filter((t) => now - t < this.windowMs)
if (this.timestamps.length >= this.maxPerMinute) return false
this.timestamps.push(now)
return true
}
}
async function injectReply(
paneId: string,
text: string,
platform: string,
config: OpenClawConfig,
): Promise<boolean> {
const replyListener = config.replyListener
const content = await captureTmuxPane(paneId, 15)
const analysis = analyzePaneContent(content)
if (analysis.confidence < 0.3) { // Lower threshold for simple check
log(
`WARN: Pane ${paneId} does not appear to be running OpenCode CLI (confidence: ${analysis.confidence}). Skipping injection, removing stale mapping.`,
)
removeMessagesByPane(paneId)
return false
}
const prefix = replyListener?.includePrefix === false ? "" : `[reply:${platform}] `
const sanitized = sanitizeReplyInput(prefix + text)
const truncated = sanitized.slice(0, replyListener?.maxMessageLength ?? 500)
const success = await sendToPane(paneId, truncated, true)
if (success) {
log(
`Injected reply from ${platform} into pane ${paneId}: "${truncated.slice(0, 50)}${truncated.length > 50 ? "..." : ""}"`,
)
} else {
log(`ERROR: Failed to inject reply into pane ${paneId}`)
}
return success
}
let discordBackoffUntil = 0
async function pollDiscord(
config: OpenClawConfig,
state: DaemonState,
rateLimiter: RateLimiter,
): Promise<void> {
const replyListener = config.replyListener
if (!replyListener?.discordBotToken || !replyListener.discordChannelId) return
if (
!replyListener.authorizedDiscordUserIds
|| replyListener.authorizedDiscordUserIds.length === 0
) {
return
}
if (Date.now() < discordBackoffUntil) return
try {
const after = state.discordLastMessageId
? `?after=${state.discordLastMessageId}&limit=10`
: "?limit=10"
const url = `https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages${after}`
const controller = new AbortController()
const timeout = setTimeout(() => controller.abort(), 10000)
const response = await fetch(url, {
method: "GET",
headers: { Authorization: `Bot ${replyListener.discordBotToken}` },
signal: controller.signal,
})
clearTimeout(timeout)
const remaining = response.headers.get("x-ratelimit-remaining")
const reset = response.headers.get("x-ratelimit-reset")
if (remaining !== null && parseInt(remaining, 10) < 2) {
const parsed = reset ? parseFloat(reset) : Number.NaN
const resetTime = Number.isFinite(parsed) ? parsed * 1000 : Date.now() + 10000
discordBackoffUntil = resetTime
log(
`WARN: Discord rate limit low (remaining: ${remaining}), backing off until ${new Date(resetTime).toISOString()}`,
)
}
if (!response.ok) {
log(`Discord API error: HTTP ${response.status}`)
return
}
const messages = await response.json()
if (!Array.isArray(messages) || messages.length === 0) return
const sorted = [...messages].reverse()
for (const msg of sorted) {
if (!msg.message_reference?.message_id) {
state.discordLastMessageId = msg.id
writeDaemonState(state)
continue
}
if (!replyListener.authorizedDiscordUserIds.includes(msg.author.id)) {
state.discordLastMessageId = msg.id
writeDaemonState(state)
continue
}
const mapping = lookupByMessageId("discord-bot", msg.message_reference.message_id)
if (!mapping) {
state.discordLastMessageId = msg.id
writeDaemonState(state)
continue
}
if (!rateLimiter.canProceed()) {
log(`WARN: Rate limit exceeded, dropping Discord message ${msg.id}`)
state.discordLastMessageId = msg.id
writeDaemonState(state)
state.errors++
continue
}
state.discordLastMessageId = msg.id
writeDaemonState(state)
const success = await injectReply(mapping.tmuxPaneId, msg.content, "discord", config)
if (success) {
state.messagesInjected++
// Add reaction
try {
await fetch(
`https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages/${msg.id}/reactions/%E2%9C%85/@me`,
{
method: "PUT",
headers: { Authorization: `Bot ${replyListener.discordBotToken}` },
},
)
} catch {
// Ignore
}
} else {
state.errors++
}
}
} catch (error) {
state.errors++
state.lastError = error instanceof Error ? error.message : String(error)
log(`Discord polling error: ${state.lastError}`)
}
}
async function pollTelegram(
config: OpenClawConfig,
state: DaemonState,
rateLimiter: RateLimiter,
): Promise<void> {
const replyListener = config.replyListener
if (!replyListener?.telegramBotToken || !replyListener.telegramChatId) return
try {
const offset = state.telegramLastUpdateId ? state.telegramLastUpdateId + 1 : 0
const url = `https://api.telegram.org/bot${replyListener.telegramBotToken}/getUpdates?offset=${offset}&timeout=0`
const controller = new AbortController()
const timeout = setTimeout(() => controller.abort(), 10000)
const response = await fetch(url, {
method: "GET",
signal: controller.signal,
})
clearTimeout(timeout)
if (!response.ok) {
log(`Telegram API error: HTTP ${response.status}`)
return
}
const body = await response.json() as any
const updates = body.result || []
for (const update of updates) {
const msg = update.message
if (!msg) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
if (!msg.reply_to_message?.message_id) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
if (String(msg.chat.id) !== replyListener.telegramChatId) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
const mapping = lookupByMessageId("telegram", String(msg.reply_to_message.message_id))
if (!mapping) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
const text = msg.text || ""
if (!text) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
if (!rateLimiter.canProceed()) {
log(`WARN: Rate limit exceeded, dropping Telegram message ${msg.message_id}`)
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
state.errors++
continue
}
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
const success = await injectReply(mapping.tmuxPaneId, text, "telegram", config)
if (success) {
state.messagesInjected++
try {
await fetch(
`https://api.telegram.org/bot${replyListener.telegramBotToken}/sendMessage`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
chat_id: replyListener.telegramChatId,
text: "Injected into Codex CLI session.",
reply_to_message_id: msg.message_id,
}),
},
)
} catch {
// Ignore
}
} else {
state.errors++
}
}
} catch (error) {
state.errors++
state.lastError = error instanceof Error ? error.message : String(error)
log(`Telegram polling error: ${state.lastError}`)
}
}
const PRUNE_INTERVAL_MS = 60 * 60 * 1000
export async function pollLoop(): Promise<void> {
log("Reply listener daemon starting poll loop")
const config = readDaemonConfig()
if (!config) {
log("ERROR: No daemon config found, exiting")
process.exit(1)
}
const state = readDaemonState() || {
isRunning: true,
pid: process.pid,
startedAt: new Date().toISOString(),
lastPollAt: null,
telegramLastUpdateId: null,
discordLastMessageId: null,
messagesInjected: 0,
errors: 0,
}
state.isRunning = true
state.pid = process.pid
const rateLimiter = new RateLimiter(config.replyListener?.rateLimitPerMinute || 10)
let lastPruneAt = Date.now()
const shutdown = (): void => {
log("Shutdown signal received")
state.isRunning = false
writeDaemonState(state)
removePidFile()
process.exit(0)
}
process.on("SIGTERM", shutdown)
process.on("SIGINT", shutdown)
try {
pruneStale()
log("Pruned stale registry entries")
} catch (e) {
log(`WARN: Failed to prune stale entries: ${e}`)
}
while (state.isRunning) {
try {
state.lastPollAt = new Date().toISOString()
await pollDiscord(config, state, rateLimiter)
await pollTelegram(config, state, rateLimiter)
if (Date.now() - lastPruneAt > PRUNE_INTERVAL_MS) {
try {
pruneStale()
lastPruneAt = Date.now()
log("Pruned stale registry entries")
} catch (e) {
log(`WARN: Prune failed: ${e instanceof Error ? e.message : String(e)}`)
}
}
writeDaemonState(state)
await new Promise((resolve) =>
setTimeout(resolve, config.replyListener?.pollIntervalMs || 3000),
)
} catch (error) {
state.errors++
state.lastError = error instanceof Error ? error.message : String(error)
log(`Poll error: ${state.lastError}`)
writeDaemonState(state)
await new Promise((resolve) =>
setTimeout(resolve, (config.replyListener?.pollIntervalMs || 3000) * 2),
)
}
}
log("Poll loop ended")
}
export async function startReplyListener(config: OpenClawConfig): Promise<{ success: boolean; message: string; state?: DaemonState; error?: string }> {
if (await isDaemonRunning()) {
const state = readDaemonState()
return {
success: true,
message: "Reply listener daemon is already running",
state: state || undefined,
}
}
if (!(await isTmuxAvailable())) {
return {
success: false,
message: "tmux not available - reply injection requires tmux",
}
}
const normalizedConfig = normalizeReplyListenerConfig(config)
const replyListener = normalizedConfig.replyListener
if (!replyListener?.discordBotToken && !replyListener?.telegramBotToken) {
// Only warn if no platforms enabled, but user might just want outbound
// Actually, instructions say: "Fire-and-forget for outbound, daemon process for inbound"
// So if no inbound config, we shouldn't start daemon.
return {
success: false,
message: "No enabled reply listener platforms configured (missing bot tokens/channels)",
}
}
writeDaemonConfig(normalizedConfig)
ensureStateDir()
const currentFile = import.meta.url
const isTs = currentFile.endsWith(".ts")
const daemonScript = isTs
? join(dirname(new URL(currentFile).pathname), "daemon.ts")
: join(dirname(new URL(currentFile).pathname), "daemon.js")
try {
const proc = spawn(["bun", "run", daemonScript, DAEMON_IDENTITY_MARKER], {
detached: true,
stdio: ["ignore", "ignore", "ignore"],
cwd: process.cwd(),
env: createMinimalDaemonEnv(),
})
proc.unref()
const pid = proc.pid
if (pid) {
writePidFile(pid)
const state: DaemonState = {
isRunning: true,
pid,
startedAt: new Date().toISOString(),
lastPollAt: null,
telegramLastUpdateId: null,
discordLastMessageId: null,
messagesInjected: 0,
errors: 0,
}
writeDaemonState(state)
log(`Reply listener daemon started with PID ${pid}`)
return {
success: true,
message: `Reply listener daemon started with PID ${pid}`,
state,
}
}
return {
success: false,
message: "Failed to start daemon process",
}
} catch (error) {
return {
success: false,
message: "Failed to start daemon",
error: error instanceof Error ? error.message : String(error),
}
}
}
export async function stopReplyListener(): Promise<{ success: boolean; message: string; state?: DaemonState; error?: string }> {
const pid = readPidFile()
if (pid === null) {
return {
success: true,
message: "Reply listener daemon is not running",
}
}
if (!isProcessRunning(pid)) {
removePidFile()
return {
success: true,
message: "Reply listener daemon was not running (cleaned up stale PID file)",
}
}
if (!(await isReplyListenerProcess(pid))) {
removePidFile()
return {
success: false,
message: `Refusing to kill PID ${pid}: process identity does not match the reply listener daemon (stale or reused PID - removed PID file)`,
}
}
try {
process.kill(pid, "SIGTERM")
removePidFile()
const state = readDaemonState()
if (state) {
state.isRunning = false
state.pid = null
writeDaemonState(state)
}
log(`Reply listener daemon stopped (PID ${pid})`)
return {
success: true,
message: `Reply listener daemon stopped (PID ${pid})`,
state: state || undefined,
}
} catch (error) {
return {
success: false,
message: "Failed to stop daemon",
error: error instanceof Error ? error.message : String(error),
}
}
}

View File

@@ -0,0 +1,340 @@
import {
existsSync,
mkdirSync,
readFileSync,
writeFileSync,
openSync,
closeSync,
writeSync,
unlinkSync,
statSync,
constants,
} from "fs"
import { join, dirname } from "path"
import { randomUUID } from "crypto"
import { getOpenCodeStorageDir } from "../shared/data-path"
const OPENCLAW_STORAGE_DIR = join(getOpenCodeStorageDir(), "openclaw")
const REGISTRY_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.jsonl")
const REGISTRY_LOCK_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.lock")
const SECURE_FILE_MODE = 0o600
const MAX_AGE_MS = 24 * 60 * 60 * 1000
const LOCK_TIMEOUT_MS = 2000
const LOCK_WAIT_TIMEOUT_MS = 4000
const LOCK_RETRY_MS = 20
const LOCK_STALE_MS = 10000
export interface SessionMapping {
sessionId: string
tmuxSession: string
tmuxPaneId: string
projectPath: string
platform: string
messageId: string
channelId?: string
threadId?: string
createdAt: string
}
function ensureRegistryDir(): void {
const registryDir = dirname(REGISTRY_PATH)
if (!existsSync(registryDir)) {
mkdirSync(registryDir, { recursive: true, mode: 0o700 })
}
}
function sleepMs(ms: number): void {
// Use Atomics.wait for synchronous sleep
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms)
}
function isPidAlive(pid: number): boolean {
if (!Number.isFinite(pid) || pid <= 0) return false
try {
process.kill(pid, 0)
return true
} catch (error) {
return (error as NodeJS.ErrnoException).code === "EPERM"
}
}
interface LockSnapshot {
raw: string
pid: number | null
token: string | null
}
function readLockSnapshot(): LockSnapshot | null {
try {
if (!existsSync(REGISTRY_LOCK_PATH)) return null
const raw = readFileSync(REGISTRY_LOCK_PATH, "utf-8")
const trimmed = raw.trim()
if (!trimmed) return { raw, pid: null, token: null }
try {
const parsed = JSON.parse(trimmed)
const pid =
typeof parsed.pid === "number" && Number.isFinite(parsed.pid) ? parsed.pid : null
const token =
typeof parsed.token === "string" && parsed.token.length > 0 ? parsed.token : null
return { raw, pid, token }
} catch {
// Legacy format or plain PID
const [pidStr] = trimmed.split(":")
const parsedPid = Number.parseInt(pidStr ?? "", 10)
return {
raw,
pid: Number.isFinite(parsedPid) && parsedPid > 0 ? parsedPid : null,
token: null,
}
}
} catch {
return null
}
}
function removeLockIfUnchanged(snapshot: LockSnapshot): boolean {
try {
if (!existsSync(REGISTRY_LOCK_PATH)) return false
const currentRaw = readFileSync(REGISTRY_LOCK_PATH, "utf-8")
if (currentRaw !== snapshot.raw) return false
unlinkSync(REGISTRY_LOCK_PATH)
return true
} catch {
return false
}
}
interface LockHandle {
fd: number
token: string
}
function acquireRegistryLock(): LockHandle | null {
ensureRegistryDir()
const started = Date.now()
while (Date.now() - started < LOCK_TIMEOUT_MS) {
try {
const token = randomUUID()
const fd = openSync(
REGISTRY_LOCK_PATH,
constants.O_CREAT | constants.O_EXCL | constants.O_WRONLY,
SECURE_FILE_MODE,
)
try {
const lockPayload = JSON.stringify({
pid: process.pid,
acquiredAt: Date.now(),
token,
})
writeSync(fd, lockPayload)
} catch (writeError) {
try {
closeSync(fd)
} catch {
// Ignore
}
try {
unlinkSync(REGISTRY_LOCK_PATH)
} catch {
// Ignore
}
throw writeError
}
return { fd, token }
} catch (error) {
const err = error as NodeJS.ErrnoException
if (err.code !== "EEXIST") throw error
try {
const stats = statSync(REGISTRY_LOCK_PATH)
const lockAgeMs = Date.now() - stats.mtimeMs
if (lockAgeMs > LOCK_STALE_MS) {
const snapshot = readLockSnapshot()
if (!snapshot) {
sleepMs(LOCK_RETRY_MS)
continue
}
if (snapshot.pid !== null && isPidAlive(snapshot.pid)) {
sleepMs(LOCK_RETRY_MS)
continue
}
if (removeLockIfUnchanged(snapshot)) {
continue
}
}
} catch {
// Ignore errors
}
sleepMs(LOCK_RETRY_MS)
}
}
return null
}
function acquireRegistryLockOrWait(maxWaitMs = LOCK_WAIT_TIMEOUT_MS): LockHandle | null {
const started = Date.now()
while (Date.now() - started < maxWaitMs) {
const lock = acquireRegistryLock()
if (lock !== null) return lock
if (Date.now() - started < maxWaitMs) {
sleepMs(LOCK_RETRY_MS)
}
}
return null
}
function releaseRegistryLock(lock: LockHandle): void {
try {
closeSync(lock.fd)
} catch {
// Ignore
}
const snapshot = readLockSnapshot()
if (!snapshot || snapshot.token !== lock.token) return
removeLockIfUnchanged(snapshot)
}
function withRegistryLockOrWait<T>(
onLocked: () => T,
onLockUnavailable: () => T,
): T {
const lock = acquireRegistryLockOrWait()
if (lock === null) return onLockUnavailable()
try {
return onLocked()
} finally {
releaseRegistryLock(lock)
}
}
function withRegistryLock(onLocked: () => void, onLockUnavailable: () => void): void {
const lock = acquireRegistryLock()
if (lock === null) {
onLockUnavailable()
return
}
try {
onLocked()
} finally {
releaseRegistryLock(lock)
}
}
function readAllMappingsUnsafe(): SessionMapping[] {
if (!existsSync(REGISTRY_PATH)) return []
try {
const content = readFileSync(REGISTRY_PATH, "utf-8")
return content
.split("\n")
.filter((line) => line.trim())
.map((line) => {
try {
return JSON.parse(line) as SessionMapping
} catch {
return null
}
})
.filter((m): m is SessionMapping => m !== null)
} catch {
return []
}
}
function rewriteRegistryUnsafe(mappings: SessionMapping[]): void {
ensureRegistryDir()
if (mappings.length === 0) {
writeFileSync(REGISTRY_PATH, "", { mode: SECURE_FILE_MODE })
return
}
const content = mappings.map((m) => JSON.stringify(m)).join("\n") + "\n"
writeFileSync(REGISTRY_PATH, content, { mode: SECURE_FILE_MODE })
}
export function registerMessage(mapping: SessionMapping): boolean {
return withRegistryLockOrWait(
() => {
ensureRegistryDir()
const line = JSON.stringify(mapping) + "\n"
const fd = openSync(
REGISTRY_PATH,
constants.O_WRONLY | constants.O_APPEND | constants.O_CREAT,
SECURE_FILE_MODE,
)
try {
writeSync(fd, line)
} finally {
closeSync(fd)
}
return true
},
() => {
console.warn(
"[notifications] session registry lock unavailable; skipping reply correlation write",
)
return false
},
)
}
export function loadAllMappings(): SessionMapping[] {
return withRegistryLockOrWait(
() => readAllMappingsUnsafe(),
() => [],
)
}
export function lookupByMessageId(platform: string, messageId: string): SessionMapping | null {
const mappings = loadAllMappings()
return mappings.find((m) => m.platform === platform && m.messageId === messageId) || null
}
export function removeSession(sessionId: string): void {
withRegistryLock(
() => {
const mappings = readAllMappingsUnsafe()
const filtered = mappings.filter((m) => m.sessionId !== sessionId)
if (filtered.length === mappings.length) return
rewriteRegistryUnsafe(filtered)
},
() => {
// Best-effort
},
)
}
export function removeMessagesByPane(paneId: string): void {
withRegistryLock(
() => {
const mappings = readAllMappingsUnsafe()
const filtered = mappings.filter((m) => m.tmuxPaneId !== paneId)
if (filtered.length === mappings.length) return
rewriteRegistryUnsafe(filtered)
},
() => {
// Best-effort
},
)
}
export function pruneStale(): void {
withRegistryLock(
() => {
const now = Date.now()
const mappings = readAllMappingsUnsafe()
const filtered = mappings.filter((m) => {
try {
const age = now - new Date(m.createdAt).getTime()
return age < MAX_AGE_MS
} catch {
return false
}
})
if (filtered.length === mappings.length) return
rewriteRegistryUnsafe(filtered)
},
() => {
// Best-effort
},
)
}

91
src/openclaw/tmux.ts Normal file
View File

@@ -0,0 +1,91 @@
import { spawn } from "bun"
export function getCurrentTmuxSession(): string | null {
const env = process.env.TMUX
if (!env) return null
const match = env.match(/(\d+)$/)
return match ? `session-${match[1]}` : null // Wait, TMUX env is /tmp/tmux-501/default,1234,0
// Reference tmux.js gets session name via `tmux display-message -p '#S'`
}
export async function getTmuxSessionName(): Promise<string | null> {
try {
const proc = spawn(["tmux", "display-message", "-p", "#S"], {
stdout: "pipe",
stderr: "ignore",
})
const outputPromise = new Response(proc.stdout).text()
await proc.exited
const output = await outputPromise
// Await proc.exited ensures exitCode is set; avoid race condition
if (proc.exitCode !== 0) return null
return output.trim() || null
} catch {
return null
}
}
export async function captureTmuxPane(paneId: string, lines = 15): Promise<string | null> {
try {
const proc = spawn(
["tmux", "capture-pane", "-p", "-t", paneId, "-S", `-${lines}`],
{
stdout: "pipe",
stderr: "ignore",
},
)
const outputPromise = new Response(proc.stdout).text()
await proc.exited
const output = await outputPromise
if (proc.exitCode !== 0) return null
return output.trim() || null
} catch {
return null
}
}
export async function sendToPane(paneId: string, text: string, confirm = true): Promise<boolean> {
try {
const literalProc = spawn(["tmux", "send-keys", "-t", paneId, "-l", "--", text], {
stdout: "ignore",
stderr: "ignore",
})
await literalProc.exited
if (literalProc.exitCode !== 0) return false
if (!confirm) return true
const enterProc = spawn(["tmux", "send-keys", "-t", paneId, "Enter"], {
stdout: "ignore",
stderr: "ignore",
})
await enterProc.exited
return enterProc.exitCode === 0
} catch {
return false
}
}
export async function isTmuxAvailable(): Promise<boolean> {
try {
const proc = spawn(["tmux", "-V"], {
stdout: "ignore",
stderr: "ignore",
})
await proc.exited
return proc.exitCode === 0
} catch {
return false
}
}
export function analyzePaneContent(content: string | null): { confidence: number } {
if (!content) return { confidence: 0 }
let confidence = 0
if (content.includes("opencode")) confidence += 0.3
if (content.includes("Ask anything...")) confidence += 0.5
if (content.includes("Run /help")) confidence += 0.2
return { confidence: Math.min(1, confidence) }
}

52
src/openclaw/types.ts Normal file
View File

@@ -0,0 +1,52 @@
import type {
OpenClawConfig,
OpenClawGateway,
OpenClawHook,
OpenClawReplyListenerConfig,
} from "../config/schema/openclaw"
export type {
OpenClawConfig,
OpenClawGateway,
OpenClawHook,
OpenClawReplyListenerConfig,
}
export interface OpenClawContext {
sessionId?: string
projectPath?: string
projectName?: string
tmuxSession?: string
prompt?: string
contextSummary?: string
reasoning?: string
question?: string
tmuxTail?: string
replyChannel?: string
replyTarget?: string
replyThread?: string
[key: string]: string | undefined
}
export interface OpenClawPayload {
event: string
instruction: string
text: string
timestamp: string
sessionId?: string
projectPath?: string
projectName?: string
tmuxSession?: string
tmuxTail?: string
channel?: string
to?: string
threadId?: string
context: OpenClawContext
}
export interface WakeResult {
gateway: string
success: boolean
error?: string
statusCode?: number
}