fix(runtime-fallback): scope visible-assistant check to current turn and cleanup retry dedupe keys
This commit is contained in:
107
src/hooks/runtime-fallback/event-handler.test.ts
Normal file
107
src/hooks/runtime-fallback/event-handler.test.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import { describe, expect, it } from "bun:test"
|
||||
import type { HookDeps, RuntimeFallbackPluginInput } from "./types"
|
||||
import type { AutoRetryHelpers } from "./auto-retry"
|
||||
import { createFallbackState } from "./fallback-state"
|
||||
import { createEventHandler } from "./event-handler"
|
||||
|
||||
function createContext(): RuntimeFallbackPluginInput {
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
abort: async () => ({}),
|
||||
messages: async () => ({ data: [] }),
|
||||
promptAsync: async () => ({}),
|
||||
},
|
||||
tui: {
|
||||
showToast: async () => ({}),
|
||||
},
|
||||
},
|
||||
directory: "/test/dir",
|
||||
}
|
||||
}
|
||||
|
||||
function createDeps(): HookDeps {
|
||||
return {
|
||||
ctx: createContext(),
|
||||
config: {
|
||||
enabled: true,
|
||||
retry_on_errors: [429, 503, 529],
|
||||
max_fallback_attempts: 3,
|
||||
cooldown_seconds: 60,
|
||||
timeout_seconds: 30,
|
||||
notify_on_fallback: false,
|
||||
},
|
||||
options: undefined,
|
||||
pluginConfig: {},
|
||||
sessionStates: new Map(),
|
||||
sessionLastAccess: new Map(),
|
||||
sessionRetryInFlight: new Set(),
|
||||
sessionAwaitingFallbackResult: new Set(),
|
||||
sessionFallbackTimeouts: new Map(),
|
||||
sessionStatusRetryKeys: new Map(),
|
||||
}
|
||||
}
|
||||
|
||||
function createHelpers(deps: HookDeps, abortCalls: string[], clearCalls: string[]): AutoRetryHelpers {
|
||||
return {
|
||||
abortSessionRequest: async (sessionID: string) => {
|
||||
abortCalls.push(sessionID)
|
||||
},
|
||||
clearSessionFallbackTimeout: (sessionID: string) => {
|
||||
clearCalls.push(sessionID)
|
||||
deps.sessionFallbackTimeouts.delete(sessionID)
|
||||
},
|
||||
scheduleSessionFallbackTimeout: () => {},
|
||||
autoRetryWithFallback: async () => {},
|
||||
resolveAgentForSessionFromContext: async () => undefined,
|
||||
cleanupStaleSessions: () => {},
|
||||
}
|
||||
}
|
||||
|
||||
describe("createEventHandler", () => {
|
||||
it("#given a session retry dedupe key #when session.stop fires #then the retry dedupe key is cleared", async () => {
|
||||
// given
|
||||
const sessionID = "session-stop"
|
||||
const deps = createDeps()
|
||||
const abortCalls: string[] = []
|
||||
const clearCalls: string[] = []
|
||||
const state = createFallbackState("google/gemini-2.5-pro")
|
||||
state.pendingFallbackModel = "openai/gpt-5.4"
|
||||
deps.sessionStates.set(sessionID, state)
|
||||
deps.sessionRetryInFlight.add(sessionID)
|
||||
deps.sessionStatusRetryKeys.set(sessionID, "retry:1")
|
||||
const handler = createEventHandler(deps, createHelpers(deps, abortCalls, clearCalls))
|
||||
|
||||
// when
|
||||
await handler({ event: { type: "session.stop", properties: { sessionID } } })
|
||||
|
||||
// then
|
||||
expect(deps.sessionStatusRetryKeys.has(sessionID)).toBe(false)
|
||||
expect(clearCalls).toEqual([sessionID])
|
||||
expect(abortCalls).toEqual([sessionID])
|
||||
})
|
||||
|
||||
it("#given a session retry dedupe key without a pending fallback result #when session.idle fires #then the retry dedupe key is cleared", async () => {
|
||||
// given
|
||||
const sessionID = "session-idle"
|
||||
const deps = createDeps()
|
||||
const abortCalls: string[] = []
|
||||
const clearCalls: string[] = []
|
||||
const state = createFallbackState("google/gemini-2.5-pro")
|
||||
state.pendingFallbackModel = "openai/gpt-5.4"
|
||||
deps.sessionStates.set(sessionID, state)
|
||||
deps.sessionRetryInFlight.add(sessionID)
|
||||
deps.sessionFallbackTimeouts.set(sessionID, 1)
|
||||
deps.sessionStatusRetryKeys.set(sessionID, "retry:1")
|
||||
const handler = createEventHandler(deps, createHelpers(deps, abortCalls, clearCalls))
|
||||
|
||||
// when
|
||||
await handler({ event: { type: "session.idle", properties: { sessionID } } })
|
||||
|
||||
// then
|
||||
expect(deps.sessionStatusRetryKeys.has(sessionID)).toBe(false)
|
||||
expect(clearCalls).toEqual([sessionID])
|
||||
expect(abortCalls).toEqual([])
|
||||
expect(state.pendingFallbackModel).toBe(undefined)
|
||||
})
|
||||
})
|
||||
@@ -54,6 +54,7 @@ export function createEventHandler(deps: HookDeps, helpers: AutoRetryHelpers) {
|
||||
|
||||
sessionRetryInFlight.delete(sessionID)
|
||||
sessionAwaitingFallbackResult.delete(sessionID)
|
||||
sessionStatusRetryKeys.delete(sessionID)
|
||||
|
||||
const state = sessionStates.get(sessionID)
|
||||
if (state?.pendingFallbackModel) {
|
||||
@@ -75,6 +76,7 @@ export function createEventHandler(deps: HookDeps, helpers: AutoRetryHelpers) {
|
||||
const hadTimeout = sessionFallbackTimeouts.has(sessionID)
|
||||
helpers.clearSessionFallbackTimeout(sessionID)
|
||||
sessionRetryInFlight.delete(sessionID)
|
||||
sessionStatusRetryKeys.delete(sessionID)
|
||||
|
||||
const state = sessionStates.get(sessionID)
|
||||
if (state?.pendingFallbackModel) {
|
||||
|
||||
88
src/hooks/runtime-fallback/hook-dispose-cleanup.test.ts
Normal file
88
src/hooks/runtime-fallback/hook-dispose-cleanup.test.ts
Normal file
@@ -0,0 +1,88 @@
|
||||
import { describe, expect, it } from "bun:test"
|
||||
import type { RuntimeFallbackPluginInput } from "./types"
|
||||
import { createRuntimeFallbackHook } from "./hook"
|
||||
import { SessionCategoryRegistry } from "../../shared/session-category-registry"
|
||||
|
||||
function createContext(promptCalls: unknown[]): RuntimeFallbackPluginInput {
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
abort: async () => ({}),
|
||||
messages: async () => ({
|
||||
data: [{ info: { role: "user" }, parts: [{ type: "text", text: "retry this" }] }],
|
||||
}),
|
||||
promptAsync: async (args: unknown) => {
|
||||
promptCalls.push(args)
|
||||
return {}
|
||||
},
|
||||
},
|
||||
tui: {
|
||||
showToast: async () => ({}),
|
||||
},
|
||||
},
|
||||
directory: "/test/dir",
|
||||
}
|
||||
}
|
||||
|
||||
describe("createRuntimeFallbackHook dispose retry-key cleanup", () => {
|
||||
it("#given a session.status retry key #when dispose() is called #then the same retry event is not deduplicated afterward", async () => {
|
||||
// given
|
||||
const promptCalls: unknown[] = []
|
||||
const sessionID = "session-dispose-retry-key"
|
||||
const hook = createRuntimeFallbackHook(createContext(promptCalls), {
|
||||
config: {
|
||||
enabled: true,
|
||||
retry_on_errors: [429, 503, 529],
|
||||
max_fallback_attempts: 3,
|
||||
cooldown_seconds: 60,
|
||||
timeout_seconds: 30,
|
||||
notify_on_fallback: false,
|
||||
},
|
||||
pluginConfig: {
|
||||
categories: {
|
||||
test: {
|
||||
fallback_models: ["openai/gpt-5.2"],
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
SessionCategoryRegistry.register(sessionID, "test")
|
||||
|
||||
await hook.event({
|
||||
event: {
|
||||
type: "session.created",
|
||||
properties: { info: { id: sessionID, model: "quotio/claude-opus-4-6" } },
|
||||
},
|
||||
})
|
||||
|
||||
const retryEvent = {
|
||||
event: {
|
||||
type: "session.status",
|
||||
properties: {
|
||||
sessionID,
|
||||
status: {
|
||||
type: "retry",
|
||||
attempt: 1,
|
||||
message: "All credentials for model claude-opus-4-6 are cooling down [retrying in 7m 56s attempt #1]",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
await hook.event(retryEvent)
|
||||
expect(promptCalls).toHaveLength(1)
|
||||
|
||||
// when
|
||||
hook.dispose?.()
|
||||
await hook.event({
|
||||
event: {
|
||||
type: "session.created",
|
||||
properties: { info: { id: sessionID, model: "quotio/claude-opus-4-6" } },
|
||||
},
|
||||
})
|
||||
await hook.event(retryEvent)
|
||||
|
||||
// then
|
||||
expect(promptCalls).toHaveLength(2)
|
||||
})
|
||||
})
|
||||
@@ -76,6 +76,7 @@ export function createRuntimeFallbackHook(
|
||||
deps.sessionRetryInFlight.clear()
|
||||
deps.sessionAwaitingFallbackResult.clear()
|
||||
deps.sessionFallbackTimeouts.clear()
|
||||
deps.sessionStatusRetryKeys.clear()
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
56
src/hooks/runtime-fallback/message-update-handler.test.ts
Normal file
56
src/hooks/runtime-fallback/message-update-handler.test.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
import { describe, expect, it } from "bun:test"
|
||||
import type { RuntimeFallbackPluginInput } from "./types"
|
||||
import { hasVisibleAssistantResponse } from "./visible-assistant-response"
|
||||
|
||||
function createContext(messagesResponse: unknown): RuntimeFallbackPluginInput {
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
abort: async () => ({}),
|
||||
messages: async () => messagesResponse,
|
||||
promptAsync: async () => ({}),
|
||||
},
|
||||
tui: {
|
||||
showToast: async () => ({}),
|
||||
},
|
||||
},
|
||||
directory: "/test/dir",
|
||||
}
|
||||
}
|
||||
|
||||
describe("hasVisibleAssistantResponse", () => {
|
||||
it("#given only an old assistant reply before the latest user turn #when visibility is checked #then the stale reply is ignored", async () => {
|
||||
// given
|
||||
const checkVisibleResponse = hasVisibleAssistantResponse(() => undefined)
|
||||
const ctx = createContext({
|
||||
data: [
|
||||
{ info: { role: "user" }, parts: [{ type: "text", text: "older question" }] },
|
||||
{ info: { role: "assistant" }, parts: [{ type: "text", text: "older answer" }] },
|
||||
{ info: { role: "user" }, parts: [{ type: "text", text: "latest question" }] },
|
||||
],
|
||||
})
|
||||
|
||||
// when
|
||||
const result = await checkVisibleResponse(ctx, "session-old-assistant", undefined)
|
||||
|
||||
// then
|
||||
expect(result).toBe(false)
|
||||
})
|
||||
|
||||
it("#given an assistant reply after the latest user turn #when visibility is checked #then the current reply is treated as visible", async () => {
|
||||
// given
|
||||
const checkVisibleResponse = hasVisibleAssistantResponse(() => undefined)
|
||||
const ctx = createContext({
|
||||
data: [
|
||||
{ info: { role: "user" }, parts: [{ type: "text", text: "latest question" }] },
|
||||
{ info: { role: "assistant" }, parts: [{ type: "text", text: "visible answer" }] },
|
||||
],
|
||||
})
|
||||
|
||||
// when
|
||||
const result = await checkVisibleResponse(ctx, "session-visible-assistant", undefined)
|
||||
|
||||
// then
|
||||
expect(result).toBe(true)
|
||||
})
|
||||
})
|
||||
@@ -7,49 +7,12 @@ import { createFallbackState } from "./fallback-state"
|
||||
import { getFallbackModelsForSession } from "./fallback-models"
|
||||
import { resolveFallbackBootstrapModel } from "./fallback-bootstrap-model"
|
||||
import { dispatchFallbackRetry } from "./fallback-retry-dispatcher"
|
||||
import { extractSessionMessages } from "./session-messages"
|
||||
import { hasVisibleAssistantResponse } from "./visible-assistant-response"
|
||||
|
||||
export function hasVisibleAssistantResponse(extractAutoRetrySignalFn: typeof extractAutoRetrySignal) {
|
||||
return async (
|
||||
ctx: HookDeps["ctx"],
|
||||
sessionID: string,
|
||||
_info: Record<string, unknown> | undefined,
|
||||
): Promise<boolean> => {
|
||||
try {
|
||||
const messagesResp = await ctx.client.session.messages({
|
||||
path: { id: sessionID },
|
||||
query: { directory: ctx.directory },
|
||||
})
|
||||
|
||||
const msgs = extractSessionMessages(messagesResp)
|
||||
|
||||
if (!msgs || msgs.length === 0) return false
|
||||
|
||||
const lastAssistant = [...msgs].reverse().find((m) => m.info?.role === "assistant")
|
||||
if (!lastAssistant) return false
|
||||
if (lastAssistant.info?.error) return false
|
||||
|
||||
const parts = lastAssistant.parts ??
|
||||
(lastAssistant.info?.parts as Array<{ type?: string; text?: string }> | undefined)
|
||||
|
||||
const textFromParts = (parts ?? [])
|
||||
.filter((p) => p.type === "text" && typeof p.text === "string")
|
||||
.map((p) => p.text!.trim())
|
||||
.filter((text) => text.length > 0)
|
||||
.join("\n")
|
||||
|
||||
if (!textFromParts) return false
|
||||
if (extractAutoRetrySignalFn({ message: textFromParts })) return false
|
||||
|
||||
return true
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
export { hasVisibleAssistantResponse } from "./visible-assistant-response"
|
||||
|
||||
export function createMessageUpdateHandler(deps: HookDeps, helpers: AutoRetryHelpers) {
|
||||
const { ctx, config, pluginConfig, sessionStates, sessionLastAccess, sessionRetryInFlight, sessionAwaitingFallbackResult } = deps
|
||||
const { ctx, config, pluginConfig, sessionStates, sessionLastAccess, sessionRetryInFlight, sessionAwaitingFallbackResult, sessionStatusRetryKeys } = deps
|
||||
const checkVisibleResponse = hasVisibleAssistantResponse(extractAutoRetrySignal)
|
||||
|
||||
return async (props: Record<string, unknown> | undefined) => {
|
||||
@@ -91,6 +54,7 @@ export function createMessageUpdateHandler(deps: HookDeps, helpers: AutoRetryHel
|
||||
}
|
||||
|
||||
sessionAwaitingFallbackResult.delete(sessionID)
|
||||
sessionStatusRetryKeys.delete(sessionID)
|
||||
helpers.clearSessionFallbackTimeout(sessionID)
|
||||
const state = sessionStates.get(sessionID)
|
||||
if (state?.pendingFallbackModel) {
|
||||
|
||||
97
src/hooks/runtime-fallback/success-retry-key-cleanup.test.ts
Normal file
97
src/hooks/runtime-fallback/success-retry-key-cleanup.test.ts
Normal file
@@ -0,0 +1,97 @@
|
||||
import { describe, expect, it } from "bun:test"
|
||||
import type { HookDeps, RuntimeFallbackPluginInput } from "./types"
|
||||
import type { AutoRetryHelpers } from "./auto-retry"
|
||||
import { createFallbackState } from "./fallback-state"
|
||||
|
||||
type MessageUpdateHandlerModule = typeof import("./message-update-handler")
|
||||
|
||||
async function importFreshMessageUpdateHandlerModule(): Promise<MessageUpdateHandlerModule> {
|
||||
return import(`./message-update-handler?success-retry-key-${Date.now()}-${Math.random()}`)
|
||||
}
|
||||
|
||||
function createContext(messagesResponse: unknown): RuntimeFallbackPluginInput {
|
||||
return {
|
||||
client: {
|
||||
session: {
|
||||
abort: async () => ({}),
|
||||
messages: async () => messagesResponse,
|
||||
promptAsync: async () => ({}),
|
||||
},
|
||||
tui: {
|
||||
showToast: async () => ({}),
|
||||
},
|
||||
},
|
||||
directory: "/test/dir",
|
||||
}
|
||||
}
|
||||
|
||||
function createDeps(messagesResponse: unknown): HookDeps {
|
||||
return {
|
||||
ctx: createContext(messagesResponse),
|
||||
config: {
|
||||
enabled: true,
|
||||
retry_on_errors: [429, 503, 529],
|
||||
max_fallback_attempts: 3,
|
||||
cooldown_seconds: 60,
|
||||
timeout_seconds: 30,
|
||||
notify_on_fallback: false,
|
||||
},
|
||||
options: undefined,
|
||||
pluginConfig: {},
|
||||
sessionStates: new Map(),
|
||||
sessionLastAccess: new Map(),
|
||||
sessionRetryInFlight: new Set(),
|
||||
sessionAwaitingFallbackResult: new Set(),
|
||||
sessionFallbackTimeouts: new Map(),
|
||||
sessionStatusRetryKeys: new Map(),
|
||||
}
|
||||
}
|
||||
|
||||
function createHelpers(clearCalls: string[]): AutoRetryHelpers {
|
||||
return {
|
||||
abortSessionRequest: async () => {},
|
||||
clearSessionFallbackTimeout: (sessionID: string) => {
|
||||
clearCalls.push(sessionID)
|
||||
},
|
||||
scheduleSessionFallbackTimeout: () => {},
|
||||
autoRetryWithFallback: async () => {},
|
||||
resolveAgentForSessionFromContext: async () => undefined,
|
||||
cleanupStaleSessions: () => {},
|
||||
}
|
||||
}
|
||||
|
||||
describe("createMessageUpdateHandler retry-key cleanup", () => {
|
||||
it("#given a visible assistant reply after the latest user turn #when a non-error assistant update arrives #then the retry dedupe key is cleared with the fallback watchdog", async () => {
|
||||
// given
|
||||
const { createMessageUpdateHandler } = await importFreshMessageUpdateHandlerModule()
|
||||
const sessionID = "session-visible-assistant"
|
||||
const clearCalls: string[] = []
|
||||
const deps = createDeps({
|
||||
data: [
|
||||
{ info: { role: "user" }, parts: [{ type: "text", text: "latest question" }] },
|
||||
{ info: { role: "assistant" }, parts: [{ type: "text", text: "visible answer" }] },
|
||||
],
|
||||
})
|
||||
const state = createFallbackState("google/gemini-2.5-pro")
|
||||
state.pendingFallbackModel = "openai/gpt-5.4"
|
||||
deps.sessionStates.set(sessionID, state)
|
||||
deps.sessionAwaitingFallbackResult.add(sessionID)
|
||||
deps.sessionStatusRetryKeys.set(sessionID, "retry:1")
|
||||
const handler = createMessageUpdateHandler(deps, createHelpers(clearCalls))
|
||||
|
||||
// when
|
||||
await handler({
|
||||
info: {
|
||||
sessionID,
|
||||
role: "assistant",
|
||||
model: "openai/gpt-5.4",
|
||||
},
|
||||
})
|
||||
|
||||
// then
|
||||
expect(deps.sessionAwaitingFallbackResult.has(sessionID)).toBe(false)
|
||||
expect(deps.sessionStatusRetryKeys.has(sessionID)).toBe(false)
|
||||
expect(state.pendingFallbackModel).toBe(undefined)
|
||||
expect(clearCalls).toEqual([sessionID])
|
||||
})
|
||||
})
|
||||
80
src/hooks/runtime-fallback/visible-assistant-response.ts
Normal file
80
src/hooks/runtime-fallback/visible-assistant-response.ts
Normal file
@@ -0,0 +1,80 @@
|
||||
import type { HookDeps } from "./types"
|
||||
import type { SessionMessage, SessionMessagePart } from "./session-messages"
|
||||
import { extractSessionMessages } from "./session-messages"
|
||||
import { extractAutoRetrySignal } from "./error-classifier"
|
||||
|
||||
function getLastUserMessageIndex(messages: SessionMessage[]): number {
|
||||
for (let index = messages.length - 1; index >= 0; index--) {
|
||||
if (messages[index]?.info?.role === "user") {
|
||||
return index
|
||||
}
|
||||
}
|
||||
|
||||
return -1
|
||||
}
|
||||
|
||||
function getAssistantText(parts: SessionMessagePart[] | undefined): string {
|
||||
return (parts ?? [])
|
||||
.flatMap((part) => {
|
||||
if (part.type !== "text") {
|
||||
return []
|
||||
}
|
||||
|
||||
const text = typeof part.text === "string" ? part.text.trim() : ""
|
||||
return text.length > 0 ? [text] : []
|
||||
})
|
||||
.join("\n")
|
||||
}
|
||||
|
||||
export function hasVisibleAssistantResponse(extractAutoRetrySignalFn: typeof extractAutoRetrySignal) {
|
||||
return async (
|
||||
ctx: HookDeps["ctx"],
|
||||
sessionID: string,
|
||||
_info: Record<string, unknown> | undefined,
|
||||
): Promise<boolean> => {
|
||||
try {
|
||||
const messagesResponse = await ctx.client.session.messages({
|
||||
path: { id: sessionID },
|
||||
query: { directory: ctx.directory },
|
||||
})
|
||||
const messages = extractSessionMessages(messagesResponse)
|
||||
if (!messages || messages.length === 0) return false
|
||||
|
||||
const lastUserMessageIndex = getLastUserMessageIndex(messages)
|
||||
if (lastUserMessageIndex === -1) return false
|
||||
|
||||
for (let index = lastUserMessageIndex + 1; index < messages.length; index++) {
|
||||
const message = messages[index]
|
||||
if (message?.info?.role !== "assistant") {
|
||||
continue
|
||||
}
|
||||
|
||||
if (message.info?.error) {
|
||||
continue
|
||||
}
|
||||
|
||||
const infoParts = message.info?.parts
|
||||
const infoMessageParts = Array.isArray(infoParts)
|
||||
? infoParts.filter((part): part is SessionMessagePart => typeof part === "object" && part !== null)
|
||||
: undefined
|
||||
const parts = message.parts && message.parts.length > 0
|
||||
? message.parts
|
||||
: infoMessageParts
|
||||
const assistantText = getAssistantText(parts)
|
||||
if (!assistantText) {
|
||||
continue
|
||||
}
|
||||
|
||||
if (extractAutoRetrySignalFn({ message: assistantText })) {
|
||||
continue
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user