From 3bf63a89e99ef83fcc7cc3db3d5a7a238d75438d Mon Sep 17 00:00:00 2001
From: tao
Date: Thu, 29 Jan 2026 23:29:35 +0800
Subject: [PATCH] agents: add proactive compaction before request

Check session tokens before sending API request. If estimated tokens
exceed threshold (default 85% of context), run compaction first to
avoid context overflow errors.

This prevents the scenario where a large new prompt causes overflow
and there's not enough space to even run compaction.

New config options:
- compaction.proactiveEnabled (default: true)
- compaction.proactiveThresholdRatio (default: 0.85)

Co-Authored-By: Claude
---
 .../proactive-compaction.test.ts              | 186 ++++++++++++++++++
 .../proactive-compaction.ts                   | 137 +++++++++++++
 src/agents/pi-embedded-runner/run.ts          |  55 ++++++
 src/config/types.agent-defaults.ts            |   4 +
 src/config/zod-schema.agent-defaults.ts       |   2 +
 5 files changed, 384 insertions(+)
 create mode 100644 src/agents/pi-embedded-runner/proactive-compaction.test.ts
 create mode 100644 src/agents/pi-embedded-runner/proactive-compaction.ts

diff --git a/src/agents/pi-embedded-runner/proactive-compaction.test.ts b/src/agents/pi-embedded-runner/proactive-compaction.test.ts
new file mode 100644
index 000000000..49446f089
--- /dev/null
+++ b/src/agents/pi-embedded-runner/proactive-compaction.test.ts
@@ -0,0 +1,186 @@
+import fs from "node:fs/promises";
+import path from "node:path";
+import os from "node:os";
+import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
+
+import { checkProactiveCompaction, resolveProactiveThreshold } from "./proactive-compaction.js";
+
+vi.mock("./logger.js", () => ({
+  log: {
+    debug: vi.fn(),
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+  },
+}));
+
+describe("proactive-compaction", () => {
+  describe("resolveProactiveThreshold", () => {
+    it("returns 85% of context by default", () => {
+      const threshold = resolveProactiveThreshold({
+        contextTokens: 200_000,
+        config: undefined,
+      });
+      // 200k * 0.85 = 170k is only an upper bound: the reserve-based value may be lower
+      expect(threshold).toBeLessThanOrEqual(170_000);
+      expect(threshold).toBeGreaterThan(0);
+    });
+
+    it("respects custom proactiveThresholdRatio config", () => {
+      const threshold = resolveProactiveThreshold({
+        contextTokens: 200_000,
+        config: {
+          agents: {
+            defaults: {
+              compaction: {
+                proactiveThresholdRatio: 0.7,
+              },
+            },
+          },
+        } as any,
+      });
+      // 200k * 0.7 = 140k; the result is min(ratio-based, reserve-based), so 140k is the ceiling
+      expect(threshold).toBeLessThanOrEqual(140_000);
+    });
+
+    it("accounts for reserveTokensFloor in threshold", () => {
+      const threshold = resolveProactiveThreshold({
+        contextTokens: 50_000,
+        config: {
+          agents: {
+            defaults: {
+              compaction: {
+                reserveTokensFloor: 25_000,
+              },
+            },
+          },
+        } as any,
+      });
+      // (50000 - 25000) / 1.2 = ~20833
+      // vs 50000 * 0.85 = 42500
+      // min = ~20833
+      expect(threshold).toBeLessThanOrEqual(25_000);
+    });
+
+    it("returns 0 for very small context windows", () => {
+      const threshold = resolveProactiveThreshold({
+        contextTokens: 1_000,
+        config: {
+          agents: {
+            defaults: {
+              compaction: {
+                reserveTokensFloor: 20_000,
+              },
+            },
+          },
+        } as any,
+      });
+      expect(threshold).toBe(0);
+    });
+  });
+
+  describe("checkProactiveCompaction", () => {
+    let tmpDir: string;
+
+    beforeEach(async () => {
+      tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "proactive-compaction-test-"));
+    });
+
+    afterEach(async () => {
+      await fs.rm(tmpDir, { recursive: true, force: true });
+    });
+
+    it("returns shouldCompact=false for missing session file", async () => {
+      const result = await checkProactiveCompaction({
+        sessionFile: path.join(tmpDir, "nonexistent.jsonl"),
+        contextTokens: 200_000,
+      });
+      expect(result.shouldCompact).toBe(false);
+      expect(result.reason).toBe("no_session_file");
+    });
+
+    it("returns shouldCompact=false for empty session", async () => {
+      const sessionFile = path.join(tmpDir, "empty-session.jsonl");
+      // Session file that holds only the header line (no "message" entries)
+      await fs.writeFile(sessionFile, JSON.stringify({ v: 1, id: "test", cwd: tmpDir }) + "\n");
+
+      const result = await checkProactiveCompaction({
+        sessionFile,
+        contextTokens: 200_000,
+      });
+      expect(result.shouldCompact).toBe(false);
+      expect(result.reason).toBe("empty_session");
+    });
+
+    it("returns shouldCompact=false when tokens below threshold", async () => {
+      const sessionFile = path.join(tmpDir, "small-session.jsonl");
+      // Two short messages in the JSONL shape the reader accepts (type: "message")
+      const header = { v: 1, id: "test", cwd: tmpDir };
+      const userEntry = {
+        type: "message",
+        message: { role: "user", content: [{ type: "text", text: "Hello" }] },
+      };
+      const assistantEntry = {
+        type: "message",
+        message: { role: "assistant", content: [{ type: "text", text: "Hi there!" }] },
+      };
+      await fs.writeFile(
+        sessionFile,
+        [JSON.stringify(header), JSON.stringify(userEntry), JSON.stringify(assistantEntry)].join(
+          "\n",
+        ) + "\n",
+      );
+
+      const result = await checkProactiveCompaction({
+        sessionFile,
+        contextTokens: 200_000,
+        promptTokenEstimate: 100,
+      });
+      expect(result.shouldCompact).toBe(false);
+      expect(result.reason).toBe("below_threshold");
+      expect(result.estimatedTokens).toBeGreaterThan(0);
+    });
+
+    it("returns shouldCompact=true when tokens exceed threshold", async () => {
+      const sessionFile = path.join(tmpDir, "large-session.jsonl");
+      // One very large user message so the estimate dwarfs the small context below
+      const header = { v: 1, id: "test", cwd: tmpDir };
+      const largeText = "x".repeat(100_000); // ~25k tokens at 4 chars/token
+      const userEntry = {
+        type: "message",
+        message: { role: "user", content: [{ type: "text", text: largeText }] },
+      };
+      await fs.writeFile(
+        sessionFile,
+        [JSON.stringify(header), JSON.stringify(userEntry)].join("\n") + "\n",
+      );
+
+      const result = await checkProactiveCompaction({
+        sessionFile,
+        contextTokens: 30_000, // Small context to trigger compaction
+        promptTokenEstimate: 1000,
+      });
+      expect(result.shouldCompact).toBe(true);
+      expect(result.reason).toBe("exceeded");
+    });
+
+    it("handles file with no valid message entries gracefully", async () => {
+      const sessionFile = path.join(tmpDir, "no-messages-session.jsonl");
+      // File exists but has no message-type entries
+      await fs.writeFile(
+        sessionFile,
+        [
+          JSON.stringify({ v: 1, id: "test", cwd: tmpDir }),
+          JSON.stringify({ type: "other", data: "something" }),
+        ].join("\n") + "\n",
+      );
+
+      const result = await checkProactiveCompaction({
+        sessionFile,
+        contextTokens: 200_000,
+      });
+      expect(result.shouldCompact).toBe(false);
+      expect(result.reason).toBe("empty_session");
+    });
+  });
+});
diff --git a/src/agents/pi-embedded-runner/proactive-compaction.ts b/src/agents/pi-embedded-runner/proactive-compaction.ts
new file mode 100644
index 000000000..971d868f7
--- /dev/null
+++ b/src/agents/pi-embedded-runner/proactive-compaction.ts
@@ -0,0 +1,137 @@
+import fs from "node:fs/promises";
+import type { AgentMessage } from "@mariozechner/pi-agent-core";
+
+import type { MoltbotConfig } from "../../config/config.js";
+import { estimateMessagesTokens, SAFETY_MARGIN } from "../compaction.js";
+import { resolveCompactionReserveTokensFloor } from "../pi-settings.js";
+import { log } from "./logger.js";
+
+export const DEFAULT_PROACTIVE_THRESHOLD_RATIO = 0.85;
+
+export type ProactiveCompactionCheckResult = {
+  shouldCompact: boolean;
+  estimatedTokens: number;
+  threshold: number;
+  reason?: "no_session_file" | "empty_session" | "check_failed" | "below_threshold" | "exceeded";
+};
+
+/**
+ * Resolve the token threshold at which proactive compaction should trigger.
+ * Returns min(ratio-based threshold, reserve-based threshold), never less than 0.
+ */
+export function resolveProactiveThreshold(params: {
+  contextTokens: number;
+  config?: MoltbotConfig;
+}): number {
+  const { contextTokens, config } = params;
+  const reserveTokensFloor = resolveCompactionReserveTokensFloor(config);
+  const thresholdRatio =
+    config?.agents?.defaults?.compaction?.proactiveThresholdRatio ??
+    DEFAULT_PROACTIVE_THRESHOLD_RATIO;
+
+  // Ratio-based cap: a fixed share of the context window (DEFAULT_PROACTIVE_THRESHOLD_RATIO if unset)
+  const ratioThreshold = Math.floor(contextTokens * thresholdRatio);
+
+  // Reserve-based cap: context minus the reserve floor, shrunk by SAFETY_MARGIN for estimation error
+  const reserveThreshold = Math.floor((contextTokens - reserveTokensFloor) / SAFETY_MARGIN);
+
+  return Math.max(0, Math.min(ratioThreshold, reserveThreshold));
+}
+
+type SessionEntry = {
+  type?: string;
+  message?: AgentMessage;
+};
+
+/**
+ * Read a session JSONL file and collect the `message` of every entry whose type is "message".
+ * Unparseable lines are skipped; a read failure rejects and is left to the caller.
+ */
+async function readSessionMessages(sessionFile: string): Promise<AgentMessage[]> {
+  const raw = await fs.readFile(sessionFile, "utf-8");
+  const lines = raw.split(/\r?\n/).filter(Boolean);
+
+  const messages: AgentMessage[] = [];
+  for (const line of lines) {
+    try {
+      const entry = JSON.parse(line) as SessionEntry | null;
+      if (entry?.type === "message" && entry.message) {
+        messages.push(entry.message);
+      }
+    } catch {
+      // Skip malformed lines
+    }
+  }
+  return messages;
+}
+
+/**
+ * Check if proactive compaction should run before sending a request.
+ * Estimates session + prompt tokens from the session file and compares them against the threshold.
+ */
+export async function checkProactiveCompaction(params: {
+  sessionFile: string;
+  contextTokens: number;
+  config?: MoltbotConfig;
+  promptTokenEstimate?: number;
+}): Promise<ProactiveCompactionCheckResult> {
+  const threshold = resolveProactiveThreshold({
+    contextTokens: params.contextTokens,
+    config: params.config,
+  });
+
+  // No accessible session file: nothing to estimate, report it instead of failing
+  try {
+    await fs.access(params.sessionFile);
+  } catch {
+    return {
+      shouldCompact: false,
+      estimatedTokens: 0,
+      threshold,
+      reason: "no_session_file",
+    };
+  }
+
+  try {
+    const messages = await readSessionMessages(params.sessionFile);
+
+    if (messages.length === 0) {
+      return {
+        shouldCompact: false,
+        estimatedTokens: 0,
+        threshold,
+        reason: "empty_session",
+      };
+    }
+
+    // Estimate tokens from existing messages
+    const existingTokens = estimateMessagesTokens(messages);
+
+    // Add estimate for the new prompt (default ~500 tokens if not provided)
+    const promptEstimate = params.promptTokenEstimate ?? 500;
+
+    // Apply safety margin to account for estimation inaccuracy
+    const estimatedTotalTokens = Math.floor((existingTokens + promptEstimate) * SAFETY_MARGIN);
+    // A threshold clamped to 0 is not a usable budget; `>= 0` alone would flag every session
+    const shouldCompact = threshold > 0 && estimatedTotalTokens >= threshold;
+
+    return {
+      shouldCompact,
+      estimatedTokens: estimatedTotalTokens,
+      threshold,
+      reason: shouldCompact ? "exceeded" : "below_threshold",
+    };
+  } catch (err) {
+    log.debug(
+      `proactive compaction check failed (proceeding without): ${
+        err instanceof Error ? err.message : String(err)
+      }`,
+    );
+    return {
+      shouldCompact: false,
+      estimatedTokens: 0,
+      threshold,
+      reason: "check_failed",
+    };
+  }
+}
diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts
index 870453f38..8dfa20dcf 100644
--- a/src/agents/pi-embedded-runner/run.ts
+++ b/src/agents/pi-embedded-runner/run.ts
@@ -46,6 +46,7 @@ import { normalizeUsage, type UsageLike } from "../usage.js";
 import { compactEmbeddedPiSessionDirect } from "./compact.js";
 import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
 import { log } from "./logger.js";
+import { checkProactiveCompaction } from "./proactive-compaction.js";
 import { resolveModel } from "./model.js";
 import { runEmbeddedAttempt } from "./run/attempt.js";
 import type { RunEmbeddedPiAgentParams } from "./run/params.js";
@@ -293,9 +294,63 @@ export async function runEmbeddedPiAgent(
       }
 
       let overflowCompactionAttempted = false;
+      let proactiveCompactionAttempted = false;
       try {
         while (true) {
           attemptedThinking.add(thinkLevel);
+
+          // Proactive compaction: check before sending if near context limit
+          const proactiveEnabled =
+            params.config?.agents?.defaults?.compaction?.proactiveEnabled ??
true; + if (proactiveEnabled && !overflowCompactionAttempted && !proactiveCompactionAttempted) { + const proactiveCheck = await checkProactiveCompaction({ + sessionFile: params.sessionFile, + contextTokens: ctxInfo.tokens, + config: params.config, + promptTokenEstimate: Math.ceil(params.prompt.length / 4), + }); + + if (proactiveCheck.shouldCompact) { + log.info( + `proactive compaction triggered: estimated=${proactiveCheck.estimatedTokens} ` + + `threshold=${proactiveCheck.threshold} for ${provider}/${modelId}`, + ); + proactiveCompactionAttempted = true; + + const compactResult = await compactEmbeddedPiSessionDirect({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + authProfileId: lastProfileId, + sessionFile: params.sessionFile, + workspaceDir: params.workspaceDir, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + provider, + model: modelId, + thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + }); + + if (compactResult.compacted) { + log.info( + `proactive compaction succeeded: tokensBefore=${compactResult.result?.tokensBefore} ` + + `tokensAfter=${compactResult.result?.tokensAfter}`, + ); + } else { + log.warn( + `proactive compaction skipped: ${compactResult.reason ?? "nothing to compact"}`, + ); + } + } + } + await fs.mkdir(resolvedWorkspace, { recursive: true }); const prompt = diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index 9c6ce0211..62a754be5 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -248,6 +248,10 @@ export type AgentCompactionConfig = { maxHistoryShare?: number; /** Pre-compaction memory flush (agentic turn). Default: enabled. 
*/ memoryFlush?: AgentCompactionMemoryFlushConfig; + /** Enable proactive compaction before sending requests (default: true). */ + proactiveEnabled?: boolean; + /** Threshold ratio of context window to trigger proactive compaction (0.5–0.95, default 0.85). */ + proactiveThresholdRatio?: number; }; export type AgentCompactionMemoryFlushConfig = { diff --git a/src/config/zod-schema.agent-defaults.ts b/src/config/zod-schema.agent-defaults.ts index a849078ed..0d8cc43b0 100644 --- a/src/config/zod-schema.agent-defaults.ts +++ b/src/config/zod-schema.agent-defaults.ts @@ -100,6 +100,8 @@ export const AgentDefaultsSchema = z }) .strict() .optional(), + proactiveEnabled: z.boolean().optional(), + proactiveThresholdRatio: z.number().min(0.5).max(0.95).optional(), }) .strict() .optional(),