From e0e39a6d522ac4ce06398c1ea7ffc012ec6337ab Mon Sep 17 00:00:00 2001 From: Dave Lauer Date: Thu, 22 Jan 2026 15:59:32 -0500 Subject: [PATCH] feat(compaction): add adaptive chunk sizing and progressive fallback - Add computeAdaptiveChunkRatio() to reduce chunk size for large messages - Add isOversizedForSummary() to detect messages too large to summarize - Add summarizeWithFallback() with progressive fallback: - Tries full summarization first - Falls back to partial summarization excluding oversized messages - Notes oversized messages in the summary output - Add SAFETY_MARGIN (1.2x) buffer for token estimation inaccuracy - Reduce MIN_CHUNK_RATIO to 0.15 for very large messages This prevents compaction failures when conversations contain unusually large tool outputs or responses that exceed the summarization model's context window. --- src/agents/pi-embedded-runner/google.ts | 20 +++ .../compaction-safeguard.test.ts | 114 +++++++++++++++- .../pi-extensions/compaction-safeguard.ts | 125 +++++++++++++++++- 3 files changed, 254 insertions(+), 5 deletions(-) diff --git a/src/agents/pi-embedded-runner/google.ts b/src/agents/pi-embedded-runner/google.ts index 8158c2c83..605e9b10b 100644 --- a/src/agents/pi-embedded-runner/google.ts +++ b/src/agents/pi-embedded-runner/google.ts @@ -1,3 +1,5 @@ +import { EventEmitter } from "node:events"; + import type { AgentMessage, AgentTool } from "@mariozechner/pi-agent-core"; import type { TSchema } from "@sinclair/typebox"; import type { SessionManager } from "@mariozechner/pi-coding-agent"; @@ -184,10 +186,28 @@ export function logToolSchemasForGoogle(params: { tools: AgentTool[]; provider: } } +// Event emitter for unhandled compaction failures that escape try-catch blocks. +// Listeners can use this to trigger session recovery with retry. +const compactionFailureEmitter = new EventEmitter(); + +export type CompactionFailureListener = (reason: string) => void; + +/** + * Register a listener for unhandled compaction failures. + * Called when auto-compaction fails in a way that escapes the normal try-catch, + * e.g., when the summarization request itself exceeds the model's token limit. + * Returns an unsubscribe function. + */ +export function onUnhandledCompactionFailure(cb: CompactionFailureListener): () => void { + compactionFailureEmitter.on("failure", cb); + return () => compactionFailureEmitter.off("failure", cb); +} + registerUnhandledRejectionHandler((reason) => { const message = describeUnknownError(reason); if (!isCompactionFailureError(message)) return false; log.error(`Auto-compaction failed (unhandled): ${message}`); + compactionFailureEmitter.emit("failure", message); return true; }); diff --git a/src/agents/pi-extensions/compaction-safeguard.test.ts b/src/agents/pi-extensions/compaction-safeguard.test.ts index d542da4fc..275e10e9f 100644 --- a/src/agents/pi-extensions/compaction-safeguard.test.ts +++ b/src/agents/pi-extensions/compaction-safeguard.test.ts @@ -3,7 +3,15 @@ import { describe, expect, it } from "vitest"; import { __testing } from "./compaction-safeguard.js"; -const { collectToolFailures, formatToolFailuresSection } = __testing; +const { + collectToolFailures, + formatToolFailuresSection, + computeAdaptiveChunkRatio, + isOversizedForSummary, + BASE_CHUNK_RATIO, + MIN_CHUNK_RATIO, + SAFETY_MARGIN, +} = __testing; describe("compaction-safeguard tool failures", () => { it("formats tool failures with meta and summary", () => { @@ -96,3 +104,107 @@ describe("compaction-safeguard tool failures", () => { expect(section).toBe(""); }); }); + +describe("computeAdaptiveChunkRatio", () => { + const CONTEXT_WINDOW = 200_000; + + it("returns BASE_CHUNK_RATIO for normal messages", () => { + // Small messages: 1000 tokens each, well under 10% of context + const messages: AgentMessage[] = [ + { role: "user", content: "x".repeat(1000), timestamp: Date.now() }, + { + role: "assistant", + content: [{ type: "text", text: "y".repeat(1000) }], + timestamp: Date.now(), + }, + ]; + + const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW); + expect(ratio).toBe(BASE_CHUNK_RATIO); + }); + + it("reduces ratio when average message > 10% of context", () => { + // Large messages: ~50K tokens each (25% of context) + const messages: AgentMessage[] = [ + { role: "user", content: "x".repeat(50_000 * 4), timestamp: Date.now() }, + { + role: "assistant", + content: [{ type: "text", text: "y".repeat(50_000 * 4) }], + timestamp: Date.now(), + }, + ]; + + const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW); + expect(ratio).toBeLessThan(BASE_CHUNK_RATIO); + expect(ratio).toBeGreaterThanOrEqual(MIN_CHUNK_RATIO); + }); + + it("respects MIN_CHUNK_RATIO floor", () => { + // Very large messages that would push ratio below minimum + const messages: AgentMessage[] = [ + { role: "user", content: "x".repeat(150_000 * 4), timestamp: Date.now() }, + ]; + + const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW); + expect(ratio).toBeGreaterThanOrEqual(MIN_CHUNK_RATIO); + }); + + it("handles empty message array", () => { + const ratio = computeAdaptiveChunkRatio([], CONTEXT_WINDOW); + expect(ratio).toBe(BASE_CHUNK_RATIO); + }); + + it("handles single huge message", () => { + // Single massive message + const messages: AgentMessage[] = [ + { role: "user", content: "x".repeat(180_000 * 4), timestamp: Date.now() }, + ]; + + const ratio = computeAdaptiveChunkRatio(messages, CONTEXT_WINDOW); + expect(ratio).toBeGreaterThanOrEqual(MIN_CHUNK_RATIO); + expect(ratio).toBeLessThanOrEqual(BASE_CHUNK_RATIO); + }); +}); + +describe("isOversizedForSummary", () => { + const CONTEXT_WINDOW = 200_000; + + it("returns false for small messages", () => { + const msg: AgentMessage = { + role: "user", + content: "Hello, world!", + timestamp: Date.now(), + }; + + expect(isOversizedForSummary(msg, CONTEXT_WINDOW)).toBe(false); + }); + + it("returns true for messages > 50% of context", () => { + // Message with ~120K tokens (60% of 200K context) + // After safety margin (1.2x), effective is 144K which is > 100K (50%) + const msg: AgentMessage = { + role: "user", + content: "x".repeat(120_000 * 4), + timestamp: Date.now(), + }; + + expect(isOversizedForSummary(msg, CONTEXT_WINDOW)).toBe(true); + }); + + it("applies safety margin", () => { + // Message at exactly 50% of context before margin + // After SAFETY_MARGIN (1.2), it becomes 60% which is > 50% + const halfContextChars = (CONTEXT_WINDOW * 0.5) / SAFETY_MARGIN; + const msg: AgentMessage = { + role: "user", + content: "x".repeat(Math.floor(halfContextChars * 4)), + timestamp: Date.now(), + }; + + // With safety margin applied, this should be at the boundary + // The function checks if tokens * SAFETY_MARGIN > contextWindow * 0.5 + const isOversized = isOversizedForSummary(msg, CONTEXT_WINDOW); + // Due to token estimation, this could be either true or false at the boundary + expect(typeof isOversized).toBe("boolean"); + }); +}); diff --git a/src/agents/pi-extensions/compaction-safeguard.ts b/src/agents/pi-extensions/compaction-safeguard.ts index 9e4c20fef..a6a66637a 100644 --- a/src/agents/pi-extensions/compaction-safeguard.ts +++ b/src/agents/pi-extensions/compaction-safeguard.ts @@ -4,7 +4,9 @@ import { estimateTokens, generateSummary } from "@mariozechner/pi-coding-agent"; import { DEFAULT_CONTEXT_TOKENS } from "../defaults.js"; -const MAX_CHUNK_RATIO = 0.4; +const BASE_CHUNK_RATIO = 0.4; +const MIN_CHUNK_RATIO = 0.15; +const SAFETY_MARGIN = 1.2; // 20% buffer for estimateTokens() inaccuracy const FALLBACK_SUMMARY = "Summary unavailable due to context limits. Older messages were truncated."; const TURN_PREFIX_INSTRUCTIONS = @@ -160,6 +162,38 @@ function chunkMessages(messages: AgentMessage[], maxTokens: number): AgentMessag return chunks; } +/** + * Compute adaptive chunk ratio based on average message size. + * When messages are large, we use smaller chunks to avoid exceeding model limits. + */ +function computeAdaptiveChunkRatio(messages: AgentMessage[], contextWindow: number): number { + if (messages.length === 0) return BASE_CHUNK_RATIO; + + const totalTokens = messages.reduce((sum, m) => sum + estimateTokens(m), 0); + const avgTokens = totalTokens / messages.length; + + // Apply safety margin to account for estimation inaccuracy + const safeAvgTokens = avgTokens * SAFETY_MARGIN; + const avgRatio = safeAvgTokens / contextWindow; + + // If average message is > 10% of context, reduce chunk ratio + if (avgRatio > 0.1) { + const reduction = Math.min(avgRatio * 2, BASE_CHUNK_RATIO - MIN_CHUNK_RATIO); + return Math.max(MIN_CHUNK_RATIO, BASE_CHUNK_RATIO - reduction); + } + + return BASE_CHUNK_RATIO; +} + +/** + * Check if a single message is too large to summarize. + * If single message > 50% of context, it can't be summarized safely. + */ +function isOversizedForSummary(msg: AgentMessage, contextWindow: number): boolean { + const tokens = estimateTokens(msg) * SAFETY_MARGIN; + return tokens > contextWindow * 0.5; +} + async function summarizeChunks(params: { messages: AgentMessage[]; model: NonNullable; @@ -192,6 +226,78 @@ async function summarizeChunks(params: { return summary ?? "No prior history."; } +/** + * Summarize with progressive fallback for handling oversized messages. + * If full summarization fails, tries partial summarization excluding oversized messages. + */ +async function summarizeWithFallback(params: { + messages: AgentMessage[]; + model: NonNullable; + apiKey: string; + signal: AbortSignal; + reserveTokens: number; + maxChunkTokens: number; + contextWindow: number; + customInstructions?: string; + previousSummary?: string; +}): Promise { + const { messages, contextWindow } = params; + + if (messages.length === 0) { + return params.previousSummary ?? "No prior history."; + } + + // Try full summarization first + try { + return await summarizeChunks(params); + } catch (fullError) { + console.warn( + `Full summarization failed, trying partial: ${ + fullError instanceof Error ? fullError.message : String(fullError) + }`, + ); + } + + // Fallback 1: Summarize only small messages, note oversized ones + const smallMessages: AgentMessage[] = []; + const oversizedNotes: string[] = []; + + for (const msg of messages) { + if (isOversizedForSummary(msg, contextWindow)) { + const role = (msg as { role?: string }).role ?? "message"; + const tokens = estimateTokens(msg); + oversizedNotes.push( + `[Large ${role} (~${Math.round(tokens / 1000)}K tokens) omitted from summary]`, + ); + } else { + smallMessages.push(msg); + } + } + + if (smallMessages.length > 0) { + try { + const partialSummary = await summarizeChunks({ + ...params, + messages: smallMessages, + }); + const notes = oversizedNotes.length > 0 ? `\n\n${oversizedNotes.join("\n")}` : ""; + return partialSummary + notes; + } catch (partialError) { + console.warn( + `Partial summarization also failed: ${ + partialError instanceof Error ? partialError.message : String(partialError) + }`, + ); + } + } + + // Final fallback: Just note what was there + return ( + `Context contained ${messages.length} messages (${oversizedNotes.length} oversized). ` + + `Summary unavailable due to size limits.` + ); +} + export default function compactionSafeguardExtension(api: ExtensionAPI): void { api.on("session_before_compact", async (event, ctx) => { const { preparation, customInstructions, signal } = event; @@ -233,29 +339,35 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void { 1, Math.floor(model.contextWindow ?? DEFAULT_CONTEXT_TOKENS), ); - const maxChunkTokens = Math.max(1, Math.floor(contextWindowTokens * MAX_CHUNK_RATIO)); + + // Use adaptive chunk ratio based on message sizes + const allMessages = [...preparation.messagesToSummarize, ...preparation.turnPrefixMessages]; + const adaptiveRatio = computeAdaptiveChunkRatio(allMessages, contextWindowTokens); + const maxChunkTokens = Math.max(1, Math.floor(contextWindowTokens * adaptiveRatio)); const reserveTokens = Math.max(1, Math.floor(preparation.settings.reserveTokens)); - const historySummary = await summarizeChunks({ + const historySummary = await summarizeWithFallback({ messages: preparation.messagesToSummarize, model, apiKey, signal, reserveTokens, maxChunkTokens, + contextWindow: contextWindowTokens, customInstructions, previousSummary: preparation.previousSummary, }); let summary = historySummary; if (preparation.isSplitTurn && preparation.turnPrefixMessages.length > 0) { - const prefixSummary = await summarizeChunks({ + const prefixSummary = await summarizeWithFallback({ messages: preparation.turnPrefixMessages, model, apiKey, signal, reserveTokens, maxChunkTokens, + contextWindow: contextWindowTokens, customInstructions: TURN_PREFIX_INSTRUCTIONS, }); summary = `${historySummary}\n\n---\n\n**Turn Context (split turn):**\n\n${prefixSummary}`; @@ -293,4 +405,9 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void { export const __testing = { collectToolFailures, formatToolFailuresSection, + computeAdaptiveChunkRatio, + isOversizedForSummary, + BASE_CHUNK_RATIO, + MIN_CHUNK_RATIO, + SAFETY_MARGIN, } as const;