Fix OpenClaw review issues

This commit is contained in:
YeonGyu-Kim
2026-03-16 22:28:54 +09:00
parent b79df5e018
commit c644930753
14 changed files with 412 additions and 194 deletions

View File

@@ -9,7 +9,7 @@ export const OpenClawGatewaySchema = z.object({
// Command specific
command: z.string().optional(),
// Shared
timeout: z.number().default(10000),
timeout: z.number().optional(),
})
export const OpenClawHookSchema = z.object({
@@ -18,28 +18,33 @@ export const OpenClawHookSchema = z.object({
instruction: z.string(),
})
export const OpenClawConfigSchema = z.object({
enabled: z.boolean().default(false),
// Outbound Configuration
gateways: z.record(z.string(), OpenClawGatewaySchema).default({}),
hooks: z.record(z.string(), OpenClawHookSchema).default({}),
// Inbound Configuration (Reply Listener)
export const OpenClawReplyListenerConfigSchema = z.object({
discordBotToken: z.string().optional(),
discordChannelId: z.string().optional(),
discordMention: z.string().optional(), // For allowed_mentions
authorizedDiscordUserIds: z.array(z.string()).default([]),
telegramBotToken: z.string().optional(),
telegramChatId: z.string().optional(),
pollIntervalMs: z.number().default(3000),
rateLimitPerMinute: z.number().default(10),
maxMessageLength: z.number().default(500),
includePrefix: z.boolean().default(true),
})
export const OpenClawConfigSchema = z.object({
enabled: z.boolean().default(false),
// Outbound Configuration
gateways: z.record(z.string(), OpenClawGatewaySchema).default({}),
hooks: z.record(z.string(), OpenClawHookSchema).default({}),
// Inbound Configuration (Reply Listener)
replyListener: OpenClawReplyListenerConfigSchema.optional(),
})
export type OpenClawConfig = z.infer<typeof OpenClawConfigSchema>
export type OpenClawGateway = z.infer<typeof OpenClawGatewaySchema>
export type OpenClawHook = z.infer<typeof OpenClawHookSchema>
export type OpenClawReplyListenerConfig = z.infer<typeof OpenClawReplyListenerConfigSchema>

View File

@@ -0,0 +1,60 @@
import { beforeEach, describe, expect, mock, test } from "bun:test"
const wakeOpenClawMock = mock(async () => null)
mock.module("../openclaw", () => ({
wakeOpenClaw: wakeOpenClawMock,
}))
describe("createOpenClawHook", () => {
beforeEach(() => {
wakeOpenClawMock.mockClear()
})
test("maps session.stop events to stop", async () => {
const { createOpenClawHook } = await import("./openclaw")
const hook = createOpenClawHook(
{ directory: "/tmp/project" } as any,
{ openclaw: { enabled: true } } as any,
)
await hook?.event?.({
event: {
type: "session.stop",
properties: { sessionID: "session-1" },
},
})
expect(wakeOpenClawMock).toHaveBeenCalledWith(
expect.anything(),
"stop",
expect.objectContaining({
projectPath: "/tmp/project",
sessionId: "session-1",
}),
)
})
test("uses tool.execute.before for question tools", async () => {
const { createOpenClawHook } = await import("./openclaw")
const hook = createOpenClawHook(
{ directory: "/tmp/project" } as any,
{ openclaw: { enabled: true } } as any,
)
await hook?.["tool.execute.before"]?.(
{ tool: "ask_user_question", sessionID: "session-2" },
{ args: { question: "Need approval?" } },
)
expect(wakeOpenClawMock).toHaveBeenCalledWith(
expect.anything(),
"ask-user-question",
expect.objectContaining({
projectPath: "/tmp/project",
question: "Need approval?",
sessionId: "session-2",
}),
)
})
})

View File

@@ -3,7 +3,6 @@ import type { OhMyOpenCodeConfig } from "../config"
import { wakeOpenClaw } from "../openclaw"
import type { OpenClawContext } from "../openclaw/types"
export function createOpenClawHook(
ctx: PluginContext,
pluginConfig: OhMyOpenCodeConfig,
@@ -35,21 +34,31 @@ export function createOpenClawHook(
// This is heuristic. If the last message was from assistant and ended with a question?
// Or if the system is idle.
await handleWake("session-idle", context)
} else if (event.type === "session.stopped") { // Assuming this event exists or map from error?
} else if (event.type === "session.stop") {
await handleWake("stop", context)
}
},
toolExecuteBefore: async (input: any) => {
const { toolName, toolInput, sessionID } = input
if (toolName === "ask_user" || toolName === "ask_followup_question") {
const context: OpenClawContext = {
sessionId: sessionID,
projectPath: ctx.directory,
question: toolInput.question,
}
await handleWake("ask-user-question", context)
}
}
"tool.execute.before": async (
input: { tool: string; sessionID: string },
output: { args: Record<string, unknown> },
) => {
const normalizedToolName = input.tool.toLowerCase()
if (
normalizedToolName !== "question"
&& normalizedToolName !== "ask_user_question"
&& normalizedToolName !== "askuserquestion"
) {
return
}
const question = typeof output.args.question === "string" ? output.args.question : undefined
const context: OpenClawContext = {
sessionId: input.sessionID,
projectPath: ctx.directory,
question,
}
await handleWake("ask-user-question", context)
},
}
}

View File

@@ -1,6 +1,7 @@
import { describe, expect, test } from "bun:test"
import { resolveGateway, validateGatewayUrl, normalizeReplyListenerConfig } from "../config"
import type { OpenClawConfig } from "../types"
import { OpenClawConfigSchema } from "../../config/schema/openclaw"
describe("OpenClaw Config", () => {
test("resolveGateway resolves HTTP gateway", () => {
@@ -49,7 +50,7 @@ describe("OpenClaw Config", () => {
test("resolveGateway returns null for disabled hook", () => {
const config: OpenClawConfig = {
enabled: true,
gateways: { g: { url: "https://example.com" } },
gateways: { g: { type: "http", url: "https://example.com" } },
hooks: {
event: { enabled: false, gateway: "g", instruction: "i" },
},
@@ -69,4 +70,46 @@ describe("OpenClaw Config", () => {
expect(validateGatewayUrl("http://localhost:3000")).toBe(true)
expect(validateGatewayUrl("http://127.0.0.1:3000")).toBe(true)
})
test("normalizeReplyListenerConfig normalizes nested reply listener fields", () => {
const config = normalizeReplyListenerConfig({
enabled: true,
gateways: {},
hooks: {},
replyListener: {
discordBotToken: "discord-token",
discordChannelId: "channel-id",
authorizedDiscordUserIds: ["user-1", "", "user-2"],
pollIntervalMs: 100,
rateLimitPerMinute: 0,
maxMessageLength: 9000,
includePrefix: false,
},
} as OpenClawConfig)
expect(config.replyListener).toEqual({
discordBotToken: "discord-token",
discordChannelId: "channel-id",
authorizedDiscordUserIds: ["user-1", "user-2"],
pollIntervalMs: 500,
rateLimitPerMinute: 1,
maxMessageLength: 4000,
includePrefix: false,
})
})
test("gateway timeout remains optional so env fallback can apply", () => {
const parsed = OpenClawConfigSchema.parse({
enabled: true,
gateways: {
command: {
type: "command",
command: "echo hi",
},
},
hooks: {},
})
expect(parsed.gateways.command.timeout).toBeUndefined()
})
})

View File

@@ -1,6 +1,7 @@
import { describe, expect, test, mock, spyOn } from "bun:test"
import {
interpolateInstruction,
resolveCommandTimeoutMs,
shellEscapeArg,
wakeGateway,
wakeCommandGateway,
@@ -30,21 +31,22 @@ describe("OpenClaw Dispatcher", () => {
const fetchSpy = spyOn(global, "fetch").mockResolvedValue(
new Response(JSON.stringify({ ok: true }), { status: 200 }),
)
try {
const result = await wakeGateway(
"test",
{ url: "https://example.com", method: "POST", timeout: 1000, type: "http" },
{ foo: "bar" },
)
const result = await wakeGateway(
"test",
{ url: "https://example.com", method: "POST", timeout: 1000, type: "http" },
{ foo: "bar" },
)
expect(result.success).toBe(true)
expect(fetchSpy).toHaveBeenCalled()
const call = fetchSpy.mock.calls[0]
expect(call[0]).toBe("https://example.com")
expect(call[1]?.method).toBe("POST")
expect(call[1]?.body).toBe('{"foo":"bar"}')
fetchSpy.mockRestore()
expect(result.success).toBe(true)
expect(fetchSpy).toHaveBeenCalled()
const call = fetchSpy.mock.calls[0]
expect(call[0]).toBe("https://example.com")
expect(call[1]?.method).toBe("POST")
expect(call[1]?.body).toBe('{"foo":"bar"}')
} finally {
fetchSpy.mockRestore()
}
})
test("wakeGateway fails on invalid URL", async () => {
@@ -52,4 +54,16 @@ describe("OpenClaw Dispatcher", () => {
expect(result.success).toBe(false)
expect(result.error).toContain("Invalid URL")
})
test("resolveCommandTimeoutMs reads OMO env fallback", () => {
const original = process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS
process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = "4321"
try {
expect(resolveCommandTimeoutMs(undefined, process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS)).toBe(4321)
} finally {
if (original === undefined) delete process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS
else process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS = original
}
})
})

View File

@@ -0,0 +1,13 @@
import { describe, expect, test } from "bun:test"
import { analyzePaneContent } from "../tmux"
describe("openclaw tmux helpers", () => {
test("analyzePaneContent recognizes the opencode welcome prompt", () => {
const content = "opencode\nAsk anything...\nRun /help"
expect(analyzePaneContent(content).confidence).toBeGreaterThanOrEqual(1)
})
test("analyzePaneContent returns zero confidence for empty content", () => {
expect(analyzePaneContent(null).confidence).toBe(0)
})
})

View File

@@ -1,4 +1,8 @@
import type { OpenClawConfig, OpenClawGateway } from "./types"
import type {
OpenClawConfig,
OpenClawGateway,
OpenClawReplyListenerConfig,
} from "./types"
const DEFAULT_REPLY_POLL_INTERVAL_MS = 3000
const MIN_REPLY_POLL_INTERVAL_MS = 500
@@ -29,41 +33,44 @@ function normalizeInteger(
}
export function normalizeReplyListenerConfig(config: OpenClawConfig): OpenClawConfig {
const discordEnabled =
config.discordBotToken && config.discordChannelId ? true : false
const telegramEnabled =
config.telegramBotToken && config.telegramChatId ? true : false
const replyListener = config.replyListener
if (!replyListener) return config
return {
...config,
discordBotToken: config.discordBotToken,
discordChannelId: config.discordChannelId,
telegramBotToken: config.telegramBotToken,
telegramChatId: config.telegramChatId,
const normalizedReplyListener: OpenClawReplyListenerConfig = {
...replyListener,
discordBotToken: replyListener.discordBotToken,
discordChannelId: replyListener.discordChannelId,
telegramBotToken: replyListener.telegramBotToken,
telegramChatId: replyListener.telegramChatId,
pollIntervalMs: normalizeInteger(
config.pollIntervalMs,
replyListener.pollIntervalMs,
DEFAULT_REPLY_POLL_INTERVAL_MS,
MIN_REPLY_POLL_INTERVAL_MS,
MAX_REPLY_POLL_INTERVAL_MS,
),
rateLimitPerMinute: normalizeInteger(
config.rateLimitPerMinute,
replyListener.rateLimitPerMinute,
DEFAULT_REPLY_RATE_LIMIT_PER_MINUTE,
MIN_REPLY_RATE_LIMIT_PER_MINUTE,
),
maxMessageLength: normalizeInteger(
config.maxMessageLength,
replyListener.maxMessageLength,
DEFAULT_REPLY_MAX_MESSAGE_LENGTH,
MIN_REPLY_MAX_MESSAGE_LENGTH,
MAX_REPLY_MAX_MESSAGE_LENGTH,
),
includePrefix: config.includePrefix !== false,
authorizedDiscordUserIds: Array.isArray(config.authorizedDiscordUserIds)
? config.authorizedDiscordUserIds.filter(
includePrefix: replyListener.includePrefix !== false,
authorizedDiscordUserIds: Array.isArray(replyListener.authorizedDiscordUserIds)
? replyListener.authorizedDiscordUserIds.filter(
(id) => typeof id === "string" && id.trim() !== "",
)
: [],
}
return {
...config,
replyListener: normalizedReplyListener,
}
}
export function resolveGateway(
@@ -71,17 +78,17 @@ export function resolveGateway(
event: string,
): { gatewayName: string; gateway: OpenClawGateway; instruction: string } | null {
if (!config.enabled) return null
const mapping = config.hooks[event]
if (!mapping || !mapping.enabled) {
return null
}
const gateway = config.gateways[mapping.gateway]
if (!gateway) {
return null
}
// Validate based on gateway type
if (gateway.type === "command") {
if (!gateway.command) return null
@@ -89,7 +96,7 @@ export function resolveGateway(
// HTTP gateway
if (!gateway.url) return null
}
return { gatewayName: mapping.gateway, gateway, instruction: mapping.instruction }
}

View File

@@ -1,6 +1,9 @@
import { pollLoop } from "./reply-listener"
import { pollLoop, logReplyListenerMessage } from "./reply-listener"
pollLoop().catch((err) => {
logReplyListenerMessage(
`FATAL: reply listener daemon crashed: ${err instanceof Error ? err.stack ?? err.message : String(err)}`,
)
console.error(err)
process.exit(1)
})

View File

@@ -41,7 +41,9 @@ export function shellEscapeArg(value: string): string {
export function resolveCommandTimeoutMs(
gatewayTimeout?: number,
envTimeoutRaw = process.env.OMX_OPENCLAW_COMMAND_TIMEOUT_MS,
envTimeoutRaw =
process.env.OMO_OPENCLAW_COMMAND_TIMEOUT_MS
?? process.env.OMX_OPENCLAW_COMMAND_TIMEOUT_MS,
): number {
const parseFinite = (value: unknown): number | undefined => {
if (typeof value !== "number" || !Number.isFinite(value)) return undefined
@@ -52,12 +54,12 @@ export function resolveCommandTimeoutMs(
const parsed = Number(value)
return Number.isFinite(parsed) ? parsed : undefined
}
const rawTimeout =
parseFinite(gatewayTimeout) ??
parseEnv(envTimeoutRaw) ??
DEFAULT_COMMAND_TIMEOUT_MS
return Math.min(
MAX_COMMAND_TIMEOUT_MS,
Math.max(MIN_COMMAND_TIMEOUT_MS, Math.trunc(rawTimeout)),
@@ -76,27 +78,27 @@ export async function wakeGateway(
error: "Invalid URL (HTTPS required)",
}
}
try {
const headers = {
"Content-Type": "application/json",
...gatewayConfig.headers,
}
const timeout = gatewayConfig.timeout ?? DEFAULT_HTTP_TIMEOUT_MS
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), timeout)
const response = await fetch(gatewayConfig.url, {
method: gatewayConfig.method || "POST",
headers,
body: JSON.stringify(payload),
signal: controller.signal,
}).finally(() => {
clearTimeout(timeoutId)
})
clearTimeout(timeoutId)
if (!response.ok) {
return {
gateway: gatewayName,
@@ -128,38 +130,45 @@ export async function wakeCommandGateway(
error: "No command configured",
}
}
try {
const timeout = resolveCommandTimeoutMs(gatewayConfig.timeout)
// Interpolate variables with shell escaping
let interpolated = gatewayConfig.command.replace(/\{\{(\w+)\}\}/g, (_match, key) => {
const interpolated = gatewayConfig.command.replace(/\{\{(\w+)\}\}/g, (_match, key) => {
const value = variables[key]
if (value === undefined) return _match
return shellEscapeArg(value)
})
// Always use sh -c to handle the shell command string correctly
const proc = spawn(["sh", "-c", interpolated], {
env: { ...process.env },
stdout: "ignore",
stderr: "ignore",
})
// Handle timeout manually
const timeoutPromise = new Promise<number>((_, reject) => {
setTimeout(() => {
let timeoutId: ReturnType<typeof setTimeout> | undefined
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
proc.kill()
reject(new Error("Command timed out"))
}, timeout)
})
await Promise.race([proc.exited, timeoutPromise])
try {
await Promise.race([proc.exited, timeoutPromise])
} finally {
if (timeoutId !== undefined) {
clearTimeout(timeoutId)
}
}
if (proc.exitCode !== 0) {
throw new Error(`Command exited with code ${proc.exitCode}`)
}
return { gateway: gatewayName, success: true }
} catch (error) {
return {
@@ -169,4 +178,3 @@ export async function wakeCommandGateway(
}
}
}

View File

@@ -9,7 +9,9 @@ import { getCurrentTmuxSession, captureTmuxPane } from "./tmux"
import { startReplyListener, stopReplyListener } from "./reply-listener"
import type { OpenClawConfig, OpenClawContext, OpenClawPayload, WakeResult } from "./types"
const DEBUG = process.env.OMX_OPENCLAW_DEBUG === "1"
const DEBUG =
process.env.OMO_OPENCLAW_DEBUG === "1"
|| process.env.OMX_OPENCLAW_DEBUG === "1"
function buildWhitelistedContext(context: OpenClawContext): OpenClawContext {
const result: OpenClawContext = {}
@@ -18,7 +20,7 @@ function buildWhitelistedContext(context: OpenClawContext): OpenClawContext {
if (context.tmuxSession !== undefined) result.tmuxSession = context.tmuxSession
if (context.prompt !== undefined) result.prompt = context.prompt
if (context.contextSummary !== undefined) result.contextSummary = context.contextSummary
if (context.reason !== undefined) result.reason = context.reason
if (context.reasoning !== undefined) result.reasoning = context.reasoning
if (context.question !== undefined) result.question = context.question
if (context.tmuxTail !== undefined) result.tmuxTail = context.tmuxTail
if (context.replyChannel !== undefined) result.replyChannel = context.replyChannel
@@ -34,27 +36,27 @@ export async function wakeOpenClaw(
): Promise<WakeResult | null> {
try {
if (!config.enabled) return null
const resolved = resolveGateway(config, event)
if (!resolved) return null
const { gatewayName, gateway, instruction } = resolved
const now = new Date().toISOString()
const replyChannel = context.replyChannel ?? process.env.OPENCLAW_REPLY_CHANNEL
const replyTarget = context.replyTarget ?? process.env.OPENCLAW_REPLY_TARGET
const replyThread = context.replyThread ?? process.env.OPENCLAW_REPLY_THREAD
const enrichedContext: OpenClawContext = {
...context,
...(replyChannel !== undefined && { replyChannel }),
...(replyTarget !== undefined && { replyTarget }),
...(replyThread !== undefined && { replyThread }),
}
const tmuxSession = enrichedContext.tmuxSession ?? getCurrentTmuxSession() ?? undefined
let tmuxTail = enrichedContext.tmuxTail
if (!tmuxTail && (event === "stop" || event === "session-end") && process.env.TMUX) {
try {
@@ -62,11 +64,16 @@ export async function wakeOpenClaw(
if (paneId) {
tmuxTail = (await captureTmuxPane(paneId, 15)) ?? undefined
}
} catch {
// Ignore
} catch (error) {
if (DEBUG) {
console.error(
"[openclaw] failed to capture tmux tail:",
error instanceof Error ? error.message : error,
)
}
}
}
const variables: Record<string, string | undefined> = {
sessionId: enrichedContext.sessionId,
projectPath: enrichedContext.projectPath,
@@ -74,7 +81,7 @@ export async function wakeOpenClaw(
tmuxSession,
prompt: enrichedContext.prompt,
contextSummary: enrichedContext.contextSummary,
reason: enrichedContext.reason,
reasoning: enrichedContext.reasoning,
question: enrichedContext.question,
tmuxTail,
event,
@@ -83,12 +90,12 @@ export async function wakeOpenClaw(
replyTarget,
replyThread,
}
const interpolatedInstruction = interpolateInstruction(instruction, variables)
variables.instruction = interpolatedInstruction
let result: WakeResult
if (gateway.type === "command") {
result = await wakeCommandGateway(gatewayName, gateway, variables)
} else {
@@ -107,14 +114,14 @@ export async function wakeOpenClaw(
...(replyThread !== undefined && { threadId: replyThread }),
context: buildWhitelistedContext(enrichedContext),
}
result = await wakeGateway(gatewayName, gateway, payload)
}
if (DEBUG) {
console.error(`[openclaw] wake ${event} -> ${gatewayName}: ${result.success ? "ok" : result.error}`)
}
return result
} catch (error) {
if (DEBUG) {
@@ -125,7 +132,8 @@ export async function wakeOpenClaw(
}
export async function initializeOpenClaw(config: OpenClawConfig): Promise<void> {
if (config.enabled && (config.discordBotToken || config.telegramBotToken)) {
const replyListener = config.replyListener
if (config.enabled && (replyListener?.discordBotToken || replyListener?.telegramBotToken)) {
await startReplyListener(config)
}
}

View File

@@ -58,7 +58,7 @@ const STATE_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-state.json")
const CONFIG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener-config.json")
const LOG_FILE_PATH = join(DEFAULT_STATE_DIR, "reply-listener.log")
const DAEMON_IDENTITY_MARKER = "pollLoop"
export const DAEMON_IDENTITY_MARKER = "--openclaw-reply-listener-daemon"
function createMinimalDaemonEnv(): Record<string, string> {
const env: Record<string, string> = {}
@@ -114,6 +114,10 @@ function log(message: string): void {
}
}
export function logReplyListenerMessage(message: string): void {
log(message)
}
interface DaemonState {
isRunning: boolean
pid: number | null
@@ -255,22 +259,23 @@ async function injectReply(
platform: string,
config: OpenClawConfig,
): Promise<boolean> {
const replyListener = config.replyListener
const content = await captureTmuxPane(paneId, 15)
const analysis = analyzePaneContent(content)
if (analysis.confidence < 0.3) { // Lower threshold for simple check
log(
`WARN: Pane ${paneId} does not appear to be running Codex CLI (confidence: ${analysis.confidence}). Skipping injection, removing stale mapping.`,
`WARN: Pane ${paneId} does not appear to be running OpenCode CLI (confidence: ${analysis.confidence}). Skipping injection, removing stale mapping.`,
)
removeMessagesByPane(paneId)
return false
}
const prefix = config.includePrefix ? `[reply:${platform}] ` : ""
const prefix = replyListener?.includePrefix === false ? "" : `[reply:${platform}] `
const sanitized = sanitizeReplyInput(prefix + text)
const truncated = sanitized.slice(0, config.maxMessageLength)
const truncated = sanitized.slice(0, replyListener?.maxMessageLength ?? 500)
const success = await sendToPane(paneId, truncated, true)
if (success) {
log(
`Injected reply from ${platform} into pane ${paneId}: "${truncated.slice(0, 50)}${truncated.length > 50 ? "..." : ""}"`,
@@ -288,30 +293,36 @@ async function pollDiscord(
state: DaemonState,
rateLimiter: RateLimiter,
): Promise<void> {
if (!config.discordBotToken || !config.discordChannelId) return
if (!config.authorizedDiscordUserIds || config.authorizedDiscordUserIds.length === 0) return
const replyListener = config.replyListener
if (!replyListener?.discordBotToken || !replyListener.discordChannelId) return
if (
!replyListener.authorizedDiscordUserIds
|| replyListener.authorizedDiscordUserIds.length === 0
) {
return
}
if (Date.now() < discordBackoffUntil) return
try {
const after = state.discordLastMessageId
? `?after=${state.discordLastMessageId}&limit=10`
: "?limit=10"
const url = `https://discord.com/api/v10/channels/${config.discordChannelId}/messages${after}`
const url = `https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages${after}`
const controller = new AbortController()
const timeout = setTimeout(() => controller.abort(), 10000)
const response = await fetch(url, {
method: "GET",
headers: { Authorization: `Bot ${config.discordBotToken}` },
headers: { Authorization: `Bot ${replyListener.discordBotToken}` },
signal: controller.signal,
})
clearTimeout(timeout)
const remaining = response.headers.get("x-ratelimit-remaining")
const reset = response.headers.get("x-ratelimit-reset")
if (remaining !== null && parseInt(remaining, 10) < 2) {
const parsed = reset ? parseFloat(reset) : Number.NaN
const resetTime = Number.isFinite(parsed) ? parsed * 1000 : Date.now() + 10000
@@ -320,37 +331,37 @@ async function pollDiscord(
`WARN: Discord rate limit low (remaining: ${remaining}), backing off until ${new Date(resetTime).toISOString()}`,
)
}
if (!response.ok) {
log(`Discord API error: HTTP ${response.status}`)
return
}
const messages = await response.json()
if (!Array.isArray(messages) || messages.length === 0) return
const sorted = [...messages].reverse()
for (const msg of sorted) {
if (!msg.message_reference?.message_id) {
state.discordLastMessageId = msg.id
writeDaemonState(state)
continue
}
if (!config.authorizedDiscordUserIds.includes(msg.author.id)) {
if (!replyListener.authorizedDiscordUserIds.includes(msg.author.id)) {
state.discordLastMessageId = msg.id
writeDaemonState(state)
continue
}
const mapping = lookupByMessageId("discord-bot", msg.message_reference.message_id)
if (!mapping) {
state.discordLastMessageId = msg.id
writeDaemonState(state)
continue
}
if (!rateLimiter.canProceed()) {
log(`WARN: Rate limit exceeded, dropping Discord message ${msg.id}`)
state.discordLastMessageId = msg.id
@@ -358,21 +369,21 @@ async function pollDiscord(
state.errors++
continue
}
state.discordLastMessageId = msg.id
writeDaemonState(state)
const success = await injectReply(mapping.tmuxPaneId, msg.content, "discord", config)
if (success) {
state.messagesInjected++
// Add reaction
try {
await fetch(
`https://discord.com/api/v10/channels/${config.discordChannelId}/messages/${msg.id}/reactions/%E2%9C%85/@me`,
`https://discord.com/api/v10/channels/${replyListener.discordChannelId}/messages/${msg.id}/reactions/%E2%9C%85/@me`,
{
method: "PUT",
headers: { Authorization: `Bot ${config.discordBotToken}` },
headers: { Authorization: `Bot ${replyListener.discordBotToken}` },
},
)
} catch {
@@ -394,30 +405,31 @@ async function pollTelegram(
state: DaemonState,
rateLimiter: RateLimiter,
): Promise<void> {
if (!config.telegramBotToken || !config.telegramChatId) return
const replyListener = config.replyListener
if (!replyListener?.telegramBotToken || !replyListener.telegramChatId) return
try {
const offset = state.telegramLastUpdateId ? state.telegramLastUpdateId + 1 : 0
const url = `https://api.telegram.org/bot${config.telegramBotToken}/getUpdates?offset=${offset}&timeout=0`
const url = `https://api.telegram.org/bot${replyListener.telegramBotToken}/getUpdates?offset=${offset}&timeout=0`
const controller = new AbortController()
const timeout = setTimeout(() => controller.abort(), 10000)
const response = await fetch(url, {
method: "GET",
signal: controller.signal,
})
clearTimeout(timeout)
if (!response.ok) {
log(`Telegram API error: HTTP ${response.status}`)
return
}
const body = await response.json() as any
const updates = body.result || []
for (const update of updates) {
const msg = update.message
if (!msg) {
@@ -431,27 +443,27 @@ async function pollTelegram(
writeDaemonState(state)
continue
}
if (String(msg.chat.id) !== config.telegramChatId) {
if (String(msg.chat.id) !== replyListener.telegramChatId) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
const mapping = lookupByMessageId("telegram", String(msg.reply_to_message.message_id))
if (!mapping) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
const text = msg.text || ""
if (!text) {
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
continue
}
if (!rateLimiter.canProceed()) {
log(`WARN: Rate limit exceeded, dropping Telegram message ${msg.message_id}`)
state.telegramLastUpdateId = update.update_id
@@ -459,22 +471,22 @@ async function pollTelegram(
state.errors++
continue
}
state.telegramLastUpdateId = update.update_id
writeDaemonState(state)
const success = await injectReply(mapping.tmuxPaneId, text, "telegram", config)
if (success) {
state.messagesInjected++
try {
await fetch(
`https://api.telegram.org/bot${config.telegramBotToken}/sendMessage`,
`https://api.telegram.org/bot${replyListener.telegramBotToken}/sendMessage`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
chat_id: config.telegramChatId,
chat_id: replyListener.telegramChatId,
text: "Injected into Codex CLI session.",
reply_to_message_id: msg.message_id,
}),
@@ -517,10 +529,10 @@ export async function pollLoop(): Promise<void> {
state.isRunning = true
state.pid = process.pid
const rateLimiter = new RateLimiter(config.rateLimitPerMinute || 10)
const rateLimiter = new RateLimiter(config.replyListener?.rateLimitPerMinute || 10)
let lastPruneAt = Date.now()
const shutdown = (): void => {
log("Shutdown signal received")
state.isRunning = false
@@ -528,10 +540,10 @@ export async function pollLoop(): Promise<void> {
removePidFile()
process.exit(0)
}
process.on("SIGTERM", shutdown)
process.on("SIGINT", shutdown)
try {
pruneStale()
log("Pruned stale registry entries")
@@ -554,15 +566,19 @@ export async function pollLoop(): Promise<void> {
log(`WARN: Prune failed: ${e instanceof Error ? e.message : String(e)}`)
}
}
writeDaemonState(state)
await new Promise((resolve) => setTimeout(resolve, config.pollIntervalMs || 3000))
await new Promise((resolve) =>
setTimeout(resolve, config.replyListener?.pollIntervalMs || 3000),
)
} catch (error) {
state.errors++
state.lastError = error instanceof Error ? error.message : String(error)
log(`Poll error: ${state.lastError}`)
writeDaemonState(state)
await new Promise((resolve) => setTimeout(resolve, (config.pollIntervalMs || 3000) * 2))
await new Promise((resolve) =>
setTimeout(resolve, (config.replyListener?.pollIntervalMs || 3000) * 2),
)
}
}
log("Poll loop ended")
@@ -577,16 +593,17 @@ export async function startReplyListener(config: OpenClawConfig): Promise<{ succ
state: state || undefined,
}
}
if (!(await isTmuxAvailable())) {
return {
success: false,
message: "tmux not available - reply injection requires tmux",
}
}
const normalizedConfig = normalizeReplyListenerConfig(config)
if (!normalizedConfig.discordBotToken && !normalizedConfig.telegramBotToken) {
const replyListener = normalizedConfig.replyListener
if (!replyListener?.discordBotToken && !replyListener?.telegramBotToken) {
// Only warn if no platforms enabled, but user might just want outbound
// Actually, instructions say: "Fire-and-forget for outbound, daemon process for inbound"
// So if no inbound config, we shouldn't start daemon.
@@ -595,18 +612,18 @@ export async function startReplyListener(config: OpenClawConfig): Promise<{ succ
message: "No enabled reply listener platforms configured (missing bot tokens/channels)",
}
}
writeDaemonConfig(normalizedConfig)
ensureStateDir()
const currentFile = import.meta.url
const isTs = currentFile.endsWith(".ts")
const daemonScript = isTs
const daemonScript = isTs
? join(dirname(new URL(currentFile).pathname), "daemon.ts")
: join(dirname(new URL(currentFile).pathname), "daemon.js")
try {
const proc = spawn(["bun", "run", daemonScript], {
const proc = spawn(["bun", "run", daemonScript, DAEMON_IDENTITY_MARKER], {
detached: true,
stdio: ["ignore", "ignore", "ignore"],
cwd: process.cwd(),

View File

@@ -11,11 +11,12 @@ import {
constants,
} from "fs"
import { join, dirname } from "path"
import { homedir } from "os"
import { randomUUID } from "crypto"
import { getOpenCodeStorageDir } from "../shared/data-path"
const REGISTRY_PATH = join(homedir(), ".omx", "state", "reply-session-registry.jsonl")
const REGISTRY_LOCK_PATH = join(homedir(), ".omx", "state", "reply-session-registry.lock")
const OPENCLAW_STORAGE_DIR = join(getOpenCodeStorageDir(), "openclaw")
const REGISTRY_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.jsonl")
const REGISTRY_LOCK_PATH = join(OPENCLAW_STORAGE_DIR, "reply-session-registry.lock")
const SECURE_FILE_MODE = 0o600
const MAX_AGE_MS = 24 * 60 * 60 * 1000
const LOCK_TIMEOUT_MS = 2000
@@ -120,12 +121,26 @@ function acquireRegistryLock(): LockHandle | null {
constants.O_CREAT | constants.O_EXCL | constants.O_WRONLY,
SECURE_FILE_MODE,
)
const lockPayload = JSON.stringify({
pid: process.pid,
acquiredAt: Date.now(),
token,
})
writeSync(fd, lockPayload)
try {
const lockPayload = JSON.stringify({
pid: process.pid,
acquiredAt: Date.now(),
token,
})
writeSync(fd, lockPayload)
} catch (writeError) {
try {
closeSync(fd)
} catch {
// Ignore
}
try {
unlinkSync(REGISTRY_LOCK_PATH)
} catch {
// Ignore
}
throw writeError
}
return { fd, token }
} catch (error) {
const err = error as NodeJS.ErrnoException

View File

@@ -14,7 +14,9 @@ export async function getTmuxSessionName(): Promise<string | null> {
stdout: "pipe",
stderr: "ignore",
})
const output = await new Response(proc.stdout).text()
const outputPromise = new Response(proc.stdout).text()
await proc.exited
const output = await outputPromise
if (proc.exitCode !== 0) return null
return output.trim() || null
} catch {
@@ -31,7 +33,9 @@ export async function captureTmuxPane(paneId: string, lines = 15): Promise<strin
stderr: "ignore",
},
)
const output = await new Response(proc.stdout).text()
const outputPromise = new Response(proc.stdout).text()
await proc.exited
const output = await outputPromise
if (proc.exitCode !== 0) return null
return output.trim() || null
} catch {
@@ -41,12 +45,21 @@ export async function captureTmuxPane(paneId: string, lines = 15): Promise<strin
export async function sendToPane(paneId: string, text: string, confirm = true): Promise<boolean> {
try {
const proc = spawn(["tmux", "send-keys", "-t", paneId, text, ...(confirm ? ["Enter"] : [])], {
const literalProc = spawn(["tmux", "send-keys", "-t", paneId, "-l", "--", text], {
stdout: "ignore",
stderr: "ignore",
})
await proc.exited
return proc.exitCode === 0
await literalProc.exited
if (literalProc.exitCode !== 0) return false
if (!confirm) return true
const enterProc = spawn(["tmux", "send-keys", "-t", paneId, "Enter"], {
stdout: "ignore",
stderr: "ignore",
})
await enterProc.exited
return enterProc.exitCode === 0
} catch {
return false
}
@@ -67,18 +80,11 @@ export async function isTmuxAvailable(): Promise<boolean> {
export function analyzePaneContent(content: string | null): { confidence: number } {
if (!content) return { confidence: 0 }
// Simple heuristic: check for common CLI prompts or output
// Reference implementation had more logic, but for now simple check is okay
// Ideally, I should port the reference logic.
// Reference logic:
// if (content.includes("opencode")) confidence += 0.5
// if (content.includes("How can I help you?")) confidence += 0.8
// etc.
let confidence = 0
if (content.includes("opencode")) confidence += 0.3
if (content.includes("How can I help you?")) confidence += 0.5
if (content.includes("Type /help")) confidence += 0.2
if (content.includes("Ask anything...")) confidence += 0.5
if (content.includes("Run /help")) confidence += 0.2
return { confidence: Math.min(1, confidence) }
}

View File

@@ -1,6 +1,16 @@
import type { OpenClawConfig, OpenClawGateway, OpenClawHook } from "../config/schema/openclaw"
import type {
OpenClawConfig,
OpenClawGateway,
OpenClawHook,
OpenClawReplyListenerConfig,
} from "../config/schema/openclaw"
export type { OpenClawConfig, OpenClawGateway, OpenClawHook }
export type {
OpenClawConfig,
OpenClawGateway,
OpenClawHook,
OpenClawReplyListenerConfig,
}
export interface OpenClawContext {
sessionId?: string