fix(delegation): replace message-count-stability polling with native finish-based completion detection

Sync task completion was fragile — detecting premature stability during
brief idle periods between tool calls. Now mirrors opencode's native
SessionPrompt.loop() logic: checks assistant finish reason is terminal
(not tool-calls/unknown) and assistant.id > user.id.

Also switches sync prompt sender from blocking HTTP (promptSync) to
async fire-and-forget (promptAsync) to avoid JSON parse errors in ACP.
This commit is contained in:
YeonGyu-Kim
2026-02-09 15:37:19 +09:00
parent 20d009964d
commit 11f587194f
6 changed files with 395 additions and 54 deletions

View File

@@ -23,8 +23,10 @@ export interface ParentContext {
export interface SessionMessage { export interface SessionMessage {
info?: { info?: {
id?: string
role?: string role?: string
time?: { created?: number } time?: { created?: number }
finish?: string
agent?: string agent?: string
model?: { providerID: string; modelID: string; variant?: string } model?: { providerID: string; modelID: string; variant?: string }
modelID?: string modelID?: string

View File

@@ -5,9 +5,11 @@ import { storeToolMetadata } from "../../features/tool-metadata-store"
import { getTaskToastManager } from "../../features/task-toast-manager" import { getTaskToastManager } from "../../features/task-toast-manager"
import { getAgentToolRestrictions } from "../../shared/agent-tool-restrictions" import { getAgentToolRestrictions } from "../../shared/agent-tool-restrictions"
import { getMessageDir } from "../../shared/session-utils" import { getMessageDir } from "../../shared/session-utils"
import { promptSyncWithModelSuggestionRetry } from "../../shared/model-suggestion-retry" import { promptWithModelSuggestionRetry } from "../../shared/model-suggestion-retry"
import { findNearestMessageWithFields } from "../../features/hook-message-injector" import { findNearestMessageWithFields } from "../../features/hook-message-injector"
import { formatDuration } from "./time-formatter" import { formatDuration } from "./time-formatter"
import { pollSyncSession } from "./sync-session-poller"
import { fetchSyncResult } from "./sync-result-fetcher"
export async function executeSyncContinuation( export async function executeSyncContinuation(
args: DelegateTaskArgs, args: DelegateTaskArgs,
@@ -45,11 +47,11 @@ export async function executeSyncContinuation(
storeToolMetadata(ctx.sessionID, ctx.callID, syncContMeta) storeToolMetadata(ctx.sessionID, ctx.callID, syncContMeta)
} }
try { let resumeAgent: string | undefined
let resumeAgent: string | undefined let resumeModel: { providerID: string; modelID: string } | undefined
let resumeModel: { providerID: string; modelID: string } | undefined let resumeVariant: string | undefined
let resumeVariant: string | undefined
try {
try { try {
const messagesResp = await client.session.messages({ path: { id: args.session_id! } }) const messagesResp = await client.session.messages({ path: { id: args.session_id! } })
const messages = (messagesResp.data ?? []) as SessionMessage[] const messages = (messagesResp.data ?? []) as SessionMessage[]
@@ -74,7 +76,7 @@ export async function executeSyncContinuation(
const allowTask = isPlanFamily(resumeAgent) const allowTask = isPlanFamily(resumeAgent)
await promptSyncWithModelSuggestionRetry(client, { await promptWithModelSuggestionRetry(client, {
path: { id: args.session_id! }, path: { id: args.session_id! },
body: { body: {
...(resumeAgent !== undefined ? { agent: resumeAgent } : {}), ...(resumeAgent !== undefined ? { agent: resumeAgent } : {}),
@@ -97,40 +99,35 @@ export async function executeSyncContinuation(
return `Failed to send continuation prompt: ${errorMessage}\n\nSession ID: ${args.session_id}` return `Failed to send continuation prompt: ${errorMessage}\n\nSession ID: ${args.session_id}`
} }
const messagesResult = await client.session.messages({ const pollError = await pollSyncSession(ctx, client, {
path: { id: args.session_id! }, sessionID: args.session_id!,
agentToUse: resumeAgent ?? "continue",
toastManager,
taskId,
}) })
if (pollError) {
return pollError
}
if (messagesResult.error) { const result = await fetchSyncResult(client, args.session_id!)
if (!result.ok) {
if (toastManager) { if (toastManager) {
toastManager.removeTask(taskId) toastManager.removeTask(taskId)
} }
return `Error fetching result: ${messagesResult.error}\n\nSession ID: ${args.session_id}` return result.error
} }
const messages = ((messagesResult as { data?: unknown }).data ?? messagesResult) as SessionMessage[]
const assistantMessages = messages
.filter((m) => m.info?.role === "assistant")
.sort((a, b) => (b.info?.time?.created ?? 0) - (a.info?.time?.created ?? 0))
const lastMessage = assistantMessages[0]
if (toastManager) { if (toastManager) {
toastManager.removeTask(taskId) toastManager.removeTask(taskId)
} }
if (!lastMessage) {
return `No assistant response found.\n\nSession ID: ${args.session_id}`
}
const textParts = lastMessage?.parts?.filter((p) => p.type === "text" || p.type === "reasoning") ?? []
const textContent = textParts.map((p) => p.text ?? "").filter(Boolean).join("\n")
const duration = formatDuration(startTime) const duration = formatDuration(startTime)
return `Task continued and completed in ${duration}. return `Task continued and completed in ${duration}.
--- ---
${textContent || "(No text output)"} ${result.textContent || "(No text output)"}
<task_metadata> <task_metadata>
session_id: ${args.session_id} session_id: ${args.session_id}

View File

@@ -1,6 +1,6 @@
import type { DelegateTaskArgs, OpencodeClient } from "./types" import type { DelegateTaskArgs, OpencodeClient } from "./types"
import { isPlanFamily } from "./constants" import { isPlanFamily } from "./constants"
import { promptSyncWithModelSuggestionRetry } from "../../shared/model-suggestion-retry" import { promptWithModelSuggestionRetry } from "../../shared/model-suggestion-retry"
import { formatDetailedError } from "./error-formatting" import { formatDetailedError } from "./error-formatting"
export async function sendSyncPrompt( export async function sendSyncPrompt(
@@ -17,7 +17,7 @@ export async function sendSyncPrompt(
): Promise<string | null> { ): Promise<string | null> {
try { try {
const allowTask = isPlanFamily(input.agentToUse) const allowTask = isPlanFamily(input.agentToUse)
await promptSyncWithModelSuggestionRetry(client, { await promptWithModelSuggestionRetry(client, {
path: { id: input.sessionID }, path: { id: input.sessionID },
body: { body: {
agent: input.agentToUse, agent: input.agentToUse,

View File

@@ -0,0 +1,329 @@
declare const require: (name: string) => any
const { describe, test, expect, beforeEach, afterEach } = require("bun:test")
import { __setTimingConfig, __resetTimingConfig } from "./timing"
function createMockCtx(aborted = false) {
const controller = new AbortController()
if (aborted) controller.abort()
return {
sessionID: "parent-session",
messageID: "parent-message",
abort: controller.signal,
}
}
describe("pollSyncSession", () => {
beforeEach(() => {
__setTimingConfig({
POLL_INTERVAL_MS: 10,
MIN_STABILITY_TIME_MS: 0,
STABILITY_POLLS_REQUIRED: 1,
MAX_POLL_TIME_MS: 5000,
})
})
afterEach(() => {
__resetTimingConfig()
})
describe("native finish-based completion", () => {
test("detects completion when assistant message has terminal finish reason", async () => {
//#given - session messages with a terminal assistant finish ("end_turn")
// and the assistant id > user id (native opencode condition)
const { pollSyncSession } = require("./sync-session-poller")
let pollCount = 0
const mockClient = {
session: {
messages: async () => ({
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "end_turn" },
parts: [{ type: "text", text: "Done" }],
},
],
}),
status: async () => {
pollCount++
return { data: { "ses_test": { type: "idle" } } }
},
},
}
//#when
const result = await pollSyncSession(createMockCtx(), mockClient, {
sessionID: "ses_test",
agentToUse: "test-agent",
toastManager: null,
taskId: undefined,
})
//#then - should return null (success, no error)
expect(result).toBeNull()
})
test("keeps polling when assistant finish is tool-calls (non-terminal)", async () => {
//#given - first poll returns tool-calls finish, second returns end_turn
const { pollSyncSession } = require("./sync-session-poller")
let callCount = 0
const mockClient = {
session: {
messages: async () => {
callCount++
if (callCount <= 2) {
return {
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "tool-calls" },
parts: [{ type: "tool-call", text: "calling tool" }],
},
],
}
}
return {
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "tool-calls" },
parts: [{ type: "tool-call", text: "calling tool" }],
},
{ info: { id: "msg_003", role: "user", time: { created: 3000 } } },
{
info: { id: "msg_004", role: "assistant", time: { created: 4000 }, finish: "end_turn" },
parts: [{ type: "text", text: "Final answer" }],
},
],
}
},
status: async () => ({ data: { "ses_test": { type: "idle" } } }),
},
}
//#when
const result = await pollSyncSession(createMockCtx(), mockClient, {
sessionID: "ses_test",
agentToUse: "test-agent",
toastManager: null,
taskId: undefined,
})
//#then
expect(result).toBeNull()
expect(callCount).toBeGreaterThan(2)
})
test("keeps polling when finish is 'unknown' (non-terminal)", async () => {
//#given
const { pollSyncSession } = require("./sync-session-poller")
let callCount = 0
const mockClient = {
session: {
messages: async () => {
callCount++
if (callCount <= 1) {
return {
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "unknown" },
parts: [],
},
],
}
}
return {
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "unknown" },
parts: [],
},
{ info: { id: "msg_003", role: "user", time: { created: 3000 } } },
{
info: { id: "msg_004", role: "assistant", time: { created: 4000 }, finish: "stop" },
parts: [{ type: "text", text: "Done" }],
},
],
}
},
status: async () => ({ data: { "ses_test": { type: "idle" } } }),
},
}
//#when
const result = await pollSyncSession(createMockCtx(), mockClient, {
sessionID: "ses_test",
agentToUse: "test-agent",
toastManager: null,
taskId: undefined,
})
//#then
expect(result).toBeNull()
expect(callCount).toBeGreaterThan(1)
})
test("does not complete when assistant id < user id (user sent after assistant)", async () => {
//#given - assistant finished but user message came after it (agent still processing)
const { pollSyncSession } = require("./sync-session-poller")
let callCount = 0
const mockClient = {
session: {
messages: async () => {
callCount++
if (callCount <= 1) {
return {
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "end_turn" },
parts: [{ type: "text", text: "Partial" }],
},
{ info: { id: "msg_003", role: "user", time: { created: 3000 } } },
],
}
}
return {
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "end_turn" },
parts: [{ type: "text", text: "Partial" }],
},
{ info: { id: "msg_003", role: "user", time: { created: 3000 } } },
{
info: { id: "msg_004", role: "assistant", time: { created: 4000 }, finish: "end_turn" },
parts: [{ type: "text", text: "Final" }],
},
],
}
},
status: async () => ({ data: { "ses_test": { type: "idle" } } }),
},
}
//#when
const result = await pollSyncSession(createMockCtx(), mockClient, {
sessionID: "ses_test",
agentToUse: "test-agent",
toastManager: null,
taskId: undefined,
})
//#then
expect(result).toBeNull()
expect(callCount).toBeGreaterThan(1)
})
})
describe("abort handling", () => {
test("returns abort message when signal is aborted", async () => {
//#given
const { pollSyncSession } = require("./sync-session-poller")
const mockClient = {
session: {
messages: async () => ({ data: [] }),
status: async () => ({ data: {} }),
},
}
//#when
const result = await pollSyncSession(createMockCtx(true), mockClient, {
sessionID: "ses_abort",
agentToUse: "test-agent",
toastManager: { removeTask: () => {} },
taskId: "task_123",
})
//#then
expect(result).toContain("Task aborted")
expect(result).toContain("ses_abort")
})
})
describe("timeout handling", () => {
test("returns null on timeout (graceful)", async () => {
//#given - never returns a terminal finish, but timeout is very short
const { pollSyncSession } = require("./sync-session-poller")
__setTimingConfig({
POLL_INTERVAL_MS: 10,
MIN_STABILITY_TIME_MS: 0,
STABILITY_POLLS_REQUIRED: 1,
MAX_POLL_TIME_MS: 50,
})
const mockClient = {
session: {
messages: async () => ({
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
],
}),
status: async () => ({ data: { "ses_timeout": { type: "idle" } } }),
},
}
//#when
const result = await pollSyncSession(createMockCtx(), mockClient, {
sessionID: "ses_timeout",
agentToUse: "test-agent",
toastManager: null,
taskId: undefined,
})
//#then - timeout returns null (not an error, result is fetched separately)
expect(result).toBeNull()
})
})
describe("non-idle session status", () => {
test("skips message check when session is not idle", async () => {
//#given
const { pollSyncSession } = require("./sync-session-poller")
let statusCallCount = 0
let messageCallCount = 0
const mockClient = {
session: {
messages: async () => {
messageCallCount++
return {
data: [
{ info: { id: "msg_001", role: "user", time: { created: 1000 } } },
{
info: { id: "msg_002", role: "assistant", time: { created: 2000 }, finish: "end_turn" },
parts: [{ type: "text", text: "Done" }],
},
],
}
},
status: async () => {
statusCallCount++
if (statusCallCount <= 2) {
return { data: { "ses_busy": { type: "running" } } }
}
return { data: { "ses_busy": { type: "idle" } } }
},
},
}
//#when
const result = await pollSyncSession(createMockCtx(), mockClient, {
sessionID: "ses_busy",
agentToUse: "test-agent",
toastManager: null,
taskId: undefined,
})
//#then - should have waited for idle before checking messages
expect(result).toBeNull()
expect(statusCallCount).toBeGreaterThanOrEqual(3)
})
})
})

View File

@@ -1,7 +1,27 @@
import type { ToolContextWithMetadata, OpencodeClient } from "./types" import type { ToolContextWithMetadata, OpencodeClient } from "./types"
import type { SessionMessage } from "./executor-types"
import { getTimingConfig } from "./timing" import { getTimingConfig } from "./timing"
import { log } from "../../shared/logger" import { log } from "../../shared/logger"
const NON_TERMINAL_FINISH_REASONS = new Set(["tool-calls", "unknown"])
export function isSessionComplete(messages: SessionMessage[]): boolean {
let lastUser: SessionMessage | undefined
let lastAssistant: SessionMessage | undefined
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i]
if (!lastAssistant && msg.info?.role === "assistant") lastAssistant = msg
if (!lastUser && msg.info?.role === "user") lastUser = msg
if (lastUser && lastAssistant) break
}
if (!lastAssistant?.info?.finish) return false
if (NON_TERMINAL_FINISH_REASONS.has(lastAssistant.info.finish)) return false
if (!lastUser?.info?.id || !lastAssistant?.info?.id) return false
return lastUser.info.id < lastAssistant.info.id
}
export async function pollSyncSession( export async function pollSyncSession(
ctx: ToolContextWithMetadata, ctx: ToolContextWithMetadata,
client: OpencodeClient, client: OpencodeClient,
@@ -14,8 +34,6 @@ export async function pollSyncSession(
): Promise<string | null> { ): Promise<string | null> {
const syncTiming = getTimingConfig() const syncTiming = getTimingConfig()
const pollStart = Date.now() const pollStart = Date.now()
let lastMsgCount = 0
let stablePolls = 0
let pollCount = 0 let pollCount = 0
log("[task] Starting poll loop", { sessionID: input.sessionID, agentToUse: input.agentToUse }) log("[task] Starting poll loop", { sessionID: input.sessionID, agentToUse: input.agentToUse })
@@ -35,45 +53,29 @@ export async function pollSyncSession(
const sessionStatus = allStatuses[input.sessionID] const sessionStatus = allStatuses[input.sessionID]
if (pollCount % 10 === 0) { if (pollCount % 10 === 0) {
log("[task] Poll status", { log("[task] Poll status", {
sessionID: input.sessionID, sessionID: input.sessionID,
pollCount, pollCount,
elapsed: Math.floor((Date.now() - pollStart) / 1000) + "s", elapsed: Math.floor((Date.now() - pollStart) / 1000) + "s",
sessionStatus: sessionStatus?.type ?? "not_in_status", sessionStatus: sessionStatus?.type ?? "not_in_status",
stablePolls,
lastMsgCount,
}) })
} }
if (sessionStatus && sessionStatus.type !== "idle") { if (sessionStatus && sessionStatus.type !== "idle") {
stablePolls = 0
lastMsgCount = 0
continue continue
} }
const elapsed = Date.now() - pollStart const messagesResult = await client.session.messages({ path: { id: input.sessionID } })
if (elapsed < syncTiming.MIN_STABILITY_TIME_MS) { const msgs = ((messagesResult as { data?: unknown }).data ?? messagesResult) as SessionMessage[]
continue
}
const messagesCheck = await client.session.messages({ path: { id: input.sessionID } }) if (isSessionComplete(msgs)) {
const msgs = ((messagesCheck as { data?: unknown }).data ?? messagesCheck) as Array<unknown> log("[task] Poll complete - terminal finish detected", { sessionID: input.sessionID, pollCount })
const currentMsgCount = msgs.length break
if (currentMsgCount === lastMsgCount) {
stablePolls++
if (stablePolls >= syncTiming.STABILITY_POLLS_REQUIRED) {
log("[task] Poll complete - messages stable", { sessionID: input.sessionID, pollCount, currentMsgCount })
break
}
} else {
stablePolls = 0
lastMsgCount = currentMsgCount
} }
} }
if (Date.now() - pollStart >= syncTiming.MAX_POLL_TIME_MS) { if (Date.now() - pollStart >= syncTiming.MAX_POLL_TIME_MS) {
log("[task] Poll timeout reached", { sessionID: input.sessionID, pollCount, lastMsgCount, stablePolls }) log("[task] Poll timeout reached", { sessionID: input.sessionID, pollCount })
} }
return null return null

View File

@@ -1073,11 +1073,16 @@ describe("sisyphus-task", () => {
messages: async () => ({ messages: async () => ({
data: [ data: [
{ {
info: { role: "assistant", time: { created: Date.now() } }, info: { id: "msg_001", role: "user", time: { created: Date.now() } },
parts: [{ type: "text", text: "Continue the task" }],
},
{
info: { id: "msg_002", role: "assistant", time: { created: Date.now() + 1 }, finish: "end_turn" },
parts: [{ type: "text", text: "This is the continued task result" }], parts: [{ type: "text", text: "This is the continued task result" }],
}, },
], ],
}), }),
status: async () => ({ data: { "ses_continue_test": { type: "idle" } } }),
}, },
config: { get: async () => ({ data: { model: SYSTEM_DEFAULT_MODEL } }) }, config: { get: async () => ({ data: { model: SYSTEM_DEFAULT_MODEL } }) },
app: { app: {
@@ -1125,11 +1130,12 @@ describe("sisyphus-task", () => {
const mockClient = { const mockClient = {
session: { session: {
prompt: promptMock, prompt: promptMock,
promptAsync: async () => ({ data: {} }), promptAsync: promptMock,
messages: async () => ({ messages: async () => ({
data: [ data: [
{ {
info: { info: {
id: "msg_001",
role: "user", role: "user",
agent: "sisyphus-junior", agent: "sisyphus-junior",
model: { providerID: "anthropic", modelID: "claude-opus-4-6" }, model: { providerID: "anthropic", modelID: "claude-opus-4-6" },
@@ -1139,11 +1145,12 @@ describe("sisyphus-task", () => {
parts: [{ type: "text", text: "previous message" }], parts: [{ type: "text", text: "previous message" }],
}, },
{ {
info: { role: "assistant", time: { created: Date.now() + 1 } }, info: { id: "msg_002", role: "assistant", time: { created: Date.now() + 1 }, finish: "end_turn" },
parts: [{ type: "text", text: "Completed." }], parts: [{ type: "text", text: "Completed." }],
}, },
], ],
}), }),
status: async () => ({ data: { "ses_var_test": { type: "idle" } } }),
}, },
config: { get: async () => ({ data: { model: SYSTEM_DEFAULT_MODEL } }) }, config: { get: async () => ({ data: { model: SYSTEM_DEFAULT_MODEL } }) },
app: { app: {
@@ -1316,7 +1323,11 @@ describe("sisyphus-task", () => {
messages: async () => ({ messages: async () => ({
data: [ data: [
{ {
info: { role: "assistant", time: { created: Date.now() } }, info: { id: "msg_001", role: "user", time: { created: Date.now() } },
parts: [{ type: "text", text: "Do something" }],
},
{
info: { id: "msg_002", role: "assistant", time: { created: Date.now() + 1 }, finish: "end_turn" },
parts: [{ type: "text", text: "Sync task completed successfully" }], parts: [{ type: "text", text: "Sync task completed successfully" }],
}, },
], ],