fix: address cubic review — SDK compatibility and race condition fixes
This commit is contained in:
@@ -1,12 +1,15 @@
|
||||
import type { OhMyOpenCodeConfig } from "../config"
|
||||
import { log } from "../shared/logger"
|
||||
import { isStepOnlyNoTextParts, resolveNoTextTailFromSession } from "./preemptive-compaction-no-text-tail"
|
||||
import { resolveNoTextTailFromSession } from "./preemptive-compaction-no-text-tail"
|
||||
import { resolveCompactionModel } from "./shared/compaction-model-resolver"
|
||||
|
||||
const PREEMPTIVE_COMPACTION_TIMEOUT_MS = 120_000
|
||||
const POST_COMPACTION_MONITOR_COUNT = 5
|
||||
const POST_COMPACTION_NO_TEXT_THRESHOLD = 3
|
||||
|
||||
declare function setTimeout(handler: () => void, timeout?: number): unknown
|
||||
declare function clearTimeout(timeoutID: unknown): void
|
||||
|
||||
interface CompactionTargetState {
|
||||
providerID: string
|
||||
modelID: string
|
||||
@@ -16,12 +19,12 @@ interface ClientLike {
|
||||
session: {
|
||||
summarize: (input: {
|
||||
path: { id: string }
|
||||
body: { providerID: string; modelID: string; auto: true }
|
||||
body: { providerID: string; modelID: string }
|
||||
query: { directory: string }
|
||||
}) => Promise<unknown>
|
||||
messages: (input: {
|
||||
path: { id: string }
|
||||
query: { directory: string }
|
||||
sessionID: string
|
||||
directory: string
|
||||
}) => Promise<unknown>
|
||||
}
|
||||
tui: {
|
||||
@@ -39,7 +42,6 @@ interface ClientLike {
|
||||
export interface AssistantCompactionMessageInfo {
|
||||
sessionID: string
|
||||
id?: string
|
||||
parts?: unknown
|
||||
}
|
||||
|
||||
async function withTimeout<TValue>(
|
||||
@@ -47,7 +49,7 @@ async function withTimeout<TValue>(
|
||||
timeoutMs: number,
|
||||
errorMessage: string,
|
||||
): Promise<TValue> {
|
||||
let timeoutID: ReturnType<typeof setTimeout> | undefined
|
||||
let timeoutID: unknown
|
||||
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
timeoutID = setTimeout(() => {
|
||||
@@ -56,7 +58,7 @@ async function withTimeout<TValue>(
|
||||
})
|
||||
|
||||
return await Promise.race([promise, timeoutPromise]).finally(() => {
|
||||
if (timeoutID !== undefined) clearTimeout(timeoutID)
|
||||
clearTimeout(timeoutID)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -71,14 +73,18 @@ export function createPostCompactionDegradationMonitor(args: {
|
||||
const postCompactionRemaining = new Map<string, number>()
|
||||
const postCompactionNoTextStreak = new Map<string, number>()
|
||||
const postCompactionRecoveryTriggered = new Set<string>()
|
||||
const postCompactionEpoch = new Map<string, number>()
|
||||
|
||||
const clear = (sessionID: string): void => {
|
||||
postCompactionRemaining.delete(sessionID)
|
||||
postCompactionNoTextStreak.delete(sessionID)
|
||||
postCompactionRecoveryTriggered.delete(sessionID)
|
||||
postCompactionEpoch.delete(sessionID)
|
||||
}
|
||||
|
||||
const onSessionCompacted = (sessionID: string): void => {
|
||||
const nextEpoch = (postCompactionEpoch.get(sessionID) ?? 0) + 1
|
||||
postCompactionEpoch.set(sessionID, nextEpoch)
|
||||
postCompactionRemaining.set(sessionID, POST_COMPACTION_MONITOR_COUNT)
|
||||
postCompactionNoTextStreak.set(sessionID, 0)
|
||||
postCompactionRecoveryTriggered.delete(sessionID)
|
||||
@@ -95,6 +101,7 @@ export function createPostCompactionDegradationMonitor(args: {
|
||||
|
||||
postCompactionRecoveryTriggered.add(sessionID)
|
||||
compactionInProgress.add(sessionID)
|
||||
const recoveryEpoch = postCompactionEpoch.get(sessionID) ?? 0
|
||||
|
||||
try {
|
||||
const { providerID: targetProviderID, modelID: targetModelID } = resolveCompactionModel(
|
||||
@@ -118,7 +125,7 @@ export function createPostCompactionDegradationMonitor(args: {
|
||||
await withTimeout(
|
||||
client.session.summarize({
|
||||
path: { id: sessionID },
|
||||
body: { providerID: targetProviderID, modelID: targetModelID, auto: true },
|
||||
body: { providerID: targetProviderID, modelID: targetModelID },
|
||||
query: { directory },
|
||||
}),
|
||||
PREEMPTIVE_COMPACTION_TIMEOUT_MS,
|
||||
@@ -133,7 +140,9 @@ export function createPostCompactionDegradationMonitor(args: {
|
||||
})
|
||||
} finally {
|
||||
compactionInProgress.delete(sessionID)
|
||||
clear(sessionID)
|
||||
if ((postCompactionEpoch.get(sessionID) ?? 0) === recoveryEpoch) {
|
||||
clear(sessionID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,13 +156,12 @@ export function createPostCompactionDegradationMonitor(args: {
|
||||
postCompactionRemaining.set(info.sessionID, remaining - 1)
|
||||
}
|
||||
|
||||
const isNoTextTail = isStepOnlyNoTextParts(info.parts)
|
||||
|| await resolveNoTextTailFromSession({
|
||||
client,
|
||||
sessionID: info.sessionID,
|
||||
messageID: info.id,
|
||||
directory,
|
||||
})
|
||||
const isNoTextTail = await resolveNoTextTailFromSession({
|
||||
client,
|
||||
sessionID: info.sessionID,
|
||||
messageID: info.id,
|
||||
directory,
|
||||
})
|
||||
|
||||
if (!isNoTextTail) {
|
||||
postCompactionNoTextStreak.set(info.sessionID, 0)
|
||||
|
||||
@@ -38,8 +38,8 @@ export async function resolveNoTextTailFromSession(args: {
|
||||
client: {
|
||||
session: {
|
||||
messages: (input: {
|
||||
path: { id: string }
|
||||
query: { directory: string }
|
||||
sessionID: string
|
||||
directory: string
|
||||
}) => Promise<unknown>
|
||||
}
|
||||
}
|
||||
@@ -51,8 +51,8 @@ export async function resolveNoTextTailFromSession(args: {
|
||||
|
||||
try {
|
||||
const response = await client.session.messages({
|
||||
path: { id: sessionID },
|
||||
query: { directory },
|
||||
sessionID,
|
||||
directory,
|
||||
})
|
||||
|
||||
const messages = normalizeSDKResponse(response, [] as SessionMessage[], {
|
||||
|
||||
@@ -11,6 +11,9 @@ import { createPostCompactionDegradationMonitor } from "./preemptive-compaction-
|
||||
const PREEMPTIVE_COMPACTION_TIMEOUT_MS = 120_000
|
||||
const PREEMPTIVE_COMPACTION_THRESHOLD = 0.78
|
||||
|
||||
declare function setTimeout(handler: () => void, timeout?: number): unknown
|
||||
declare function clearTimeout(timeoutID: unknown): void
|
||||
|
||||
interface TokenInfo {
|
||||
input: number
|
||||
output: number
|
||||
@@ -29,7 +32,7 @@ async function withTimeout<TValue>(
|
||||
timeoutMs: number,
|
||||
errorMessage: string,
|
||||
): Promise<TValue> {
|
||||
let timeoutID: ReturnType<typeof setTimeout> | undefined
|
||||
let timeoutID: unknown
|
||||
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
timeoutID = setTimeout(() => {
|
||||
@@ -38,9 +41,7 @@ async function withTimeout<TValue>(
|
||||
})
|
||||
|
||||
return await Promise.race([promise, timeoutPromise]).finally(() => {
|
||||
if (timeoutID !== undefined) {
|
||||
clearTimeout(timeoutID)
|
||||
}
|
||||
clearTimeout(timeoutID)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -165,7 +166,6 @@ export function createPreemptiveCompactionHook(
|
||||
modelID?: string
|
||||
finish?: boolean
|
||||
tokens?: TokenInfo
|
||||
parts?: unknown
|
||||
} | undefined
|
||||
|
||||
if (!info || info.role !== "assistant" || !info.finish || !info.sessionID) return
|
||||
@@ -182,7 +182,6 @@ export function createPreemptiveCompactionHook(
|
||||
await postCompactionMonitor.onAssistantMessageUpdated({
|
||||
sessionID: info.sessionID,
|
||||
id: info.id,
|
||||
parts: info.parts,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user