Compare commits

...

4 Commits

Author SHA1 Message Date
YeonGyu-Kim
ed92a05e59 fix: address cubic review issues - abort handling, error metadata, logger binding 2026-03-16 15:18:41 +09:00
YeonGyu-Kim
40f25fb07d task: review and document agent selection 2026-03-16 15:06:33 +09:00
YeonGyu-Kim
c073169949 task: add agent resolver regression tests 2026-03-16 15:06:33 +09:00
YeonGyu-Kim
96f4b3b56c task: implement sdk runner 2026-03-16 15:06:33 +09:00
25 changed files with 1500 additions and 142 deletions

View File

@@ -22,6 +22,7 @@
},
"scripts": {
"build": "bun build src/index.ts --outdir dist --target bun --format esm --external @ast-grep/napi && tsc --emitDeclarationOnly && bun build src/cli/index.ts --outdir dist/cli --target bun --format esm --external @ast-grep/napi && bun run build:schema",
"build:sdk": "cd packages/sdk && bun run build",
"build:all": "bun run build && bun run build:binaries",
"build:binaries": "bun run script/build-binaries.ts",
"build:schema": "bun run script/build-schema.ts",
@@ -30,7 +31,9 @@
"postinstall": "node postinstall.mjs",
"prepublishOnly": "bun run clean && bun run build",
"typecheck": "tsc --noEmit",
"test": "bun test"
"typecheck:sdk": "cd packages/sdk && bun run typecheck",
"test": "bun test",
"test:sdk": "cd packages/sdk && bun run test"
},
"keywords": [
"opencode",

27
packages/sdk/README.md Normal file
View File

@@ -0,0 +1,27 @@
# @oh-my-openagent/sdk
Programmatic runner for starting or attaching to an OpenCode server, running oh-my-openagent sessions, and consuming normalized lifecycle events.
## `run()`
```ts
import { createOmoRunner } from "@oh-my-openagent/sdk"
const runner = createOmoRunner({ directory: process.cwd(), agent: "prometheus" })
const result = await runner.run("Plan the next release")
await runner.close()
```
## `stream()`
```ts
import { createOmoRunner } from "@oh-my-openagent/sdk"
const runner = createOmoRunner({ directory: process.cwd() })
for await (const event of runner.stream("Investigate the build failure")) {
console.log(event.type)
}
await runner.close()
```

23
packages/sdk/package.json Normal file
View File

@@ -0,0 +1,23 @@
{
"name": "@oh-my-openagent/sdk",
"version": "0.0.0",
"private": true,
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"files": [
"dist",
"README.md"
],
"scripts": {
"build": "rm -rf dist && bun build ./src/index.ts --outdir dist --target bun --format esm && cp ./src/*.d.ts ./dist/",
"test": "bun test",
"typecheck": "tsc -p tsconfig.json --noEmit"
}
}

View File

@@ -0,0 +1,3 @@
import type { CreateOmoRunnerOptions, OmoRunner } from "./types"
export declare function createOmoRunner(options: CreateOmoRunnerOptions): OmoRunner

View File

@@ -0,0 +1,106 @@
import { beforeEach, describe, expect, it, mock } from "bun:test"
import type { ServerConnection } from "../../../src/cli/run/types"
const mockCreateServerConnection = mock(
async (): Promise<ServerConnection> => ({
client: {} as never,
cleanup: mock(() => {}),
}),
)
const mockExecuteRunSession = mock(async (_options: unknown) => ({
exitCode: 0,
sessionId: "ses_runner",
result: {
sessionId: "ses_runner",
success: true,
durationMs: 10,
messageCount: 1,
summary: "done",
},
}))
mock.module("../../../src/cli/run/server-connection", () => ({
createServerConnection: mockCreateServerConnection,
}))
mock.module("../../../src/cli/run/run-engine", () => ({
executeRunSession: mockExecuteRunSession,
}))
const { createOmoRunner } = await import("./create-omo-runner")
describe("createOmoRunner", () => {
beforeEach(() => {
mockCreateServerConnection.mockClear()
mockExecuteRunSession.mockClear()
})
it("reuses the same connection and enables question-aware execution", async () => {
const runner = createOmoRunner({
directory: "/repo",
agent: "atlas",
})
const first = await runner.run("first")
const second = await runner.run("second", { agent: "prometheus" })
expect(first.summary).toBe("done")
expect(second.summary).toBe("done")
expect(mockCreateServerConnection).toHaveBeenCalledTimes(1)
expect(mockExecuteRunSession).toHaveBeenNthCalledWith(1, expect.objectContaining({
directory: "/repo",
agent: "atlas",
questionPermission: "allow",
questionToolEnabled: true,
renderOutput: false,
}))
expect(mockExecuteRunSession).toHaveBeenNthCalledWith(2, expect.objectContaining({
agent: "prometheus",
}))
await runner.close()
})
it("streams normalized events", async () => {
mockExecuteRunSession.mockImplementationOnce(async (options: { eventObserver?: { onEvent?: (event: unknown) => Promise<void> } }) => {
await options.eventObserver?.onEvent?.({
type: "session.started",
sessionId: "ses_runner",
agent: "Atlas (Plan Executor)",
resumed: false,
})
await options.eventObserver?.onEvent?.({
type: "session.completed",
sessionId: "ses_runner",
result: {
sessionId: "ses_runner",
success: true,
durationMs: 10,
messageCount: 1,
summary: "done",
},
})
return {
exitCode: 0,
sessionId: "ses_runner",
result: {
sessionId: "ses_runner",
success: true,
durationMs: 10,
messageCount: 1,
summary: "done",
},
}
})
const runner = createOmoRunner({ directory: "/repo" })
const seenTypes: string[] = []
for await (const event of runner.stream("stream")) {
seenTypes.push(event.type)
}
expect(seenTypes).toEqual(["session.started", "session.completed"])
await runner.close()
})
})

View File

@@ -0,0 +1,189 @@
import { createServerConnection } from "../../../src/cli/run/server-connection"
import { executeRunSession } from "../../../src/cli/run/run-engine"
import type { RunEventObserver, ServerConnection } from "../../../src/cli/run/types"
import type {
CreateOmoRunnerOptions,
OmoRunInvocationOptions,
OmoRunner,
RunResult,
StreamEvent,
} from "./types"
class AsyncEventQueue<T> implements AsyncIterable<T> {
private readonly values: T[] = []
private readonly waiters: Array<(value: IteratorResult<T>) => void> = []
private closed = false
push(value: T): void {
if (this.closed) return
const waiter = this.waiters.shift()
if (waiter) {
waiter({ value, done: false })
return
}
this.values.push(value)
}
close(): void {
if (this.closed) return
this.closed = true
while (this.waiters.length > 0) {
const waiter = this.waiters.shift()
waiter?.({ value: undefined, done: true })
}
}
[Symbol.asyncIterator](): AsyncIterator<T> {
return {
next: () => {
const value = this.values.shift()
if (value !== undefined) {
return Promise.resolve({ value, done: false })
}
if (this.closed) {
return Promise.resolve({ value: undefined, done: true })
}
return new Promise<IteratorResult<T>>((resolve) => {
this.waiters.push(resolve)
})
},
}
}
}
export function createOmoRunner(options: CreateOmoRunnerOptions): OmoRunner {
const {
directory,
agent,
port,
model,
attach,
includeRawEvents = false,
onIdle,
onQuestion,
onComplete,
onError,
} = options
let connectionPromise: Promise<ServerConnection> | null = null
let connectionController: AbortController | null = null
let closed = false
let activeRun: Promise<unknown> | null = null
const silentLogger = {
log: () => {},
error: () => {},
}
const ensureConnection = async (): Promise<ServerConnection> => {
if (closed) {
throw new Error("Runner is closed")
}
if (connectionPromise === null) {
connectionController = new AbortController()
connectionPromise = createServerConnection({
port,
attach,
signal: connectionController.signal,
logger: silentLogger,
})
}
return await connectionPromise
}
const createObserver = (
queue?: AsyncEventQueue<StreamEvent>,
): RunEventObserver => ({
includeRawEvents,
onEvent: async (event) => {
queue?.push(event as StreamEvent)
},
onIdle,
onQuestion,
onComplete,
onError,
})
const runOnce = async (
prompt: string,
invocationOptions: OmoRunInvocationOptions | undefined,
observer: RunEventObserver,
): Promise<RunResult> => {
if (activeRun !== null) {
throw new Error("Runner already has an active operation")
}
const connection = await ensureConnection()
const execution = executeRunSession({
client: connection.client,
message: prompt,
directory,
agent: invocationOptions?.agent ?? agent,
model: invocationOptions?.model ?? model,
sessionId: invocationOptions?.sessionId,
questionPermission: "allow",
questionToolEnabled: true,
renderOutput: false,
logger: silentLogger,
eventObserver: observer,
signal: invocationOptions?.signal,
})
activeRun = execution
const abortHandler = () => {
void observer.onError?.({
type: "session.error",
sessionId: invocationOptions?.sessionId ?? "",
error: "Aborted by caller",
})
}
invocationOptions?.signal?.addEventListener("abort", abortHandler, { once: true })
try {
const { result } = await execution
return result
} finally {
invocationOptions?.signal?.removeEventListener("abort", abortHandler)
activeRun = null
}
}
return {
async run(prompt, invocationOptions) {
return await runOnce(prompt, invocationOptions, createObserver())
},
stream(prompt, invocationOptions) {
const queue = new AsyncEventQueue<StreamEvent>()
const execution = runOnce(prompt, invocationOptions, createObserver(queue))
.catch((error) => {
queue.push({
type: "session.error",
sessionId: invocationOptions?.sessionId ?? "",
error: error instanceof Error ? error.message : String(error),
})
})
.finally(() => {
queue.close()
})
return {
async *[Symbol.asyncIterator]() {
try {
for await (const event of queue) {
yield event
}
} finally {
await execution
}
},
}
},
async close() {
closed = true
connectionController?.abort()
const connection = await connectionPromise
connection?.cleanup()
connectionPromise = null
connectionController = null
},
}
}

8
packages/sdk/src/index.d.ts vendored Normal file
View File

@@ -0,0 +1,8 @@
export { createOmoRunner } from "./create-omo-runner"
export type {
CreateOmoRunnerOptions,
OmoRunInvocationOptions,
OmoRunner,
RunResult,
StreamEvent,
} from "./types"

View File

@@ -0,0 +1,8 @@
export { createOmoRunner } from "./create-omo-runner"
export type {
CreateOmoRunnerOptions,
OmoRunInvocationOptions,
OmoRunner,
RunResult,
StreamEvent,
} from "./types"

95
packages/sdk/src/types.d.ts vendored Normal file
View File

@@ -0,0 +1,95 @@
export interface RunResult {
sessionId: string
success: boolean
durationMs: number
messageCount: number
summary: string
}
export type StreamEvent =
| {
type: "session.started"
sessionId: string
agent: string
resumed: boolean
model?: { providerID: string; modelID: string }
}
| {
type: "message.delta"
sessionId: string
messageId?: string
partId?: string
delta: string
}
| {
type: "message.completed"
sessionId: string
messageId?: string
partId?: string
text: string
}
| {
type: "tool.started"
sessionId: string
toolName: string
input?: unknown
}
| {
type: "tool.completed"
sessionId: string
toolName: string
output?: string
status: "completed" | "error"
}
| {
type: "session.idle"
sessionId: string
}
| {
type: "session.question"
sessionId: string
toolName: string
input?: unknown
question?: string
}
| {
type: "session.completed"
sessionId: string
result: RunResult
}
| {
type: "session.error"
sessionId: string
error: string
}
| {
type: "raw"
sessionId: string
payload: unknown
}
export interface OmoRunInvocationOptions {
sessionId?: string
signal?: AbortSignal
agent?: string
model?: string
}
export interface CreateOmoRunnerOptions {
directory: string
agent?: string
port?: number
model?: string
attach?: string
includeRawEvents?: boolean
onIdle?: (event: Extract<StreamEvent, { type: "session.idle" }>) => void | Promise<void>
onQuestion?: (event: Extract<StreamEvent, { type: "session.question" }>) => void | Promise<void>
onComplete?: (event: Extract<StreamEvent, { type: "session.completed" }>) => void | Promise<void>
onError?: (event: Extract<StreamEvent, { type: "session.error" }>) => void | Promise<void>
}
export interface OmoRunner {
run(prompt: string, options?: OmoRunInvocationOptions): Promise<RunResult>
stream(prompt: string, options?: OmoRunInvocationOptions): AsyncIterable<StreamEvent>
close(): Promise<void>
}

95
packages/sdk/src/types.ts Normal file
View File

@@ -0,0 +1,95 @@
export interface RunResult {
sessionId: string
success: boolean
durationMs: number
messageCount: number
summary: string
}
export type StreamEvent =
| {
type: "session.started"
sessionId: string
agent: string
resumed: boolean
model?: { providerID: string; modelID: string }
}
| {
type: "message.delta"
sessionId: string
messageId?: string
partId?: string
delta: string
}
| {
type: "message.completed"
sessionId: string
messageId?: string
partId?: string
text: string
}
| {
type: "tool.started"
sessionId: string
toolName: string
input?: unknown
}
| {
type: "tool.completed"
sessionId: string
toolName: string
output?: string
status: "completed" | "error"
}
| {
type: "session.idle"
sessionId: string
}
| {
type: "session.question"
sessionId: string
toolName: string
input?: unknown
question?: string
}
| {
type: "session.completed"
sessionId: string
result: RunResult
}
| {
type: "session.error"
sessionId: string
error: string
}
| {
type: "raw"
sessionId: string
payload: unknown
}
export interface OmoRunInvocationOptions {
sessionId?: string
signal?: AbortSignal
agent?: string
model?: string
}
export interface CreateOmoRunnerOptions {
directory: string
agent?: string
port?: number
model?: string
attach?: string
includeRawEvents?: boolean
onIdle?: (event: Extract<StreamEvent, { type: "session.idle" }>) => void | Promise<void>
onQuestion?: (event: Extract<StreamEvent, { type: "session.question" }>) => void | Promise<void>
onComplete?: (event: Extract<StreamEvent, { type: "session.completed" }>) => void | Promise<void>
onError?: (event: Extract<StreamEvent, { type: "session.error" }>) => void | Promise<void>
}
export interface OmoRunner {
run(prompt: string, options?: OmoRunInvocationOptions): Promise<RunResult>
stream(prompt: string, options?: OmoRunInvocationOptions): AsyncIterable<StreamEvent>
close(): Promise<void>
}

View File

@@ -0,0 +1,24 @@
{
"compilerOptions": {
"target": "ESNext",
"module": "ESNext",
"moduleResolution": "bundler",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"lib": ["ESNext"],
"types": ["bun-types"],
"rootDir": "../.."
},
"include": [
"src/**/*.ts",
"../../src/**/*.ts"
],
"exclude": [
"dist",
"**/*.test.ts",
"../../src/**/*.test.ts"
]
}

View File

@@ -94,9 +94,10 @@ Examples:
Agent resolution order:
1) --agent flag
2) OPENCODE_DEFAULT_AGENT
3) oh-my-opencode.json "default_run_agent"
4) Sisyphus (fallback)
2) OPENCODE_AGENT
3) OPENCODE_DEFAULT_AGENT
4) oh-my-opencode.json "default_run_agent"
5) Sisyphus (fallback)
Available core agents:
Sisyphus, Hephaestus, Prometheus, Atlas

View File

@@ -0,0 +1,88 @@
/// <reference types="bun-types" />
import { afterEach, describe, expect, it, mock, spyOn } from "bun:test"
import type { OhMyOpenCodeConfig } from "../../config"
import { resolveRunAgent } from "./agent-resolver"
const createConfig = (overrides: Partial<OhMyOpenCodeConfig> = {}): OhMyOpenCodeConfig => ({
...overrides,
})
describe("resolveRunAgent", () => {
afterEach(() => {
mock.restore()
})
it("preserves unknown explicit agents while honoring priority over env and config", () => {
//#given
const config = createConfig({ default_run_agent: "prometheus" })
const env = { OPENCODE_DEFAULT_AGENT: "Atlas" }
//#when
const agent = resolveRunAgent({ message: "test", agent: " custom-agent " }, config, env)
//#then
expect(agent).toBe("custom-agent")
})
it("falls back when an env-selected display-name agent is disabled", () => {
//#given
const config = createConfig({ disabled_agents: ["Atlas (Plan Executor)"] })
const env = { OPENCODE_DEFAULT_AGENT: "Atlas (Plan Executor)" }
const logSpy = spyOn(console, "log").mockImplementation(mock(() => undefined))
//#when
const agent = resolveRunAgent({ message: "test" }, config, env)
//#then
expect(agent).toBe("Sisyphus (Ultraworker)")
expect(logSpy).toHaveBeenCalledTimes(1)
expect(String(logSpy.mock.calls[0]?.[0] ?? "")).toContain("disabled")
expect(String(logSpy.mock.calls[0]?.[0] ?? "")).toContain("Sisyphus")
})
it("treats sisyphus_agent.disabled as disabling the config default agent", () => {
//#given
const config = createConfig({
default_run_agent: "sisyphus",
sisyphus_agent: { disabled: true },
})
const logSpy = spyOn(console, "log").mockImplementation(mock(() => undefined))
//#when
const agent = resolveRunAgent({ message: "test" }, config, {})
//#then
expect(agent).toBe("Hephaestus (Deep Agent)")
expect(logSpy).toHaveBeenCalledTimes(1)
expect(String(logSpy.mock.calls[0]?.[0] ?? "")).toContain("disabled")
expect(String(logSpy.mock.calls[0]?.[0] ?? "")).toContain("Hephaestus")
})
it("falls back to the default core agent when a requested core agent is disabled", () => {
//#given
const config = createConfig({ disabled_agents: ["Hephaestus"] })
//#when
const agent = resolveRunAgent({ message: "test", agent: "Hephaestus" }, config, {})
//#then
expect(agent).toBe("Sisyphus (Ultraworker)")
})
it("still returns sisyphus when every core agent is disabled", () => {
//#given
const config = createConfig({
disabled_agents: ["sisyphus", "hephaestus", "prometheus", "atlas"],
})
const logSpy = spyOn(console, "log").mockImplementation(mock(() => undefined))
//#when
const agent = resolveRunAgent({ message: "test", agent: "Atlas" }, config, {})
//#then
expect(agent).toBe("Sisyphus (Ultraworker)")
expect(logSpy).toHaveBeenCalledTimes(1)
expect(String(logSpy.mock.calls[0]?.[0] ?? "")).toContain("no enabled core agent was found")
})
})

View File

@@ -5,6 +5,7 @@ import { getAgentConfigKey, getAgentDisplayName } from "../../shared/agent-displ
const CORE_AGENT_ORDER = ["sisyphus", "hephaestus", "prometheus", "atlas"] as const
const DEFAULT_AGENT = "sisyphus"
const ENV_AGENT_KEYS = ["OPENCODE_AGENT", "OPENCODE_DEFAULT_AGENT"] as const
type EnvVars = Record<string, string | undefined>
type CoreAgentKey = (typeof CORE_AGENT_ORDER)[number]
@@ -54,7 +55,9 @@ export const resolveRunAgent = (
env: EnvVars = process.env
): string => {
const cliAgent = normalizeAgentName(options.agent)
const envAgent = normalizeAgentName(env.OPENCODE_DEFAULT_AGENT)
const envAgent = ENV_AGENT_KEYS
.map((key) => normalizeAgentName(env[key]))
.find((agent) => agent !== undefined)
const configAgent = normalizeAgentName(pluginConfig.default_run_agent)
const resolved =
cliAgent ??

View File

@@ -51,6 +51,10 @@ function getDeltaMessageId(props?: {
return props?.messageID
}
function shouldRender(ctx: RunContext): boolean {
return ctx.renderOutput !== false
}
function renderCompletionMetaLine(state: EventState, messageID: string): void {
if (state.completionMetaPrintedByMessageId[messageID]) return
@@ -95,9 +99,11 @@ export function handleSessionError(ctx: RunContext, payload: EventPayload, state
if (getSessionId(props) === ctx.sessionID) {
state.mainSessionError = true
state.lastError = serializeError(props?.error)
if (shouldRender(ctx)) {
console.error(pc.red(`\n[session.error] ${state.lastError}`))
}
}
}
export function handleMessagePartUpdated(ctx: RunContext, payload: EventPayload, state: EventState): void {
if (payload.type !== "message.part.updated") return
@@ -122,6 +128,11 @@ export function handleMessagePartUpdated(ctx: RunContext, payload: EventPayload,
}
if (part.type === "reasoning") {
if (!shouldRender(ctx)) {
state.lastReasoningText = part.text ?? ""
state.hasReceivedMeaningfulWork = true
return
}
ensureThinkBlockOpen(state)
const reasoningText = part.text ?? ""
const newText = reasoningText.slice(state.lastReasoningText.length)
@@ -139,15 +150,17 @@ export function handleMessagePartUpdated(ctx: RunContext, payload: EventPayload,
if (part.type === "text" && part.text) {
const newText = part.text.slice(state.lastPartText.length)
if (newText) {
if (newText && shouldRender(ctx)) {
const padded = writePaddedText(newText, state.textAtLineStart)
process.stdout.write(padded.output)
state.textAtLineStart = padded.atLineStart
}
if (newText) {
state.hasReceivedMeaningfulWork = true
}
state.lastPartText = part.text
if (part.time?.end) {
if (part.time?.end && shouldRender(ctx)) {
const messageID = part.messageID ?? state.currentMessageId
if (messageID) {
renderCompletionMetaLine(state, messageID)
@@ -180,6 +193,11 @@ export function handleMessagePartDelta(ctx: RunContext, payload: EventPayload, s
if (!delta) return
if (partType === "reasoning") {
if (!shouldRender(ctx)) {
state.lastReasoningText += delta
state.hasReceivedMeaningfulWork = true
return
}
ensureThinkBlockOpen(state)
const padded = writePaddedText(delta, state.thinkingAtLineStart)
process.stdout.write(pc.dim(padded.output))
@@ -191,9 +209,11 @@ export function handleMessagePartDelta(ctx: RunContext, payload: EventPayload, s
closeThinkBlockIfNeeded(state)
if (shouldRender(ctx)) {
const padded = writePaddedText(delta, state.textAtLineStart)
process.stdout.write(padded.output)
state.textAtLineStart = padded.atLineStart
}
state.lastPartText += delta
state.hasReceivedMeaningfulWork = true
}
@@ -209,16 +229,18 @@ function handleToolPart(
if (status === "running") {
if (state.currentTool !== null) return
state.currentTool = toolName
state.hasReceivedMeaningfulWork = true
if (shouldRender(_ctx)) {
const header = formatToolHeader(toolName, part.state?.input ?? {})
const suffix = header.description ? ` ${pc.dim(header.description)}` : ""
state.hasReceivedMeaningfulWork = true
process.stdout.write(`\n ${pc.cyan(header.icon)} ${pc.bold(header.title)}${suffix} \n`)
}
}
if (status === "completed" || status === "error") {
if (state.currentTool === null) return
const output = part.state?.output || ""
if (output.trim()) {
if (output.trim() && shouldRender(_ctx)) {
process.stdout.write(pc.dim(` ${displayChars.treeEnd} output \n`))
const padded = writePaddedText(output, true)
process.stdout.write(pc.dim(padded.output + (padded.atLineStart ? "" : " ")))
@@ -271,9 +293,11 @@ export function handleMessageUpdated(ctx: RunContext, payload: EventPayload, sta
state.currentAgent = agent
state.currentModel = model
state.currentVariant = variant
if (shouldRender(ctx)) {
renderAgentHeader(agent, model, variant, state.agentColorsByName)
}
}
}
export function handleToolExecute(ctx: RunContext, payload: EventPayload, state: EventState): void {
if (payload.type !== "tool.execute") return
@@ -287,12 +311,13 @@ export function handleToolExecute(ctx: RunContext, payload: EventPayload, state:
const toolName = props?.name || "unknown"
state.currentTool = toolName
state.hasReceivedMeaningfulWork = true
if (shouldRender(ctx)) {
const header = formatToolHeader(toolName, props?.input ?? {})
const suffix = header.description ? ` ${pc.dim(header.description)}` : ""
state.hasReceivedMeaningfulWork = true
process.stdout.write(`\n ${pc.cyan(header.icon)} ${pc.bold(header.title)}${suffix} \n`)
}
}
export function handleToolResult(ctx: RunContext, payload: EventPayload, state: EventState): void {
if (payload.type !== "tool.result") return
@@ -305,7 +330,7 @@ export function handleToolResult(ctx: RunContext, payload: EventPayload, state:
if (state.currentTool === null) return
const output = props?.output || ""
if (output.trim()) {
if (output.trim() && shouldRender(ctx)) {
process.stdout.write(pc.dim(` ${displayChars.treeEnd} output \n`))
const padded = writePaddedText(output, true)
process.stdout.write(pc.dim(padded.output + (padded.atLineStart ? "" : " ")))

View File

@@ -1,5 +1,19 @@
import pc from "picocolors"
import type { RunContext, EventPayload } from "./types"
import type {
EventPayload,
MessagePartDeltaProps,
MessagePartUpdatedProps,
RunContext,
RunEventObserver,
SessionErrorEvent,
SessionIdleEvent,
SessionQuestionEvent,
StreamEvent,
ToolCompletedEvent,
ToolExecuteProps,
ToolResultProps,
ToolStartedEvent,
} from "./types"
import type { EventState } from "./event-state"
import { logEventVerbose } from "./event-formatting"
import {
@@ -14,10 +28,133 @@ import {
handleTuiToast,
} from "./event-handlers"
const QUESTION_TOOL_NAMES = new Set(["question", "ask_user_question", "askuserquestion"])
async function emitObservedEvent(
observer: RunEventObserver | undefined,
event: StreamEvent,
): Promise<void> {
if (!observer) return
await observer.onEvent?.(event)
if (event.type === "session.idle") {
await observer.onIdle?.(event as SessionIdleEvent)
}
if (event.type === "session.question") {
await observer.onQuestion?.(event as SessionQuestionEvent)
}
if (event.type === "session.error") {
await observer.onError?.(event as SessionErrorEvent)
}
}
function getEventSessionId(payload: EventPayload): string | undefined {
const props = payload.properties as Record<string, unknown> | undefined
if (!props) return undefined
if (typeof props.sessionID === "string") return props.sessionID
if (typeof props.sessionId === "string") return props.sessionId
const info = props.info as Record<string, unknown> | undefined
if (typeof info?.sessionID === "string") return info.sessionID
if (typeof info?.sessionId === "string") return info.sessionId
const part = props.part as Record<string, unknown> | undefined
if (typeof part?.sessionID === "string") return part.sessionID
if (typeof part?.sessionId === "string") return part.sessionId
return undefined
}
function getQuestionText(input: unknown): string | undefined {
const args = input as { questions?: Array<{ question?: unknown }> } | undefined
const question = args?.questions?.[0]?.question
return typeof question === "string" && question.length > 0 ? question : undefined
}
function getToolStartFromPayload(
payload: EventPayload,
sessionId: string,
fallbackToolName: string,
): ToolStartedEvent | SessionQuestionEvent | undefined {
if (payload.type === "tool.execute") {
const props = payload.properties as ToolExecuteProps | undefined
const toolName = props?.name ?? fallbackToolName
if (QUESTION_TOOL_NAMES.has(toolName.toLowerCase())) {
return {
type: "session.question",
sessionId,
toolName,
input: props?.input,
question: getQuestionText(props?.input),
}
}
return {
type: "tool.started",
sessionId,
toolName,
input: props?.input,
}
}
if (payload.type === "message.part.updated") {
const props = payload.properties as MessagePartUpdatedProps | undefined
const toolName = props?.part?.tool ?? props?.part?.name ?? fallbackToolName
if (!toolName) return undefined
const input = props?.part?.state?.input
if (QUESTION_TOOL_NAMES.has(toolName.toLowerCase())) {
return {
type: "session.question",
sessionId,
toolName,
input,
question: getQuestionText(input),
}
}
return {
type: "tool.started",
sessionId,
toolName,
input,
}
}
return undefined
}
function getToolCompletedFromPayload(
payload: EventPayload,
sessionId: string,
fallbackToolName: string,
): ToolCompletedEvent | undefined {
if (payload.type === "tool.result") {
const props = payload.properties as ToolResultProps | undefined
return {
type: "tool.completed",
sessionId,
toolName: props?.name ?? fallbackToolName,
output: props?.output,
status: "completed",
}
}
if (payload.type === "message.part.updated") {
const props = payload.properties as MessagePartUpdatedProps | undefined
const status = props?.part?.state?.status
if (status !== "completed" && status !== "error") return undefined
return {
type: "tool.completed",
sessionId,
toolName: props?.part?.tool ?? props?.part?.name ?? fallbackToolName,
output: props?.part?.state?.output,
status,
}
}
return undefined
}
export async function processEvents(
ctx: RunContext,
stream: AsyncIterable<unknown>,
state: EventState
state: EventState,
observer?: RunEventObserver,
): Promise<void> {
for await (const event of stream) {
if (ctx.abortController.signal.aborted) break
@@ -37,6 +174,18 @@ export async function processEvents(
// Update last event timestamp for watchdog detection
state.lastEventTimestamp = Date.now()
const previousIdle = state.mainSessionIdle
const previousError = state.mainSessionError
const previousTool = state.currentTool
const sessionId = getEventSessionId(payload) ?? ctx.sessionID
if (observer?.includeRawEvents) {
await emitObservedEvent(observer, {
type: "raw",
sessionId,
payload,
})
}
handleSessionError(ctx, payload, state)
handleSessionIdle(ctx, payload, state)
@@ -47,8 +196,74 @@ export async function processEvents(
handleToolExecute(ctx, payload, state)
handleToolResult(ctx, payload, state)
handleTuiToast(ctx, payload, state)
if (!previousIdle && state.mainSessionIdle) {
await emitObservedEvent(observer, {
type: "session.idle",
sessionId: ctx.sessionID,
})
}
if (!previousError && state.mainSessionError) {
await emitObservedEvent(observer, {
type: "session.error",
sessionId: ctx.sessionID,
error: state.lastError ?? "Unknown session error",
})
}
if (payload.type === "message.part.delta") {
const props = payload.properties as MessagePartDeltaProps | undefined
if (
sessionId === ctx.sessionID
&& props?.field === "text"
&& typeof props.delta === "string"
&& props.delta.length > 0
) {
await emitObservedEvent(observer, {
type: "message.delta",
sessionId: ctx.sessionID,
messageId: props.messageID,
partId: props.partID,
delta: props.delta,
})
}
}
if (payload.type === "message.part.updated") {
const props = payload.properties as MessagePartUpdatedProps | undefined
if (
sessionId === ctx.sessionID
&& props?.part?.type === "text"
&& typeof props.part.text === "string"
&& props.part.time?.end
) {
await emitObservedEvent(observer, {
type: "message.completed",
sessionId: ctx.sessionID,
messageId: props.part.messageID,
partId: props.part.id,
text: props.part.text,
})
}
}
if (previousTool === null && state.currentTool !== null && sessionId === ctx.sessionID) {
const toolEvent = getToolStartFromPayload(payload, ctx.sessionID, state.currentTool)
if (toolEvent) {
await emitObservedEvent(observer, toolEvent)
}
}
if (previousTool !== null && state.currentTool === null && sessionId === ctx.sessionID) {
const toolEvent = getToolCompletedFromPayload(payload, ctx.sessionID, previousTool)
if (toolEvent) {
await emitObservedEvent(observer, toolEvent)
}
}
} catch (err) {
console.error(pc.red(`[event error] ${err}`))
const error = ctx.logger?.error ?? console.error
error(pc.red(`[event error] ${err}`))
}
}
}

View File

@@ -3,8 +3,16 @@ export { resolveRunAgent } from "./agent-resolver"
export { resolveRunModel } from "./model-resolver"
export { createServerConnection } from "./server-connection"
export { resolveSession } from "./session-resolver"
export { executeRunSession, waitForEventProcessorShutdown } from "./run-engine"
export { createJsonOutputManager } from "./json-output"
export { executeOnCompleteHook } from "./on-complete-hook"
export { createEventState, processEvents, serializeError } from "./events"
export type { EventState } from "./events"
export type { RunOptions, RunContext, RunResult, ServerConnection } from "./types"
export type {
RunContext,
RunEventObserver,
RunOptions,
RunResult,
ServerConnection,
StreamEvent,
} from "./types"

View File

@@ -0,0 +1,142 @@
/// <reference types="bun-types" />
import { describe, expect, it, mock } from "bun:test"
import { executeRunSession } from "./run-engine"
import type { OpencodeClient, StreamEvent } from "./types"
function toAsyncIterable(values: unknown[]): AsyncIterable<unknown> {
return {
async *[Symbol.asyncIterator]() {
for (const value of values) {
yield value
}
},
}
}
describe("executeRunSession", () => {
it("allows SDK sessions to enable questions and emits normalized events", async () => {
const seenEvents: StreamEvent[] = []
const client = {
session: {
create: mock(() => Promise.resolve({ data: { id: "ses_sdk" } })),
promptAsync: mock(() => Promise.resolve({})),
status: mock(() => Promise.resolve({ data: { ses_sdk: { type: "idle" } } })),
todo: mock(() => Promise.resolve({ data: [] })),
children: mock(() => Promise.resolve({ data: [] })),
},
event: {
subscribe: mock(() => Promise.resolve({
stream: toAsyncIterable([
{
type: "message.updated",
properties: {
info: {
id: "msg_1",
sessionID: "ses_sdk",
role: "assistant",
agent: "Prometheus (Plan Builder)",
},
},
},
{
type: "tool.execute",
properties: {
sessionID: "ses_sdk",
name: "question",
input: {
questions: [{ question: "Which agent should run?" }],
},
},
},
{
type: "message.part.delta",
properties: {
sessionID: "ses_sdk",
messageID: "msg_1",
partID: "part_1",
field: "text",
delta: "hello",
},
},
{
type: "tool.result",
properties: {
sessionID: "ses_sdk",
name: "question",
output: "waiting",
},
},
{
type: "message.part.updated",
properties: {
part: {
id: "part_1",
sessionID: "ses_sdk",
messageID: "msg_1",
type: "text",
text: "hello",
time: { end: 1 },
},
},
},
{
type: "session.status",
properties: {
sessionID: "ses_sdk",
status: { type: "idle" },
},
},
]),
})),
},
} as unknown as OpencodeClient
const result = await executeRunSession({
client,
directory: "/repo",
message: "hello",
agent: "prometheus",
questionPermission: "allow",
questionToolEnabled: true,
renderOutput: false,
logger: { log: () => {}, error: () => {} },
pluginConfig: {},
pollOptions: {
pollIntervalMs: 1,
minStabilizationMs: 0,
},
eventObserver: {
onEvent: async (event) => {
seenEvents.push(event)
},
},
})
expect(result.exitCode).toBe(0)
expect(result.result.success).toBe(true)
expect(client.session.create).toHaveBeenCalledWith({
body: {
title: "oh-my-opencode run",
permission: [
{ permission: "question", action: "allow", pattern: "*" },
],
},
query: { directory: "/repo" },
})
expect(client.session.promptAsync).toHaveBeenCalledWith({
path: { id: "ses_sdk" },
body: {
agent: "Prometheus (Plan Builder)",
tools: { question: true },
parts: [{ type: "text", text: "hello" }],
},
query: { directory: "/repo" },
})
expect(seenEvents.map((event) => event.type)).toContain("session.started")
expect(seenEvents.map((event) => event.type)).toContain("session.question")
expect(seenEvents.map((event) => event.type)).toContain("message.delta")
expect(seenEvents.map((event) => event.type)).toContain("message.completed")
expect(seenEvents.map((event) => event.type)).toContain("session.completed")
})
})

204
src/cli/run/run-engine.ts Normal file
View File

@@ -0,0 +1,204 @@
import pc from "picocolors"
import type { OhMyOpenCodeConfig } from "../../config"
import { loadPluginConfig } from "../../plugin-config"
import { createEventState, processEvents, serializeError } from "./events"
import { loadAgentProfileColors } from "./agent-profile-colors"
import { pollForCompletion, type PollOptions } from "./poll-for-completion"
import { resolveRunAgent } from "./agent-resolver"
import { resolveRunModel } from "./model-resolver"
import { resolveSession } from "./session-resolver"
import type {
OpencodeClient,
RunContext,
RunEventObserver,
RunLogger,
RunResult,
SessionCompletedEvent,
} from "./types"
const EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS = 2_000
export interface ExecuteRunSessionOptions {
client: OpencodeClient
message: string
directory: string
agent?: string
model?: string
sessionId?: string
verbose?: boolean
questionPermission?: "allow" | "deny"
questionToolEnabled?: boolean
pluginConfig?: OhMyOpenCodeConfig
logger?: RunLogger
renderOutput?: boolean
eventObserver?: RunEventObserver
pollOptions?: PollOptions
signal?: AbortSignal
}
export interface ExecuteRunSessionResult {
exitCode: number
result: RunResult
sessionId: string
}
export async function waitForEventProcessorShutdown(
eventProcessor: Promise<void>,
timeoutMs = EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS,
): Promise<void> {
const completed = await Promise.race([
eventProcessor.then(() => true),
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), timeoutMs)),
])
void completed
}
async function emitCompletionEvent(
observer: RunEventObserver | undefined,
result: RunResult,
): Promise<void> {
if (!observer) return
const event: SessionCompletedEvent = {
type: "session.completed",
sessionId: result.sessionId,
result,
}
await observer.onEvent?.(event)
await observer.onComplete?.(event)
}
export async function executeRunSession(
options: ExecuteRunSessionOptions,
): Promise<ExecuteRunSessionResult> {
const {
client,
message,
directory,
agent,
model,
sessionId,
verbose = false,
questionPermission = "deny",
questionToolEnabled = false,
pluginConfig = loadPluginConfig(directory, { command: "run" }),
logger,
renderOutput = true,
eventObserver,
pollOptions,
signal,
} = options
const log = logger?.log ?? console.log
const resolvedAgent = resolveRunAgent({ message, agent }, pluginConfig)
const resolvedModel = resolveRunModel(model)
const abortController = new AbortController()
const startTime = Date.now()
let resolvedSessionId: string | undefined
// Check if signal was already aborted before setting up listener
if (signal?.aborted) {
abortController.abort()
}
const forwardAbort = () => abortController.abort()
signal?.addEventListener("abort", forwardAbort, { once: true })
try {
resolvedSessionId = await resolveSession({
client,
sessionId,
directory,
questionPermission,
logger,
})
if (renderOutput) {
log(pc.dim(`Session: ${resolvedSessionId}`))
if (resolvedModel) {
log(pc.dim(`Model: ${resolvedModel.providerID}/${resolvedModel.modelID}`))
}
}
await eventObserver?.onEvent?.({
type: "session.started",
sessionId: resolvedSessionId,
agent: resolvedAgent,
resumed: Boolean(sessionId),
...(resolvedModel ? { model: resolvedModel } : {}),
})
const ctx: RunContext = {
client,
sessionID: resolvedSessionId,
directory,
abortController,
verbose,
renderOutput,
logger,
}
const events = await client.event.subscribe({ query: { directory } })
const eventState = createEventState()
if (renderOutput) {
eventState.agentColorsByName = await loadAgentProfileColors(client)
}
const eventProcessor = processEvents(
ctx,
events.stream,
eventState,
eventObserver,
).catch(() => {})
await client.session.promptAsync({
path: { id: resolvedSessionId },
body: {
agent: resolvedAgent,
...(resolvedModel ? { model: resolvedModel } : {}),
tools: {
question: questionToolEnabled,
},
parts: [{ type: "text", text: message }],
},
query: { directory },
})
const exitCode = await pollForCompletion(ctx, eventState, abortController, pollOptions)
abortController.abort()
await waitForEventProcessorShutdown(eventProcessor)
const result: RunResult = {
sessionId: resolvedSessionId,
success: exitCode === 0,
durationMs: Date.now() - startTime,
messageCount: eventState.messageCount,
summary: eventState.lastPartText.slice(0, 200) || "Run completed",
}
if (exitCode === 0) {
await emitCompletionEvent(eventObserver, result)
}
return {
exitCode,
result,
sessionId: resolvedSessionId,
}
} catch (error) {
abortController.abort()
const serialized = serializeError(error)
await eventObserver?.onEvent?.({
type: "session.error",
sessionId: resolvedSessionId ?? sessionId ?? "",
error: serialized,
})
await eventObserver?.onError?.({
type: "session.error",
sessionId: resolvedSessionId ?? sessionId ?? "",
error: serialized,
})
throw error
} finally {
signal?.removeEventListener("abort", forwardAbort)
}
}

View File

@@ -37,6 +37,38 @@ describe("resolveRunAgent", () => {
expect(agent).toBe("Atlas (Plan Executor)")
})
it("prefers OPENCODE_AGENT over OPENCODE_DEFAULT_AGENT", () => {
// given
const config = createConfig({ default_run_agent: "prometheus" })
const env = {
OPENCODE_AGENT: "oracle",
OPENCODE_DEFAULT_AGENT: "Atlas",
}
// when
const agent = resolveRunAgent({ message: "test" }, config, env)
// then
expect(agent).toBe("oracle")
})
it("supports specialist agents from env and config inputs", () => {
// given
const env = { OPENCODE_AGENT: " explore " }
// when
const envAgent = resolveRunAgent({ message: "test" }, createConfig(), env)
const configAgent = resolveRunAgent(
{ message: "test" },
createConfig({ default_run_agent: "oracle" }),
{}
)
// then
expect(envAgent).toBe("explore")
expect(configAgent).toBe("oracle")
})
it("uses config agent over default", () => {
// given
const config = createConfig({ default_run_agent: "Prometheus" })
@@ -80,6 +112,17 @@ describe("resolveRunAgent", () => {
// then
expect(agent).toBe("Sisyphus (Ultraworker)")
})
it("falls back when requested specialist agent is disabled", () => {
// given
const config = createConfig({ disabled_agents: ["oracle"] })
// when
const agent = resolveRunAgent({ message: "test", agent: "oracle" }, config, {})
// then
expect(agent).toBe("Sisyphus (Ultraworker)")
})
})
describe("waitForEventProcessorShutdown", () => {

View File

@@ -1,40 +1,23 @@
import pc from "picocolors"
import type { RunOptions, RunContext } from "./types"
import { createEventState, processEvents, serializeError } from "./events"
import { loadPluginConfig } from "../../plugin-config"
import { createServerConnection } from "./server-connection"
import { resolveSession } from "./session-resolver"
import type { RunOptions } from "./types"
import { createJsonOutputManager } from "./json-output"
import { executeOnCompleteHook } from "./on-complete-hook"
import { resolveRunAgent } from "./agent-resolver"
import { resolveRunModel } from "./model-resolver"
import { pollForCompletion } from "./poll-for-completion"
import { loadAgentProfileColors } from "./agent-profile-colors"
import { suppressRunInput } from "./stdin-suppression"
import { createServerConnection } from "./server-connection"
import {
executeRunSession,
waitForEventProcessorShutdown,
} from "./run-engine"
import { createTimestampedStdoutController } from "./timestamp-output"
import { serializeError } from "./events"
import { suppressRunInput } from "./stdin-suppression"
export { resolveRunAgent }
const EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS = 2_000
export async function waitForEventProcessorShutdown(
eventProcessor: Promise<void>,
timeoutMs = EVENT_PROCESSOR_SHUTDOWN_TIMEOUT_MS,
): Promise<void> {
const completed = await Promise.race([
eventProcessor.then(() => true),
new Promise<boolean>((resolve) => setTimeout(() => resolve(false), timeoutMs)),
])
void completed
}
export { resolveRunAgent } from "./agent-resolver"
export { waitForEventProcessorShutdown }
export async function run(options: RunOptions): Promise<number> {
process.env.OPENCODE_CLI_RUN_MODE = "true"
const startTime = Date.now()
const {
message,
directory = process.cwd(),
} = options
@@ -45,25 +28,19 @@ export async function run(options: RunOptions): Promise<number> {
: createTimestampedStdoutController()
timestampOutput?.enable()
const pluginConfig = loadPluginConfig(directory, { command: "run" })
const resolvedAgent = resolveRunAgent(options, pluginConfig)
const resolvedModel = resolveRunModel(options.model)
const abortController = new AbortController()
try {
const { client, cleanup: serverCleanup } = await createServerConnection({
const { client, cleanup } = await createServerConnection({
port: options.port,
attach: options.attach,
signal: abortController.signal,
})
const cleanup = () => {
serverCleanup()
}
const restoreInput = suppressRunInput()
const handleSigint = () => {
console.log(pc.yellow("\nInterrupted. Shutting down..."))
abortController.abort()
restoreInput()
cleanup()
process.exit(130)
@@ -72,81 +49,38 @@ export async function run(options: RunOptions): Promise<number> {
process.on("SIGINT", handleSigint)
try {
const sessionID = await resolveSession({
const { exitCode, result } = await executeRunSession({
client,
message: options.message,
directory,
agent: options.agent,
model: options.model,
sessionId: options.sessionId,
directory,
})
console.log(pc.dim(`Session: ${sessionID}`))
if (resolvedModel) {
console.log(pc.dim(`Model: ${resolvedModel.providerID}/${resolvedModel.modelID}`))
}
const ctx: RunContext = {
client,
sessionID,
directory,
abortController,
verbose: options.verbose ?? false,
}
const events = await client.event.subscribe({ query: { directory } })
const eventState = createEventState()
eventState.agentColorsByName = await loadAgentProfileColors(client)
const eventProcessor = processEvents(ctx, events.stream, eventState).catch(
() => {},
)
await client.session.promptAsync({
path: { id: sessionID },
body: {
agent: resolvedAgent,
...(resolvedModel ? { model: resolvedModel } : {}),
tools: {
question: false,
},
parts: [{ type: "text", text: message }],
},
query: { directory },
questionPermission: "deny",
questionToolEnabled: false,
renderOutput: true,
})
const exitCode = await pollForCompletion(ctx, eventState, abortController)
// Abort the event stream to stop the processor
abortController.abort()
await waitForEventProcessorShutdown(eventProcessor)
cleanup()
const durationMs = Date.now() - startTime
if (options.onComplete) {
await executeOnCompleteHook({
command: options.onComplete,
sessionId: sessionID,
sessionId: result.sessionId,
exitCode,
durationMs,
messageCount: eventState.messageCount,
durationMs: result.durationMs,
messageCount: result.messageCount,
})
}
if (jsonManager) {
jsonManager.emitResult({
sessionId: sessionID,
success: exitCode === 0,
durationMs,
messageCount: eventState.messageCount,
summary: eventState.lastPartText.slice(0, 200) || "Run completed",
})
jsonManager.emitResult(result)
}
return exitCode
} catch (err) {
cleanup()
throw err
} finally {
process.removeListener("SIGINT", handleSigint)
restoreInput()
cleanup()
}
} catch (err) {
if (jsonManager) jsonManager.restore()

View File

@@ -1,6 +1,6 @@
import { createOpencode, createOpencodeClient } from "@opencode-ai/sdk"
import pc from "picocolors"
import type { ServerConnection } from "./types"
import type { RunLogger, ServerConnection } from "./types"
import { getAvailableServerPort, isPortAvailable, DEFAULT_SERVER_PORT } from "../../shared/port-utils"
import { withWorkingOpencodePath } from "./opencode-binary-resolver"
@@ -20,13 +20,18 @@ function isPortRangeExhausted(error: unknown): boolean {
return error.message.includes("No available port found in range")
}
async function startServer(options: { signal: AbortSignal, port: number }): Promise<ServerConnection> {
const { signal, port } = options
async function startServer(options: {
signal: AbortSignal
port: number
logger?: RunLogger
}): Promise<ServerConnection> {
const { signal, port, logger } = options
const log = logger?.log?.bind(logger) ?? console.log
const { client, server } = await withWorkingOpencodePath(() =>
createOpencode({ signal, port, hostname: "127.0.0.1" }),
)
console.log(pc.dim("Server listening at"), pc.cyan(server.url))
log(pc.dim("Server listening at"), pc.cyan(server.url))
return { client, cleanup: () => server.close() }
}
@@ -34,11 +39,13 @@ export async function createServerConnection(options: {
port?: number
attach?: string
signal: AbortSignal
logger?: RunLogger
}): Promise<ServerConnection> {
const { port, attach, signal } = options
const { port, attach, signal, logger } = options
const log = logger?.log ?? console.log
if (attach !== undefined) {
console.log(pc.dim("Attaching to existing server at"), pc.cyan(attach))
log(pc.dim("Attaching to existing server at"), pc.cyan(attach))
const client = createOpencodeClient({ baseUrl: attach })
return { client, cleanup: () => {} }
}
@@ -51,9 +58,9 @@ export async function createServerConnection(options: {
const available = await isPortAvailable(port, "127.0.0.1")
if (available) {
console.log(pc.dim("Starting server on port"), pc.cyan(port.toString()))
log(pc.dim("Starting server on port"), pc.cyan(port.toString()))
try {
return await startServer({ signal, port })
return await startServer({ signal, port, logger })
} catch (error) {
if (!isPortStartFailure(error, port)) {
throw error
@@ -64,13 +71,13 @@ export async function createServerConnection(options: {
throw error
}
console.log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("became occupied, attaching to existing server"))
log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("became occupied, attaching to existing server"))
const client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` })
return { client, cleanup: () => {} }
}
}
console.log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("is occupied, attaching to existing server"))
log(pc.dim("Port"), pc.cyan(port.toString()), pc.dim("is occupied, attaching to existing server"))
const client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${port}` })
return { client, cleanup: () => {} }
}
@@ -91,26 +98,26 @@ export async function createServerConnection(options: {
throw error
}
console.log(pc.dim("Port range exhausted, attaching to existing server on"), pc.cyan(DEFAULT_SERVER_PORT.toString()))
log(pc.dim("Port range exhausted, attaching to existing server on"), pc.cyan(DEFAULT_SERVER_PORT.toString()))
const client = createOpencodeClient({ baseUrl: `http://127.0.0.1:${DEFAULT_SERVER_PORT}` })
return { client, cleanup: () => {} }
}
if (wasAutoSelected) {
console.log(pc.dim("Auto-selected port"), pc.cyan(selectedPort.toString()))
log(pc.dim("Auto-selected port"), pc.cyan(selectedPort.toString()))
} else {
console.log(pc.dim("Starting server on port"), pc.cyan(selectedPort.toString()))
log(pc.dim("Starting server on port"), pc.cyan(selectedPort.toString()))
}
try {
return await startServer({ signal, port: selectedPort })
return await startServer({ signal, port: selectedPort, logger })
} catch (error) {
if (!isPortStartFailure(error, selectedPort)) {
throw error
}
const { port: retryPort } = await getAvailableServerPort(selectedPort + 1, "127.0.0.1")
console.log(pc.dim("Retrying server start on port"), pc.cyan(retryPort.toString()))
return await startServer({ signal, port: retryPort })
log(pc.dim("Retrying server start on port"), pc.cyan(retryPort.toString()))
return await startServer({ signal, port: retryPort, logger })
}
}

View File

@@ -1,5 +1,5 @@
import pc from "picocolors"
import type { OpencodeClient } from "./types"
import type { OpencodeClient, RunLogger } from "./types"
import { serializeError } from "./events"
const SESSION_CREATE_MAX_RETRIES = 3
@@ -9,8 +9,18 @@ export async function resolveSession(options: {
client: OpencodeClient
sessionId?: string
directory: string
questionPermission?: "allow" | "deny"
logger?: RunLogger
}): Promise<string> {
const { client, sessionId, directory } = options
const {
client,
sessionId,
directory,
questionPermission = "deny",
logger,
} = options
const log = logger?.log ?? console.log
const error = logger?.error ?? console.error
if (sessionId) {
const res = await client.session.get({
@@ -27,23 +37,22 @@ export async function resolveSession(options: {
const res = await client.session.create({
body: {
title: "oh-my-opencode run",
// In CLI run mode there's no TUI to answer questions.
permission: [
{ permission: "question", action: "deny" as const, pattern: "*" },
{ permission: "question", action: questionPermission, pattern: "*" },
],
} as Record<string, unknown>,
query: { directory },
})
if (res.error) {
console.error(
error(
pc.yellow(`Session create attempt ${attempt}/${SESSION_CREATE_MAX_RETRIES} failed:`)
)
console.error(pc.dim(` Error: ${serializeError(res.error)}`))
error(pc.dim(` Error: ${serializeError(res.error)}`))
if (attempt < SESSION_CREATE_MAX_RETRIES) {
const delay = SESSION_CREATE_RETRY_DELAY_MS * attempt
console.log(pc.dim(` Retrying in ${delay}ms...`))
log(pc.dim(` Retrying in ${delay}ms...`))
await new Promise((resolve) => setTimeout(resolve, delay))
}
continue
@@ -53,7 +62,7 @@ export async function resolveSession(options: {
return res.data.id
}
console.error(
error(
pc.yellow(
`Session create attempt ${attempt}/${SESSION_CREATE_MAX_RETRIES}: No session ID returned`
)
@@ -61,7 +70,7 @@ export async function resolveSession(options: {
if (attempt < SESSION_CREATE_MAX_RETRIES) {
const delay = SESSION_CREATE_RETRY_DELAY_MS * attempt
console.log(pc.dim(` Retrying in ${delay}ms...`))
log(pc.dim(` Retrying in ${delay}ms...`))
await new Promise((resolve) => setTimeout(resolve, delay))
}
}

View File

@@ -15,6 +15,11 @@ export interface RunOptions {
sessionId?: string
}
export interface RunLogger {
log?: (...args: unknown[]) => void
error?: (...args: unknown[]) => void
}
export interface ServerConnection {
client: OpencodeClient
cleanup: () => void
@@ -34,6 +39,99 @@ export interface RunContext {
directory: string
abortController: AbortController
verbose?: boolean
renderOutput?: boolean
logger?: RunLogger
}
export interface SessionStartedEvent {
type: "session.started"
sessionId: string
agent: string
resumed: boolean
model?: { providerID: string; modelID: string }
}
export interface MessageDeltaEvent {
type: "message.delta"
sessionId: string
messageId?: string
partId?: string
delta: string
}
export interface MessageCompletedEvent {
type: "message.completed"
sessionId: string
messageId?: string
partId?: string
text: string
}
export interface ToolStartedEvent {
type: "tool.started"
sessionId: string
toolName: string
input?: unknown
}
export interface ToolCompletedEvent {
type: "tool.completed"
sessionId: string
toolName: string
output?: string
status: "completed" | "error"
}
export interface SessionIdleEvent {
type: "session.idle"
sessionId: string
}
export interface SessionQuestionEvent {
type: "session.question"
sessionId: string
toolName: string
input?: unknown
question?: string
}
export interface SessionCompletedEvent {
type: "session.completed"
sessionId: string
result: RunResult
}
export interface SessionErrorEvent {
type: "session.error"
sessionId: string
error: string
}
export interface RawStreamEvent {
type: "raw"
sessionId: string
payload: EventPayload
}
export type StreamEvent =
| SessionStartedEvent
| MessageDeltaEvent
| MessageCompletedEvent
| ToolStartedEvent
| ToolCompletedEvent
| SessionIdleEvent
| SessionQuestionEvent
| SessionCompletedEvent
| SessionErrorEvent
| RawStreamEvent
export interface RunEventObserver {
includeRawEvents?: boolean
onEvent?: (event: StreamEvent) => void | Promise<void>
onIdle?: (event: SessionIdleEvent) => void | Promise<void>
onQuestion?: (event: SessionQuestionEvent) => void | Promise<void>
onComplete?: (event: SessionCompletedEvent) => void | Promise<void>
onError?: (event: SessionErrorEvent) => void | Promise<void>
}
export interface Todo {

View File

@@ -25,7 +25,7 @@ export const OhMyOpenCodeConfigSchema = z.object({
$schema: z.string().optional(),
/** Enable new task system (default: false) */
new_task_system_enabled: z.boolean().optional(),
/** Default agent name for `oh-my-opencode run` (env: OPENCODE_DEFAULT_AGENT) */
/** Default agent name for `oh-my-opencode run` (env fallback: OPENCODE_DEFAULT_AGENT, after OPENCODE_AGENT) */
default_run_agent: z.string().optional(),
disabled_mcps: z.array(AnyMcpNameSchema).optional(),
disabled_agents: z.array(z.string()).optional(),