From 96f4b3b56c568f5f899a25211d8aa799501a5e13 Mon Sep 17 00:00:00 2001 From: YeonGyu-Kim Date: Mon, 16 Mar 2026 14:43:47 +0900 Subject: [PATCH] task: implement sdk runner --- package.json | 5 +- packages/sdk/README.md | 27 +++ packages/sdk/package.json | 23 +++ packages/sdk/src/create-omo-runner.d.ts | 3 + packages/sdk/src/create-omo-runner.test.ts | 106 ++++++++++ packages/sdk/src/create-omo-runner.ts | 186 +++++++++++++++++ packages/sdk/src/index.d.ts | 8 + packages/sdk/src/index.ts | 8 + packages/sdk/src/types.d.ts | 95 +++++++++ packages/sdk/src/types.ts | 95 +++++++++ packages/sdk/tsconfig.json | 24 +++ src/cli/run/event-handlers.ts | 57 ++++-- src/cli/run/event-stream-processor.ts | 221 ++++++++++++++++++++- src/cli/run/index.ts | 10 +- src/cli/run/run-engine.test.ts | 142 +++++++++++++ src/cli/run/run-engine.ts | 197 ++++++++++++++++++ src/cli/run/runner.ts | 116 +++-------- src/cli/run/server-connection.ts | 39 ++-- src/cli/run/session-resolver.ts | 27 ++- src/cli/run/types.ts | 98 +++++++++ 20 files changed, 1350 insertions(+), 137 deletions(-) create mode 100644 packages/sdk/README.md create mode 100644 packages/sdk/package.json create mode 100644 packages/sdk/src/create-omo-runner.d.ts create mode 100644 packages/sdk/src/create-omo-runner.test.ts create mode 100644 packages/sdk/src/create-omo-runner.ts create mode 100644 packages/sdk/src/index.d.ts create mode 100644 packages/sdk/src/index.ts create mode 100644 packages/sdk/src/types.d.ts create mode 100644 packages/sdk/src/types.ts create mode 100644 packages/sdk/tsconfig.json create mode 100644 src/cli/run/run-engine.test.ts create mode 100644 src/cli/run/run-engine.ts diff --git a/package.json b/package.json index 952fdbcfc..abb6a7bf6 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ }, "scripts": { "build": "bun build src/index.ts --outdir dist --target bun --format esm --external @ast-grep/napi && tsc --emitDeclarationOnly && bun build src/cli/index.ts --outdir dist/cli --target bun --format esm --external @ast-grep/napi && bun run build:schema", + "build:sdk": "cd packages/sdk && bun run build", "build:all": "bun run build && bun run build:binaries", "build:binaries": "bun run script/build-binaries.ts", "build:schema": "bun run script/build-schema.ts", @@ -30,7 +31,9 @@ "postinstall": "node postinstall.mjs", "prepublishOnly": "bun run clean && bun run build", "typecheck": "tsc --noEmit", - "test": "bun test" + "typecheck:sdk": "cd packages/sdk && bun run typecheck", + "test": "bun test", + "test:sdk": "cd packages/sdk && bun run test" }, "keywords": [ "opencode", diff --git a/packages/sdk/README.md b/packages/sdk/README.md new file mode 100644 index 000000000..4364e7c80 --- /dev/null +++ b/packages/sdk/README.md @@ -0,0 +1,27 @@ +# @oh-my-openagent/sdk + +Programmatic runner for starting or attaching to an OpenCode server, running oh-my-openagent sessions, and consuming normalized lifecycle events. + +## `run()` + +```ts +import { createOmoRunner } from "@oh-my-openagent/sdk" + +const runner = createOmoRunner({ directory: process.cwd(), agent: "prometheus" }) +const result = await runner.run("Plan the next release") +await runner.close() +``` + +## `stream()` + +```ts +import { createOmoRunner } from "@oh-my-openagent/sdk" + +const runner = createOmoRunner({ directory: process.cwd() }) + +for await (const event of runner.stream("Investigate the build failure")) { + console.log(event.type) +} + +await runner.close() +``` diff --git a/packages/sdk/package.json b/packages/sdk/package.json new file mode 100644 index 000000000..9a37575ea --- /dev/null +++ b/packages/sdk/package.json @@ -0,0 +1,23 @@ +{ + "name": "@oh-my-openagent/sdk", + "version": "0.0.0", + "private": true, + "type": "module", + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "files": [ + "dist", + "README.md" + ], + "scripts": { + "build": "rm -rf dist && bun build ./src/index.ts --outdir dist --target bun --format esm && cp ./src/*.d.ts ./dist/", + "test": "bun test", + "typecheck": "tsc -p tsconfig.json --noEmit" + } +} diff --git a/packages/sdk/src/create-omo-runner.d.ts b/packages/sdk/src/create-omo-runner.d.ts new file mode 100644 index 000000000..d184f6299 --- /dev/null +++ b/packages/sdk/src/create-omo-runner.d.ts @@ -0,0 +1,3 @@ +import type { CreateOmoRunnerOptions, OmoRunner } from "./types" + +export declare function createOmoRunner(options: CreateOmoRunnerOptions): OmoRunner diff --git a/packages/sdk/src/create-omo-runner.test.ts b/packages/sdk/src/create-omo-runner.test.ts new file mode 100644 index 000000000..d637e5d5a --- /dev/null +++ b/packages/sdk/src/create-omo-runner.test.ts @@ -0,0 +1,106 @@ +import { beforeEach, describe, expect, it, mock } from "bun:test" +import type { ServerConnection } from "../../../src/cli/run/types" + +const mockCreateServerConnection = mock( + async (): Promise => ({ + client: {} as never, + cleanup: mock(() => {}), + }), +) + +const mockExecuteRunSession = mock(async (_options: unknown) => ({ + exitCode: 0, + sessionId: "ses_runner", + result: { + sessionId: "ses_runner", + success: true, + durationMs: 10, + messageCount: 1, + summary: "done", + }, +})) + +mock.module("../../../src/cli/run/server-connection", () => ({ + createServerConnection: mockCreateServerConnection, +})) + +mock.module("../../../src/cli/run/run-engine", () => ({ + executeRunSession: mockExecuteRunSession, +})) + +const { createOmoRunner } = await import("./create-omo-runner") + +describe("createOmoRunner", () => { + beforeEach(() => { + mockCreateServerConnection.mockClear() + mockExecuteRunSession.mockClear() + }) + + it("reuses the same connection and enables question-aware execution", async () => { + const runner = createOmoRunner({ + directory: "/repo", + agent: "atlas", + }) + + const first = await runner.run("first") + const second = await runner.run("second", { agent: "prometheus" }) + + expect(first.summary).toBe("done") + expect(second.summary).toBe("done") + expect(mockCreateServerConnection).toHaveBeenCalledTimes(1) + expect(mockExecuteRunSession).toHaveBeenNthCalledWith(1, expect.objectContaining({ + directory: "/repo", + agent: "atlas", + questionPermission: "allow", + questionToolEnabled: true, + renderOutput: false, + })) + expect(mockExecuteRunSession).toHaveBeenNthCalledWith(2, expect.objectContaining({ + agent: "prometheus", + })) + await runner.close() + }) + + it("streams normalized events", async () => { + mockExecuteRunSession.mockImplementationOnce(async (options: { eventObserver?: { onEvent?: (event: unknown) => Promise } }) => { + await options.eventObserver?.onEvent?.({ + type: "session.started", + sessionId: "ses_runner", + agent: "Atlas (Plan Executor)", + resumed: false, + }) + await options.eventObserver?.onEvent?.({ + type: "session.completed", + sessionId: "ses_runner", + result: { + sessionId: "ses_runner", + success: true, + durationMs: 10, + messageCount: 1, + summary: "done", + }, + }) + return { + exitCode: 0, + sessionId: "ses_runner", + result: { + sessionId: "ses_runner", + success: true, + durationMs: 10, + messageCount: 1, + summary: "done", + }, + } + }) + + const runner = createOmoRunner({ directory: "/repo" }) + const seenTypes: string[] = [] + + for await (const event of runner.stream("stream")) { + seenTypes.push(event.type) + } + + expect(seenTypes).toEqual(["session.started", "session.completed"]) + await runner.close() + }) +}) diff --git a/packages/sdk/src/create-omo-runner.ts b/packages/sdk/src/create-omo-runner.ts new file mode 100644 index 000000000..48782c871 --- /dev/null +++ b/packages/sdk/src/create-omo-runner.ts @@ -0,0 +1,186 @@ +import { createServerConnection } from "../../../src/cli/run/server-connection" +import { executeRunSession } from "../../../src/cli/run/run-engine" +import type { RunEventObserver, ServerConnection } from "../../../src/cli/run/types" +import type { + CreateOmoRunnerOptions, + OmoRunInvocationOptions, + OmoRunner, + RunResult, + StreamEvent, +} from "./types" + +class AsyncEventQueue implements AsyncIterable { + private readonly values: T[] = [] + private readonly waiters: Array<(value: IteratorResult) => void> = [] + private closed = false + + push(value: T): void { + if (this.closed) return + const waiter = this.waiters.shift() + if (waiter) { + waiter({ value, done: false }) + return + } + this.values.push(value) + } + + close(): void { + if (this.closed) return + this.closed = true + while (this.waiters.length > 0) { + const waiter = this.waiters.shift() + waiter?.({ value: undefined, done: true }) + } + } + + [Symbol.asyncIterator](): AsyncIterator { + return { + next: () => { + const value = this.values.shift() + if (value !== undefined) { + return Promise.resolve({ value, done: false }) + } + if (this.closed) { + return Promise.resolve({ value: undefined, done: true }) + } + return new Promise>((resolve) => { + this.waiters.push(resolve) + }) + }, + } + } +} + +export function createOmoRunner(options: CreateOmoRunnerOptions): OmoRunner { + const { + directory, + agent, + port, + model, + attach, + includeRawEvents = false, + onIdle, + onQuestion, + onComplete, + onError, + } = options + let connectionPromise: Promise | null = null + let closed = false + let activeRun: Promise | null = null + + const silentLogger = { + log: () => {}, + error: () => {}, + } + + const ensureConnection = async (): Promise => { + if (closed) { + throw new Error("Runner is closed") + } + if (connectionPromise === null) { + const controller = new AbortController() + connectionPromise = createServerConnection({ + port, + attach, + signal: controller.signal, + logger: silentLogger, + }) + } + return await connectionPromise + } + + const createObserver = ( + queue?: AsyncEventQueue, + ): RunEventObserver => ({ + includeRawEvents, + onEvent: async (event) => { + queue?.push(event as StreamEvent) + }, + onIdle, + onQuestion, + onComplete, + onError, + }) + + const runOnce = async ( + prompt: string, + invocationOptions: OmoRunInvocationOptions | undefined, + observer: RunEventObserver, + ): Promise => { + if (activeRun !== null) { + throw new Error("Runner already has an active operation") + } + + const connection = await ensureConnection() + const execution = executeRunSession({ + client: connection.client, + message: prompt, + directory, + agent: invocationOptions?.agent ?? agent, + model: invocationOptions?.model ?? model, + sessionId: invocationOptions?.sessionId, + questionPermission: "allow", + questionToolEnabled: true, + renderOutput: false, + logger: silentLogger, + eventObserver: observer, + signal: invocationOptions?.signal, + }) + activeRun = execution + + const abortHandler = () => { + void observer.onError?.({ + type: "session.error", + sessionId: invocationOptions?.sessionId ?? "", + error: "Aborted by caller", + }) + } + invocationOptions?.signal?.addEventListener("abort", abortHandler, { once: true }) + + try { + const { result } = await execution + return result + } finally { + invocationOptions?.signal?.removeEventListener("abort", abortHandler) + activeRun = null + } + } + + return { + async run(prompt, invocationOptions) { + return await runOnce(prompt, invocationOptions, createObserver()) + }, + stream(prompt, invocationOptions) { + const queue = new AsyncEventQueue() + const execution = runOnce(prompt, invocationOptions, createObserver(queue)) + .catch((error) => { + queue.push({ + type: "session.error", + sessionId: invocationOptions?.sessionId ?? "", + error: error instanceof Error ? error.message : String(error), + }) + }) + .finally(() => { + queue.close() + }) + + return { + async *[Symbol.asyncIterator]() { + try { + for await (const event of queue) { + yield event + } + } finally { + await execution + } + }, + } + }, + async close() { + closed = true + const connection = await connectionPromise + connection?.cleanup() + connectionPromise = null + }, + } +} diff --git a/packages/sdk/src/index.d.ts b/packages/sdk/src/index.d.ts new file mode 100644 index 000000000..4898d43ac --- /dev/null +++ b/packages/sdk/src/index.d.ts @@ -0,0 +1,8 @@ +export { createOmoRunner } from "./create-omo-runner" +export type { + CreateOmoRunnerOptions, + OmoRunInvocationOptions, + OmoRunner, + RunResult, + StreamEvent, +} from "./types" diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts new file mode 100644 index 000000000..4898d43ac --- /dev/null +++ b/packages/sdk/src/index.ts @@ -0,0 +1,8 @@ +export { createOmoRunner } from "./create-omo-runner" +export type { + CreateOmoRunnerOptions, + OmoRunInvocationOptions, + OmoRunner, + RunResult, + StreamEvent, +} from "./types" diff --git a/packages/sdk/src/types.d.ts b/packages/sdk/src/types.d.ts new file mode 100644 index 000000000..fbff4f87c --- /dev/null +++ b/packages/sdk/src/types.d.ts @@ -0,0 +1,95 @@ +export interface RunResult { + sessionId: string + success: boolean + durationMs: number + messageCount: number + summary: string +} + +export type StreamEvent = + | { + type: "session.started" + sessionId: string + agent: string + resumed: boolean + model?: { providerID: string; modelID: string } + } + | { + type: "message.delta" + sessionId: string + messageId?: string + partId?: string + delta: string + } + | { + type: "message.completed" + sessionId: string + messageId?: string + partId?: string + text: string + } + | { + type: "tool.started" + sessionId: string + toolName: string + input?: unknown + } + | { + type: "tool.completed" + sessionId: string + toolName: string + output?: string + status: "completed" | "error" + } + | { + type: "session.idle" + sessionId: string + } + | { + type: "session.question" + sessionId: string + toolName: string + input?: unknown + question?: string + } + | { + type: "session.completed" + sessionId: string + result: RunResult + } + | { + type: "session.error" + sessionId: string + error: string + } + | { + type: "raw" + sessionId: string + payload: unknown + } + +export interface OmoRunInvocationOptions { + sessionId?: string + signal?: AbortSignal + agent?: string + model?: string +} + +export interface CreateOmoRunnerOptions { + directory: string + agent?: string + port?: number + model?: string + attach?: string + includeRawEvents?: boolean + onIdle?: (event: Extract) => void | Promise + onQuestion?: (event: Extract) => void | Promise + onComplete?: (event: Extract) => void | Promise + onError?: (event: Extract) => void | Promise +} + +export interface OmoRunner { + run(prompt: string, options?: OmoRunInvocationOptions): Promise + stream(prompt: string, options?: OmoRunInvocationOptions): AsyncIterable + close(): Promise +} diff --git a/packages/sdk/src/types.ts b/packages/sdk/src/types.ts new file mode 100644 index 000000000..fbff4f87c --- /dev/null +++ b/packages/sdk/src/types.ts @@ -0,0 +1,95 @@ +export interface RunResult { + sessionId: string + success: boolean + durationMs: number + messageCount: number + summary: string +} + +export type StreamEvent = + | { + type: "session.started" + sessionId: string + agent: string + resumed: boolean + model?: { providerID: string; modelID: string } + } + | { + type: "message.delta" + sessionId: string + messageId?: string + partId?: string + delta: string + } + | { + type: "message.completed" + sessionId: string + messageId?: string + partId?: string + text: string + } + | { + type: "tool.started" + sessionId: string + toolName: string + input?: unknown + } + | { + type: "tool.completed" + sessionId: string + toolName: string + output?: string + status: "completed" | "error" + } + | { + type: "session.idle" + sessionId: string + } + | { + type: "session.question" + sessionId: string + toolName: string + input?: unknown + question?: string + } + | { + type: "session.completed" + sessionId: string + result: RunResult + } + | { + type: "session.error" + sessionId: string + error: string + } + | { + type: "raw" + sessionId: string + payload: unknown + } + +export interface OmoRunInvocationOptions { + sessionId?: string + signal?: AbortSignal + agent?: string + model?: string +} + +export interface CreateOmoRunnerOptions { + directory: string + agent?: string + port?: number + model?: string + attach?: string + includeRawEvents?: boolean + onIdle?: (event: Extract) => void | Promise + onQuestion?: (event: Extract) => void | Promise + onComplete?: (event: Extract) => void | Promise + onError?: (event: Extract) => void | Promise +} + +export interface OmoRunner { + run(prompt: string, options?: OmoRunInvocationOptions): Promise + stream(prompt: string, options?: OmoRunInvocationOptions): AsyncIterable + close(): Promise +} diff --git a/packages/sdk/tsconfig.json b/packages/sdk/tsconfig.json new file mode 100644 index 000000000..b49c3db4b --- /dev/null +++ b/packages/sdk/tsconfig.json @@ -0,0 +1,24 @@ +{ + "compilerOptions": { + "target": "ESNext", + "module": "ESNext", + "moduleResolution": "bundler", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "lib": ["ESNext"], + "types": ["bun-types"], + "rootDir": "../.." + }, + "include": [ + "src/**/*.ts", + "../../src/**/*.ts" + ], + "exclude": [ + "dist", + "**/*.test.ts", + "../../src/**/*.test.ts" + ] +} diff --git a/src/cli/run/event-handlers.ts b/src/cli/run/event-handlers.ts index ba6559cdd..86e5575a6 100644 --- a/src/cli/run/event-handlers.ts +++ b/src/cli/run/event-handlers.ts @@ -51,6 +51,10 @@ function getDeltaMessageId(props?: { return props?.messageID } +function shouldRender(ctx: RunContext): boolean { + return ctx.renderOutput !== false +} + function renderCompletionMetaLine(state: EventState, messageID: string): void { if (state.completionMetaPrintedByMessageId[messageID]) return @@ -95,7 +99,9 @@ export function handleSessionError(ctx: RunContext, payload: EventPayload, state if (getSessionId(props) === ctx.sessionID) { state.mainSessionError = true state.lastError = serializeError(props?.error) - console.error(pc.red(`\n[session.error] ${state.lastError}`)) + if (shouldRender(ctx)) { + console.error(pc.red(`\n[session.error] ${state.lastError}`)) + } } } @@ -122,6 +128,11 @@ export function handleMessagePartUpdated(ctx: RunContext, payload: EventPayload, } if (part.type === "reasoning") { + if (!shouldRender(ctx)) { + state.lastReasoningText = part.text ?? "" + state.hasReceivedMeaningfulWork = true + return + } ensureThinkBlockOpen(state) const reasoningText = part.text ?? "" const newText = reasoningText.slice(state.lastReasoningText.length) @@ -139,15 +150,17 @@ export function handleMessagePartUpdated(ctx: RunContext, payload: EventPayload, if (part.type === "text" && part.text) { const newText = part.text.slice(state.lastPartText.length) - if (newText) { + if (newText && shouldRender(ctx)) { const padded = writePaddedText(newText, state.textAtLineStart) process.stdout.write(padded.output) state.textAtLineStart = padded.atLineStart + } + if (newText) { state.hasReceivedMeaningfulWork = true } state.lastPartText = part.text - if (part.time?.end) { + if (part.time?.end && shouldRender(ctx)) { const messageID = part.messageID ?? state.currentMessageId if (messageID) { renderCompletionMetaLine(state, messageID) @@ -180,6 +193,11 @@ export function handleMessagePartDelta(ctx: RunContext, payload: EventPayload, s if (!delta) return if (partType === "reasoning") { + if (!shouldRender(ctx)) { + state.lastReasoningText += delta + state.hasReceivedMeaningfulWork = true + return + } ensureThinkBlockOpen(state) const padded = writePaddedText(delta, state.thinkingAtLineStart) process.stdout.write(pc.dim(padded.output)) @@ -191,9 +209,11 @@ export function handleMessagePartDelta(ctx: RunContext, payload: EventPayload, s closeThinkBlockIfNeeded(state) - const padded = writePaddedText(delta, state.textAtLineStart) - process.stdout.write(padded.output) - state.textAtLineStart = padded.atLineStart + if (shouldRender(ctx)) { + const padded = writePaddedText(delta, state.textAtLineStart) + process.stdout.write(padded.output) + state.textAtLineStart = padded.atLineStart + } state.lastPartText += delta state.hasReceivedMeaningfulWork = true } @@ -209,16 +229,18 @@ function handleToolPart( if (status === "running") { if (state.currentTool !== null) return state.currentTool = toolName - const header = formatToolHeader(toolName, part.state?.input ?? {}) - const suffix = header.description ? ` ${pc.dim(header.description)}` : "" state.hasReceivedMeaningfulWork = true - process.stdout.write(`\n ${pc.cyan(header.icon)} ${pc.bold(header.title)}${suffix} \n`) + if (shouldRender(_ctx)) { + const header = formatToolHeader(toolName, part.state?.input ?? {}) + const suffix = header.description ? ` ${pc.dim(header.description)}` : "" + process.stdout.write(`\n ${pc.cyan(header.icon)} ${pc.bold(header.title)}${suffix} \n`) + } } if (status === "completed" || status === "error") { if (state.currentTool === null) return const output = part.state?.output || "" - if (output.trim()) { + if (output.trim() && shouldRender(_ctx)) { process.stdout.write(pc.dim(` ${displayChars.treeEnd} output \n`)) const padded = writePaddedText(output, true) process.stdout.write(pc.dim(padded.output + (padded.atLineStart ? "" : " "))) @@ -271,7 +293,9 @@ export function handleMessageUpdated(ctx: RunContext, payload: EventPayload, sta state.currentAgent = agent state.currentModel = model state.currentVariant = variant - renderAgentHeader(agent, model, variant, state.agentColorsByName) + if (shouldRender(ctx)) { + renderAgentHeader(agent, model, variant, state.agentColorsByName) + } } } @@ -287,11 +311,12 @@ export function handleToolExecute(ctx: RunContext, payload: EventPayload, state: const toolName = props?.name || "unknown" state.currentTool = toolName - const header = formatToolHeader(toolName, props?.input ?? {}) - const suffix = header.description ? ` ${pc.dim(header.description)}` : "" - state.hasReceivedMeaningfulWork = true - process.stdout.write(`\n ${pc.cyan(header.icon)} ${pc.bold(header.title)}${suffix} \n`) + if (shouldRender(ctx)) { + const header = formatToolHeader(toolName, props?.input ?? {}) + const suffix = header.description ? ` ${pc.dim(header.description)}` : "" + process.stdout.write(`\n ${pc.cyan(header.icon)} ${pc.bold(header.title)}${suffix} \n`) + } } export function handleToolResult(ctx: RunContext, payload: EventPayload, state: EventState): void { @@ -305,7 +330,7 @@ export function handleToolResult(ctx: RunContext, payload: EventPayload, state: if (state.currentTool === null) return const output = props?.output || "" - if (output.trim()) { + if (output.trim() && shouldRender(ctx)) { process.stdout.write(pc.dim(` ${displayChars.treeEnd} output \n`)) const padded = writePaddedText(output, true) process.stdout.write(pc.dim(padded.output + (padded.atLineStart ? "" : " "))) diff --git a/src/cli/run/event-stream-processor.ts b/src/cli/run/event-stream-processor.ts index 757c1a447..1edd02ab0 100644 --- a/src/cli/run/event-stream-processor.ts +++ b/src/cli/run/event-stream-processor.ts @@ -1,5 +1,19 @@ import pc from "picocolors" -import type { RunContext, EventPayload } from "./types" +import type { + EventPayload, + MessagePartDeltaProps, + MessagePartUpdatedProps, + RunContext, + RunEventObserver, + SessionErrorEvent, + SessionIdleEvent, + SessionQuestionEvent, + StreamEvent, + ToolCompletedEvent, + ToolExecuteProps, + ToolResultProps, + ToolStartedEvent, +} from "./types" import type { EventState } from "./event-state" import { logEventVerbose } from "./event-formatting" import { @@ -14,10 +28,133 @@ import { handleTuiToast, } from "./event-handlers" +const QUESTION_TOOL_NAMES = new Set(["question", "ask_user_question", "askuserquestion"]) + +async function emitObservedEvent( + observer: RunEventObserver | undefined, + event: StreamEvent, +): Promise { + if (!observer) return + + await observer.onEvent?.(event) + if (event.type === "session.idle") { + await observer.onIdle?.(event as SessionIdleEvent) + } + if (event.type === "session.question") { + await observer.onQuestion?.(event as SessionQuestionEvent) + } + if (event.type === "session.error") { + await observer.onError?.(event as SessionErrorEvent) + } +} + +function getEventSessionId(payload: EventPayload): string | undefined { + const props = payload.properties as Record | undefined + if (!props) return undefined + if (typeof props.sessionID === "string") return props.sessionID + if (typeof props.sessionId === "string") return props.sessionId + const info = props.info as Record | undefined + if (typeof info?.sessionID === "string") return info.sessionID + if (typeof info?.sessionId === "string") return info.sessionId + const part = props.part as Record | undefined + if (typeof part?.sessionID === "string") return part.sessionID + if (typeof part?.sessionId === "string") return part.sessionId + return undefined +} + +function getQuestionText(input: unknown): string | undefined { + const args = input as { questions?: Array<{ question?: unknown }> } | undefined + const question = args?.questions?.[0]?.question + return typeof question === "string" && question.length > 0 ? question : undefined +} + +function getToolStartFromPayload( + payload: EventPayload, + sessionId: string, + fallbackToolName: string, +): ToolStartedEvent | SessionQuestionEvent | undefined { + if (payload.type === "tool.execute") { + const props = payload.properties as ToolExecuteProps | undefined + const toolName = props?.name ?? fallbackToolName + if (QUESTION_TOOL_NAMES.has(toolName.toLowerCase())) { + return { + type: "session.question", + sessionId, + toolName, + input: props?.input, + question: getQuestionText(props?.input), + } + } + return { + type: "tool.started", + sessionId, + toolName, + input: props?.input, + } + } + + if (payload.type === "message.part.updated") { + const props = payload.properties as MessagePartUpdatedProps | undefined + const toolName = props?.part?.tool ?? props?.part?.name ?? fallbackToolName + if (!toolName) return undefined + const input = props?.part?.state?.input + if (QUESTION_TOOL_NAMES.has(toolName.toLowerCase())) { + return { + type: "session.question", + sessionId, + toolName, + input, + question: getQuestionText(input), + } + } + return { + type: "tool.started", + sessionId, + toolName, + input, + } + } + + return undefined +} + +function getToolCompletedFromPayload( + payload: EventPayload, + sessionId: string, + fallbackToolName: string, +): ToolCompletedEvent | undefined { + if (payload.type === "tool.result") { + const props = payload.properties as ToolResultProps | undefined + return { + type: "tool.completed", + sessionId, + toolName: props?.name ?? fallbackToolName, + output: props?.output, + status: "completed", + } + } + + if (payload.type === "message.part.updated") { + const props = payload.properties as MessagePartUpdatedProps | undefined + const status = props?.part?.state?.status + if (status !== "completed" && status !== "error") return undefined + return { + type: "tool.completed", + sessionId, + toolName: props?.part?.tool ?? props?.part?.name ?? fallbackToolName, + output: props?.part?.state?.output, + status, + } + } + + return undefined +} + export async function processEvents( ctx: RunContext, stream: AsyncIterable, - state: EventState + state: EventState, + observer?: RunEventObserver, ): Promise { for await (const event of stream) { if (ctx.abortController.signal.aborted) break @@ -37,6 +174,18 @@ export async function processEvents( // Update last event timestamp for watchdog detection state.lastEventTimestamp = Date.now() + const previousIdle = state.mainSessionIdle + const previousError = state.mainSessionError + const previousTool = state.currentTool + const sessionId = getEventSessionId(payload) ?? ctx.sessionID + + if (observer?.includeRawEvents) { + await emitObservedEvent(observer, { + type: "raw", + sessionId, + payload, + }) + } handleSessionError(ctx, payload, state) handleSessionIdle(ctx, payload, state) @@ -47,8 +196,74 @@ export async function processEvents( handleToolExecute(ctx, payload, state) handleToolResult(ctx, payload, state) handleTuiToast(ctx, payload, state) + + if (!previousIdle && state.mainSessionIdle) { + await emitObservedEvent(observer, { + type: "session.idle", + sessionId: ctx.sessionID, + }) + } + + if (!previousError && state.mainSessionError) { + await emitObservedEvent(observer, { + type: "session.error", + sessionId: ctx.sessionID, + error: state.lastError ?? "Unknown session error", + }) + } + + if (payload.type === "message.part.delta") { + const props = payload.properties as MessagePartDeltaProps | undefined + if ( + sessionId === ctx.sessionID + && props?.field === "text" + && typeof props.delta === "string" + && props.delta.length > 0 + ) { + await emitObservedEvent(observer, { + type: "message.delta", + sessionId: ctx.sessionID, + messageId: props.messageID, + partId: props.partID, + delta: props.delta, + }) + } + } + + if (payload.type === "message.part.updated") { + const props = payload.properties as MessagePartUpdatedProps | undefined + if ( + sessionId === ctx.sessionID + && props?.part?.type === "text" + && typeof props.part.text === "string" + && props.part.time?.end + ) { + await emitObservedEvent(observer, { + type: "message.completed", + sessionId: ctx.sessionID, + messageId: props.part.messageID, + partId: props.part.id, + text: props.part.text, + }) + } + } + + if (previousTool === null && state.currentTool !== null && sessionId === ctx.sessionID) { + const toolEvent = getToolStartFromPayload(payload, ctx.sessionID, state.currentTool) + if (toolEvent) { + await emitObservedEvent(observer, toolEvent) + } + } + + if (previousTool !== null && state.currentTool === null && sessionId === ctx.sessionID) { + const toolEvent = getToolCompletedFromPayload(payload, ctx.sessionID, previousTool) + if (toolEvent) { + await emitObservedEvent(observer, toolEvent) + } + } } catch (err) { - console.error(pc.red(`[event error] ${err}`)) + const error = ctx.logger?.error ?? console.error + error(pc.red(`[event error] ${err}`)) } } } diff --git a/src/cli/run/index.ts b/src/cli/run/index.ts index 65c4d9330..cdd5d7c28 100644 --- a/src/cli/run/index.ts +++ b/src/cli/run/index.ts @@ -3,8 +3,16 @@ export { resolveRunAgent } from "./agent-resolver" export { resolveRunModel } from "./model-resolver" export { createServerConnection } from "./server-connection" export { resolveSession } from "./session-resolver" +export { executeRunSession, waitForEventProcessorShutdown } from "./run-engine" export { createJsonOutputManager } from "./json-output" export { executeOnCompleteHook } from "./on-complete-hook" export { createEventState, processEvents, serializeError } from "./events" export type { EventState } from "./events" -export type { RunOptions, RunContext, RunResult, ServerConnection } from "./types" +export type { + RunContext, + RunEventObserver, + RunOptions, + RunResult, + ServerConnection, + StreamEvent, +} from "./types" diff --git a/src/cli/run/run-engine.test.ts b/src/cli/run/run-engine.test.ts new file mode 100644 index 000000000..a32a98fed --- /dev/null +++ b/src/cli/run/run-engine.test.ts @@ -0,0 +1,142 @@ +/// + +import { describe, expect, it, mock } from "bun:test" +import { executeRunSession } from "./run-engine" +import type { OpencodeClient, StreamEvent } from "./types" + +function toAsyncIterable(values: unknown[]): AsyncIterable { + return { + async *[Symbol.asyncIterator]() { + for (const value of values) { + yield value + } + }, + } +} + +describe("executeRunSession", () => { + it("allows SDK sessions to enable questions and emits normalized events", async () => { + const seenEvents: StreamEvent[] = [] + const client = { + session: { + create: mock(() => Promise.resolve({ data: { id: "ses_sdk" } })), + promptAsync: mock(() => Promise.resolve({})), + status: mock(() => Promise.resolve({ data: { ses_sdk: { type: "idle" } } })), + todo: mock(() => Promise.resolve({ data: [] })), + children: mock(() => Promise.resolve({ data: [] })), + }, + event: { + subscribe: mock(() => Promise.resolve({ + stream: toAsyncIterable([ + { + type: "message.updated", + properties: { + info: { + id: "msg_1", + sessionID: "ses_sdk", + role: "assistant", + agent: "Prometheus (Plan Builder)", + }, + }, + }, + { + type: "tool.execute", + properties: { + sessionID: "ses_sdk", + name: "question", + input: { + questions: [{ question: "Which agent should run?" }], + }, + }, + }, + { + type: "message.part.delta", + properties: { + sessionID: "ses_sdk", + messageID: "msg_1", + partID: "part_1", + field: "text", + delta: "hello", + }, + }, + { + type: "tool.result", + properties: { + sessionID: "ses_sdk", + name: "question", + output: "waiting", + }, + }, + { + type: "message.part.updated", + properties: { + part: { + id: "part_1", + sessionID: "ses_sdk", + messageID: "msg_1", + type: "text", + text: "hello", + time: { end: 1 }, + }, + }, + }, + { + type: "session.status", + properties: { + sessionID: "ses_sdk", + status: { type: "idle" }, + }, + }, + ]), + })), + }, + } as unknown as OpencodeClient + + const result = await executeRunSession({ + client, + directory: "/repo", + message: "hello", + agent: "prometheus", + questionPermission: "allow", + questionToolEnabled: true, + renderOutput: false, + logger: { log: () => {}, error: () => {} }, + pluginConfig: {}, + pollOptions: { + pollIntervalMs: 1, + minStabilizationMs: 0, + }, + eventObserver: { + onEvent: async (event) => { + seenEvents.push(event) + }, + }, + }) + + expect(result.exitCode).toBe(0) + expect(result.result.success).toBe(true) + expect(client.session.create).toHaveBeenCalledWith({ + body: { + title: "oh-my-opencode run", + permission: [ + { permission: "question", action: "allow", pattern: "*" }, + ], + }, + query: { directory: "/repo" }, + }) + expect(client.session.promptAsync).toHaveBeenCalledWith({ + path: { id: "ses_sdk" }, + body: { + agent: "Prometheus (Plan Builder)", + tools: { question: true }, + parts: [{ type: "text", text: "hello" }], + }, + query: { directory: "/repo" }, + }) + expect(seenEvents.map((event) => event.type)).toContain("session.started") + expect(seenEvents.map((event) => event.type)).toContain("session.question") + expect(seenEvents.map((event) => event.type)).toContain("message.delta") + expect(seenEvents.map((event) => event.type)).toContain("message.completed") + expect(seenEvents.map((event) => event.type)).toContain("session.completed") + }) +}) diff --git a/src/cli/run/run-engine.ts b/src/cli/run/run-engine.ts new file mode 100644 index 000000000..0e5551e65 --- /dev/null +++ b/src/cli/run/run-engine.ts @@ -0,0 +1,197 @@ +import pc from "picocolors" +import type { OhMyOpenCodeConfig } from "../../config" +import { loadPluginConfig } from "../../plugin-config" +import { createEventState, processEvents, serializeError } from "./events" +import { loadAgentProfileColors } from "./agent-profile-colors" +import { pollForCompletion, type PollOptions } from "./poll-for-completion" +import { resolveRunAgent } from "./agent-resolver" +import { resolveRunModel } from "./model-resolver" +import { resolveSession } from "./session-resolver" +import type { + OpencodeClient, + RunContext, + RunEventObserver, + RunLogger, + RunResult, + SessionCompletedEvent, +} from "./types" + +const EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS = 2_000 + +export interface ExecuteRunSessionOptions { + client: OpencodeClient + message: string + directory: string + agent?: string + model?: string + sessionId?: string + verbose?: boolean + questionPermission?: "allow" | "deny" + questionToolEnabled?: boolean + pluginConfig?: OhMyOpenCodeConfig + logger?: RunLogger + renderOutput?: boolean + eventObserver?: RunEventObserver + pollOptions?: PollOptions + signal?: AbortSignal +} + +export interface ExecuteRunSessionResult { + exitCode: number + result: RunResult + sessionId: string +} + +export async function waitForEventProcessorShutdown( + eventProcessor: Promise, + timeoutMs = EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS, +): Promise { + const completed = await Promise.race([ + eventProcessor.then(() => true), + new Promise((resolve) => setTimeout(() => resolve(false), timeoutMs)), + ]) + + void completed +} + +async function emitCompletionEvent( + observer: RunEventObserver | undefined, + result: RunResult, +): Promise { + if (!observer) return + + const event: SessionCompletedEvent = { + type: "session.completed", + sessionId: result.sessionId, + result, + } + await observer.onEvent?.(event) + await observer.onComplete?.(event) +} + +export async function executeRunSession( + options: ExecuteRunSessionOptions, +): Promise { + const { + client, + message, + directory, + agent, + model, + sessionId, + verbose = false, + questionPermission = "deny", + questionToolEnabled = false, + pluginConfig = loadPluginConfig(directory, { command: "run" }), + logger, + renderOutput = true, + eventObserver, + pollOptions, + signal, + } = options + const log = logger?.log ?? console.log + + const resolvedAgent = resolveRunAgent({ message, agent }, pluginConfig) + const resolvedModel = resolveRunModel(model) + const abortController = new AbortController() + const startTime = Date.now() + const forwardAbort = () => abortController.abort() + signal?.addEventListener("abort", forwardAbort, { once: true }) + + try { + const resolvedSessionId = await resolveSession({ + client, + sessionId, + directory, + questionPermission, + logger, + }) + + if (renderOutput) { + log(pc.dim(`Session: ${resolvedSessionId}`)) + if (resolvedModel) { + log(pc.dim(`Model: ${resolvedModel.providerID}/${resolvedModel.modelID}`)) + } + } + + await eventObserver?.onEvent?.({ + type: "session.started", + sessionId: resolvedSessionId, + agent: resolvedAgent, + resumed: Boolean(sessionId), + ...(resolvedModel ? { model: resolvedModel } : {}), + }) + + const ctx: RunContext = { + client, + sessionID: resolvedSessionId, + directory, + abortController, + verbose, + renderOutput, + logger, + } + const events = await client.event.subscribe({ query: { directory } }) + const eventState = createEventState() + if (renderOutput) { + eventState.agentColorsByName = await loadAgentProfileColors(client) + } + const eventProcessor = processEvents( + ctx, + events.stream, + eventState, + eventObserver, + ).catch(() => {}) + + await client.session.promptAsync({ + path: { id: resolvedSessionId }, + body: { + agent: resolvedAgent, + ...(resolvedModel ? { model: resolvedModel } : {}), + tools: { + question: questionToolEnabled, + }, + parts: [{ type: "text", text: message }], + }, + query: { directory }, + }) + + const exitCode = await pollForCompletion(ctx, eventState, abortController, pollOptions) + abortController.abort() + await waitForEventProcessorShutdown(eventProcessor) + + const result: RunResult = { + sessionId: resolvedSessionId, + success: exitCode === 0, + durationMs: Date.now() - startTime, + messageCount: eventState.messageCount, + summary: eventState.lastPartText.slice(0, 200) || "Run completed", + } + + if (exitCode === 0) { + await emitCompletionEvent(eventObserver, result) + } + + return { + exitCode, + result, + sessionId: resolvedSessionId, + } + } catch (error) { + abortController.abort() + const serialized = serializeError(error) + await eventObserver?.onEvent?.({ + type: "session.error", + sessionId: sessionId ?? "", + error: serialized, + }) + await eventObserver?.onError?.({ + type: "session.error", + sessionId: sessionId ?? "", + error: serialized, + }) + throw error + } finally { + signal?.removeEventListener("abort", forwardAbort) + } +} diff --git a/src/cli/run/runner.ts b/src/cli/run/runner.ts index 84dd22a42..8bf3b6fc3 100644 --- a/src/cli/run/runner.ts +++ b/src/cli/run/runner.ts @@ -1,40 +1,23 @@ import pc from "picocolors" -import type { RunOptions, RunContext } from "./types" -import { createEventState, processEvents, serializeError } from "./events" -import { loadPluginConfig } from "../../plugin-config" -import { createServerConnection } from "./server-connection" -import { resolveSession } from "./session-resolver" +import type { RunOptions } from "./types" import { createJsonOutputManager } from "./json-output" import { executeOnCompleteHook } from "./on-complete-hook" -import { resolveRunAgent } from "./agent-resolver" -import { resolveRunModel } from "./model-resolver" -import { pollForCompletion } from "./poll-for-completion" -import { loadAgentProfileColors } from "./agent-profile-colors" -import { suppressRunInput } from "./stdin-suppression" +import { createServerConnection } from "./server-connection" +import { + executeRunSession, + waitForEventProcessorShutdown, +} from "./run-engine" import { createTimestampedStdoutController } from "./timestamp-output" +import { serializeError } from "./events" +import { suppressRunInput } from "./stdin-suppression" -export { resolveRunAgent } - -const EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS = 2_000 - -export async function waitForEventProcessorShutdown( - eventProcessor: Promise, - timeoutMs = EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS, -): Promise { - const completed = await Promise.race([ - eventProcessor.then(() => true), - new Promise((resolve) => setTimeout(() => resolve(false), timeoutMs)), - ]) - - void completed -} +export { resolveRunAgent } from "./agent-resolver" +export { waitForEventProcessorShutdown } export async function run(options: RunOptions): Promise { process.env.OPENCODE_CLI_RUN_MODE = "true" - const startTime = Date.now() const { - message, directory = process.cwd(), } = options @@ -45,25 +28,19 @@ export async function run(options: RunOptions): Promise { : createTimestampedStdoutController() timestampOutput?.enable() - const pluginConfig = loadPluginConfig(directory, { command: "run" }) - const resolvedAgent = resolveRunAgent(options, pluginConfig) - const resolvedModel = resolveRunModel(options.model) const abortController = new AbortController() try { - const { client, cleanup: serverCleanup } = await createServerConnection({ + const { client, cleanup } = await createServerConnection({ port: options.port, attach: options.attach, signal: abortController.signal, }) - const cleanup = () => { - serverCleanup() - } - const restoreInput = suppressRunInput() const handleSigint = () => { console.log(pc.yellow("\nInterrupted. Shutting down...")) + abortController.abort() restoreInput() cleanup() process.exit(130) @@ -72,81 +49,38 @@ export async function run(options: RunOptions): Promise { process.on("SIGINT", handleSigint) try { - const sessionID = await resolveSession({ + const { exitCode, result } = await executeRunSession({ client, + message: options.message, + directory, + agent: options.agent, + model: options.model, sessionId: options.sessionId, - directory, - }) - - console.log(pc.dim(`Session: ${sessionID}`)) - - if (resolvedModel) { - console.log(pc.dim(`Model: ${resolvedModel.providerID}/${resolvedModel.modelID}`)) - } - - const ctx: RunContext = { - client, - sessionID, - directory, - abortController, verbose: options.verbose ?? false, - } - const events = await client.event.subscribe({ query: { directory } }) - const eventState = createEventState() - eventState.agentColorsByName = await loadAgentProfileColors(client) - const eventProcessor = processEvents(ctx, events.stream, eventState).catch( - () => {}, - ) - - await client.session.promptAsync({ - path: { id: sessionID }, - body: { - agent: resolvedAgent, - ...(resolvedModel ? { model: resolvedModel } : {}), - tools: { - question: false, - }, - parts: [{ type: "text", text: message }], - }, - query: { directory }, + questionPermission: "deny", + questionToolEnabled: false, + renderOutput: true, }) - const exitCode = await pollForCompletion(ctx, eventState, abortController) - - // Abort the event stream to stop the processor - abortController.abort() - - await waitForEventProcessorShutdown(eventProcessor) - cleanup() - - const durationMs = Date.now() - startTime if (options.onComplete) { await executeOnCompleteHook({ command: options.onComplete, - sessionId: sessionID, + sessionId: result.sessionId, exitCode, - durationMs, - messageCount: eventState.messageCount, + durationMs: result.durationMs, + messageCount: result.messageCount, }) } if (jsonManager) { - jsonManager.emitResult({ - sessionId: sessionID, - success: exitCode === 0, - durationMs, - messageCount: eventState.messageCount, - summary: eventState.lastPartText.slice(0, 200) || "Run completed", - }) + jsonManager.emitResult(result) } return exitCode - } catch (err) { - cleanup() - throw err } finally { process.removeListener("SIGINT", handleSigint) restoreInput() + cleanup() } } catch (err) { if (jsonManager) jsonManager.restore() diff --git a/src/cli/run/server-connection.ts b/src/cli/run/server-connection.ts index bf658ff05..2c292ad68 100644 --- a/src/cli/run/server-connection.ts +++ b/src/cli/run/server-connection.ts @@ -1,6 +1,6 @@ import { createOpencode, createOpencodeClient } from "@opencode-ai/sdk" import pc from "picocolors" -import type { ServerConnection } from "./types" +import type { RunLogger, ServerConnection } from "./types" import { getAvailableServerPort, isPortAvailable, DEFAULT_SERVER_PORT } from "../../shared/port-utils" import { withWorkingOpencodePath } from "./opencode-binary-resolver" @@ -20,13 +20,18 @@ function isPortRangeExhausted(error: unknown): boolean { return error.message.includes("No available port found in range") } -async function startServer(options: { signal: AbortSignal, port: number }): Promise { - const { signal, port } = options +async function startServer(options: { + signal: AbortSignal + port: number + logger?: RunLogger +}): Promise { + const { signal, port, logger } = options + const log = logger?.log ?? console.log const { client, server } = await withWorkingOpencodePath(() => createOpencode({ signal, port, hostname: "127.0.0.1" }), ) - console.log(pc.dim("Server listening at"), pc.cyan(server.url)) + log(pc.dim("Server listening at"), pc.cyan(server.url)) return { client, cleanup: () => server.close() } } @@ -34,11 +39,13 @@ export async function createServerConnection(options: { port?: number attach?: string signal: AbortSignal + logger?: RunLogger }): Promise { - const { port, attach, signal } = options + const { port, attach, signal, logger } = options + const log = logger?.log ?? console.log if (attach !== undefined) { - console.log(pc.dim("Attaching to existing server at"), pc.cyan(attach)) + log(pc.dim("Attaching to existing server at"), pc.cyan(attach)) const client = createOpencodeClient({ baseUrl: attach }) return { client, cleanup: () => {} } } @@ -51,9 +58,9 @@ export async function createServerConnection(options: { const available = await isPortAvailable(port, "127.0.0.1") if (available) { - console.log(pc.dim("Starting server on port"), pc.cyan(port.toString())) + log(pc.dim("Starting server on port"), pc.cyan(port.toString())) try { - return await startServer({ signal, port }) + return await startServer({ signal, port, logger }) } catch (error) { if (!isPortStartFailure(error, port)) { throw error @@ -64,13 +71,13 @@ export async function createServerConnection(options: { throw error } - console.log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("became occupied, attaching to existing server")) + log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("became occupied, attaching to existing server")) const client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` }) return { client, cleanup: () => {} } } } - console.log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("is occupied, attaching to existing server")) + log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("is occupied, attaching to existing server")) const client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` }) return { client, cleanup: () => {} } } @@ -91,26 +98,26 @@ export async function createServerConnection(options: { throw error } - console.log(pc.dim("Port range exhausted, attaching to existing server on"), pc.cyan(DEFAULT_SERVER_PORT.toString())) + log(pc.dim("Port range exhausted, attaching to existing server on"), pc.cyan(DEFAULT_SERVER_PORT.toString())) const client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${DEFAULT_SERVER_PORT}` }) return { client, cleanup: () => {} } } if (wasAutoSelected) { - console.log(pc.dim("Auto-selected port"), pc.cyan(selectedPort.toString())) + log(pc.dim("Auto-selected port"), pc.cyan(selectedPort.toString())) } else { - console.log(pc.dim("Starting server on port"), pc.cyan(selectedPort.toString())) + log(pc.dim("Starting server on port"), pc.cyan(selectedPort.toString())) } try { - return await startServer({ signal, port: selectedPort }) + return await startServer({ signal, port: selectedPort, logger }) } catch (error) { if (!isPortStartFailure(error, selectedPort)) { throw error } const { port: retryPort } = await getAvailableServerPort(selectedPort + 1, "127.0.0.1") - console.log(pc.dim("Retrying server start on port"), pc.cyan(retryPort.toString())) - return await startServer({ signal, port: retryPort }) + log(pc.dim("Retrying server start on port"), pc.cyan(retryPort.toString())) + return await startServer({ signal, port: retryPort, logger }) } } diff --git a/src/cli/run/session-resolver.ts b/src/cli/run/session-resolver.ts index c5d9cb5e4..a01c71caa 100644 --- a/src/cli/run/session-resolver.ts +++ b/src/cli/run/session-resolver.ts @@ -1,5 +1,5 @@ import pc from "picocolors" -import type { OpencodeClient } from "./types" +import type { OpencodeClient, RunLogger } from "./types" import { serializeError } from "./events" const SESSION_CREATE_MAX_RETRIES = 3 @@ -9,8 +9,18 @@ export async function resolveSession(options: { client: OpencodeClient sessionId?: string directory: string + questionPermission?: "allow" | "deny" + logger?: RunLogger }): Promise { - const { client, sessionId, directory } = options + const { + client, + sessionId, + directory, + questionPermission = "deny", + logger, + } = options + const log = logger?.log ?? console.log + const error = logger?.error ?? console.error if (sessionId) { const res = await client.session.get({ @@ -27,23 +37,22 @@ export async function resolveSession(options: { const res = await client.session.create({ body: { title: "oh-my-opencode run", - // In CLI run mode there's no TUI to answer questions. permission: [ - { permission: "question", action: "deny" as const, pattern: "*" }, + { permission: "question", action: questionPermission, pattern: "*" }, ], } as Record, query: { directory }, }) if (res.error) { - console.error( + error( pc.yellow(`Session create attempt ${attempt}/${SESSION_CREATE_MAX_RETRIES} failed:`) ) - console.error(pc.dim(` Error: ${serializeError(res.error)}`)) + error(pc.dim(` Error: ${serializeError(res.error)}`)) if (attempt < SESSION_CREATE_MAX_RETRIES) { const delay = SESSION_CREATE_RETRY_DELAY_MS * attempt - console.log(pc.dim(` Retrying in ${delay}ms...`)) + log(pc.dim(` Retrying in ${delay}ms...`)) await new Promise((resolve) => setTimeout(resolve, delay)) } continue @@ -53,7 +62,7 @@ export async function resolveSession(options: { return res.data.id } - console.error( + error( pc.yellow( `Session create attempt ${attempt}/${SESSION_CREATE_MAX_RETRIES}: No session ID returned` ) @@ -61,7 +70,7 @@ export async function resolveSession(options: { if (attempt < SESSION_CREATE_MAX_RETRIES) { const delay = SESSION_CREATE_RETRY_DELAY_MS * attempt - console.log(pc.dim(` Retrying in ${delay}ms...`)) + log(pc.dim(` Retrying in ${delay}ms...`)) await new Promise((resolve) => setTimeout(resolve, delay)) } } diff --git a/src/cli/run/types.ts b/src/cli/run/types.ts index 30bacaee7..345a49947 100644 --- a/src/cli/run/types.ts +++ b/src/cli/run/types.ts @@ -15,6 +15,11 @@ export interface RunOptions { sessionId?: string } +export interface RunLogger { + log?: (...args: unknown[]) => void + error?: (...args: unknown[]) => void +} + export interface ServerConnection { client: OpencodeClient cleanup: () => void @@ -34,6 +39,99 @@ export interface RunContext { directory: string abortController: AbortController verbose?: boolean + renderOutput?: boolean + logger?: RunLogger +} + +export interface SessionStartedEvent { + type: "session.started" + sessionId: string + agent: string + resumed: boolean + model?: { providerID: string; modelID: string } +} + +export interface MessageDeltaEvent { + type: "message.delta" + sessionId: string + messageId?: string + partId?: string + delta: string +} + +export interface MessageCompletedEvent { + type: "message.completed" + sessionId: string + messageId?: string + partId?: string + text: string +} + +export interface ToolStartedEvent { + type: "tool.started" + sessionId: string + toolName: string + input?: unknown +} + +export interface ToolCompletedEvent { + type: "tool.completed" + sessionId: string + toolName: string + output?: string + status: "completed" | "error" +} + +export interface SessionIdleEvent { + type: "session.idle" + sessionId: string +} + +export interface SessionQuestionEvent { + type: "session.question" + sessionId: string + toolName: string + input?: unknown + question?: string +} + +export interface SessionCompletedEvent { + type: "session.completed" + sessionId: string + result: RunResult +} + +export interface SessionErrorEvent { + type: "session.error" + sessionId: string + error: string +} + +export interface RawStreamEvent { + type: "raw" + sessionId: string + payload: EventPayload +} + +export type StreamEvent = + | SessionStartedEvent + | MessageDeltaEvent + | MessageCompletedEvent + | ToolStartedEvent + | ToolCompletedEvent + | SessionIdleEvent + | SessionQuestionEvent + | SessionCompletedEvent + | SessionErrorEvent + | RawStreamEvent + +export interface RunEventObserver { + includeRawEvents?: boolean + onEvent?: (event: StreamEvent) => void | Promise + onIdle?: (event: SessionIdleEvent) => void | Promise + onQuestion?: (event: SessionQuestionEvent) => void | Promise + onComplete?: (event: SessionCompletedEvent) => void | Promise + onError?: (event: SessionErrorEvent) => void | Promise } export interface Todo {