agents: add proactive compaction before request
Check session tokens before sending an API request. If the estimated tokens exceed the threshold (default: 85% of the context window), run compaction first to avoid context-overflow errors. This prevents the scenario where a large new prompt causes an overflow and there is not enough space left to even run compaction.

New config options (under agents.defaults):
- compaction.proactiveEnabled (default: true)
- compaction.proactiveThresholdRatio (default: 0.85)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
5f4715acfc
commit
3bf63a89e9
186
src/agents/pi-embedded-runner/proactive-compaction.test.ts
Normal file
186
src/agents/pi-embedded-runner/proactive-compaction.test.ts
Normal file
@ -0,0 +1,186 @@
|
|||||||
|
import fs from "node:fs/promises";
|
||||||
|
import path from "node:path";
|
||||||
|
import os from "node:os";
|
||||||
|
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
|
||||||
|
|
||||||
|
import { checkProactiveCompaction, resolveProactiveThreshold } from "./proactive-compaction.js";
|
||||||
|
|
||||||
|
// Swap the runner's logger for silent spies so the module under test can log
// without producing output. The factory is self-contained because vitest
// hoists `vi.mock` calls above the imports.
vi.mock("./logger.js", () => {
  const log = {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  };
  return { log };
});
|
||||||
|
|
||||||
|
describe("proactive-compaction", () => {
  // Builds the nested config shape the module reads its compaction settings from.
  // Cast to `any` because these tests only populate the fields under test.
  const compactionConfig = (compaction: Record<string, unknown>) =>
    ({ agents: { defaults: { compaction } } }) as any;

  describe("resolveProactiveThreshold", () => {
    it("returns 85% of context by default", () => {
      const threshold = resolveProactiveThreshold({ contextTokens: 200_000, config: undefined });

      // The ratio cap is 200k * 0.85 = 170k; the reserve-based limit may pull it lower.
      expect(threshold).toBeLessThanOrEqual(170_000);
      expect(threshold).toBeGreaterThan(0);
    });

    it("respects custom proactiveThresholdRatio config", () => {
      const threshold = resolveProactiveThreshold({
        contextTokens: 200_000,
        config: compactionConfig({ proactiveThresholdRatio: 0.7 }),
      });

      // Expected: min(200k * 0.7 = 140k, reserve-based limit).
      expect(threshold).toBeLessThanOrEqual(140_000);
    });

    it("accounts for reserveTokensFloor in threshold", () => {
      const threshold = resolveProactiveThreshold({
        contextTokens: 50_000,
        config: compactionConfig({ reserveTokensFloor: 25_000 }),
      });

      // Reserve-based: (50000 - 25000) / 1.2 = ~20833
      // Ratio-based:   50000 * 0.85 = 42500
      // The lower of the two (~20833) should win.
      expect(threshold).toBeLessThanOrEqual(25_000);
    });

    it("returns 0 for very small context windows", () => {
      const threshold = resolveProactiveThreshold({
        contextTokens: 1_000,
        config: compactionConfig({ reserveTokensFloor: 20_000 }),
      });

      expect(threshold).toBe(0);
    });
  });

  describe("checkProactiveCompaction", () => {
    let tmpDir: string;

    // Header line that starts every session file written by these tests.
    const sessionHeader = () => ({ v: 1, id: "test", cwd: tmpDir });

    // A "message"-type JSONL entry holding a single text part.
    const textMessage = (role: string, text: string) => ({
      type: "message",
      message: { role, content: [{ type: "text", text }] },
    });

    // Writes `entries` as newline-terminated JSONL under tmpDir and returns the path.
    const writeSession = async (name: string, entries: unknown[]): Promise<string> => {
      const sessionFile = path.join(tmpDir, name);
      const body = entries.map((entry) => JSON.stringify(entry)).join("\n") + "\n";
      await fs.writeFile(sessionFile, body);
      return sessionFile;
    };

    beforeEach(async () => {
      tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "proactive-compaction-test-"));
    });

    afterEach(async () => {
      await fs.rm(tmpDir, { recursive: true, force: true });
    });

    it("returns shouldCompact=false for missing session file", async () => {
      const result = await checkProactiveCompaction({
        sessionFile: path.join(tmpDir, "nonexistent.jsonl"),
        contextTokens: 200_000,
      });

      expect(result.shouldCompact).toBe(false);
      expect(result.reason).toBe("no_session_file");
    });

    it("returns shouldCompact=false for empty session", async () => {
      // Only the header line is present: there are no message entries at all.
      const sessionFile = await writeSession("empty-session.jsonl", [sessionHeader()]);

      const result = await checkProactiveCompaction({ sessionFile, contextTokens: 200_000 });

      expect(result.shouldCompact).toBe(false);
      expect(result.reason).toBe("empty_session");
    });

    it("returns shouldCompact=false when tokens below threshold", async () => {
      // Two short messages in the expected JSONL format (type: "message").
      const sessionFile = await writeSession("small-session.jsonl", [
        sessionHeader(),
        textMessage("user", "Hello"),
        textMessage("assistant", "Hi there!"),
      ]);

      const result = await checkProactiveCompaction({
        sessionFile,
        contextTokens: 200_000,
        promptTokenEstimate: 100,
      });

      expect(result.shouldCompact).toBe(false);
      expect(result.reason).toBe("below_threshold");
      expect(result.estimatedTokens).toBeGreaterThan(0);
    });

    it("returns shouldCompact=true when tokens exceed threshold", async () => {
      // One very large message: ~25k tokens at 4 chars/token.
      const largeText = "x".repeat(100_000);
      const sessionFile = await writeSession("large-session.jsonl", [
        sessionHeader(),
        textMessage("user", largeText),
      ]);

      const result = await checkProactiveCompaction({
        sessionFile,
        contextTokens: 30_000, // Small context window so the session overshoots it
        promptTokenEstimate: 1000,
      });

      expect(result.shouldCompact).toBe(true);
      expect(result.reason).toBe("exceeded");
    });

    it("handles file with no valid message entries gracefully", async () => {
      // The file exists but none of its entries are of type "message".
      const sessionFile = await writeSession("no-messages-session.jsonl", [
        sessionHeader(),
        { type: "other", data: "something" },
      ]);

      const result = await checkProactiveCompaction({ sessionFile, contextTokens: 200_000 });

      expect(result.shouldCompact).toBe(false);
      expect(result.reason).toBe("empty_session");
    });
  });
});
|
||||||
137
src/agents/pi-embedded-runner/proactive-compaction.ts
Normal file
137
src/agents/pi-embedded-runner/proactive-compaction.ts
Normal file
@ -0,0 +1,137 @@
|
|||||||
|
import fs from "node:fs/promises";
|
||||||
|
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||||
|
|
||||||
|
import type { MoltbotConfig } from "../../config/config.js";
|
||||||
|
import { estimateMessagesTokens, SAFETY_MARGIN } from "../compaction.js";
|
||||||
|
import { resolveCompactionReserveTokensFloor } from "../pi-settings.js";
|
||||||
|
import { log } from "./logger.js";
|
||||||
|
|
||||||
|
/** Fraction of the context window used as the ratio-based trigger when config sets none. */
export const DEFAULT_PROACTIVE_THRESHOLD_RATIO = 0.85;

/** Why a proactive-compaction check ended the way it did. */
type ProactiveCompactionReason =
  | "no_session_file"
  | "empty_session"
  | "check_failed"
  | "below_threshold"
  | "exceeded";

/** Outcome of a proactive-compaction check. */
export type ProactiveCompactionCheckResult = {
  /** True when compaction should run before the request is sent. */
  shouldCompact: boolean;
  /** Token estimate that was compared against `threshold` (0 when no estimate was made). */
  estimatedTokens: number;
  /** Token threshold the estimate was compared against. */
  threshold: number;
  /** Short machine-readable explanation of the outcome. */
  reason?: ProactiveCompactionReason;
};
|
||||||
|
|
||||||
|
/**
 * Compute the token count at or above which proactive compaction should run.
 *
 * Two candidate limits are derived and the lower one is used:
 * - ratio limit: `contextTokens` multiplied by
 *   `agents.defaults.compaction.proactiveThresholdRatio`, falling back to
 *   `DEFAULT_PROACTIVE_THRESHOLD_RATIO` when the option is unset;
 * - reserve limit: `contextTokens` minus the configured reserve floor, divided
 *   by `SAFETY_MARGIN` to leave headroom for estimation inaccuracy.
 *
 * @param params.contextTokens - Size of the context window, in tokens.
 * @param params.config - Optional config supplying the ratio and the reserve floor.
 * @returns The lower of the two limits, floored to an integer and never below 0.
 */
export function resolveProactiveThreshold(params: {
  contextTokens: number;
  config?: MoltbotConfig;
}): number {
  const reserveFloor = resolveCompactionReserveTokensFloor(params.config);
  const ratio =
    params.config?.agents?.defaults?.compaction?.proactiveThresholdRatio ??
    DEFAULT_PROACTIVE_THRESHOLD_RATIO;

  // Share of the window we allow to fill up, e.g. 85% of the context.
  const ratioLimit = Math.floor(params.contextTokens * ratio);

  // What remains after setting the reserve aside, shrunk by the safety margin.
  const reserveLimit = Math.floor((params.contextTokens - reserveFloor) / SAFETY_MARGIN);

  // A reserve larger than the window yields a negative limit; clamp that to 0.
  const lowerLimit = Math.min(ratioLimit, reserveLimit);
  return Math.max(0, lowerLimit);
}
|
||||||
|
|
||||||
|
/** Shape of one parsed session JSONL line, limited to the fields read here. */
type SessionEntry = {
  type?: string;
  message?: AgentMessage;
};

/**
 * Runtime check that a parsed JSON value is a `"message"` entry whose
 * `message` payload is a plain (non-null, non-array) object.
 *
 * `JSON.parse` can return any JSON value, so the shape is verified here
 * instead of being asserted with a cast.
 */
function isMessageEntry(value: unknown): value is SessionEntry & { message: AgentMessage } {
  if (typeof value !== "object" || value === null) {
    return false;
  }
  const { type, message } = value as { type?: unknown; message?: unknown };
  return (
    type === "message" &&
    typeof message === "object" &&
    message !== null &&
    !Array.isArray(message)
  );
}

/**
 * Read messages from a session JSONL file.
 *
 * Each non-empty line is parsed as JSON on its own. Lines that fail to parse,
 * entries whose `type` is not `"message"`, and entries whose `message` is not
 * an object are skipped, so one corrupt line does not discard the whole file.
 *
 * @param sessionFile - Path of the JSONL session file.
 * @returns The `message` payloads of all valid `"message"` entries, in file order.
 * @throws Whatever `fs.readFile` rejects with (for example, when the file is missing).
 */
async function readSessionMessages(sessionFile: string): Promise<AgentMessage[]> {
  const raw = await fs.readFile(sessionFile, "utf-8");

  const messages: AgentMessage[] = [];
  for (const line of raw.split(/\r?\n/)) {
    if (!line) {
      continue;
    }

    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      // Skip malformed lines
      continue;
    }

    if (isMessageEntry(parsed)) {
      messages.push(parsed.message);
    }
  }
  return messages;
}
|
||||||
|
|
||||||
|
/**
 * Decide whether compaction should run before the next request is sent.
 *
 * The session file is read, the tokens of its messages are estimated, an
 * estimate for the new prompt is added, and the sum (scaled by
 * `SAFETY_MARGIN`) is compared with the threshold from
 * `resolveProactiveThreshold`.
 *
 * This function does not throw for a missing or unreadable session: those
 * cases come back as `shouldCompact: false` with a matching `reason`.
 *
 * NOTE(review): `resolveProactiveThreshold` already divides its reserve-based
 * limit by `SAFETY_MARGIN`, and the estimate is multiplied by it again here,
 * so the margin counts twice on that path. Confirm this is intended.
 *
 * NOTE(review): a threshold of 0 makes every non-empty session satisfy the
 * `>=` comparison below, so it always reports "exceeded". Confirm this is the
 * desired behaviour for context windows smaller than the reserve.
 *
 * @param params.sessionFile - Path of the session JSONL file.
 * @param params.contextTokens - Size of the context window, in tokens.
 * @param params.config - Optional config forwarded to the threshold resolution.
 * @param params.promptTokenEstimate - Estimated tokens of the new prompt (defaults to 500).
 * @returns The decision, together with the estimate and threshold it was based on.
 */
export async function checkProactiveCompaction(params: {
  sessionFile: string;
  contextTokens: number;
  config?: MoltbotConfig;
  promptTokenEstimate?: number;
}): Promise<ProactiveCompactionCheckResult> {
  const { sessionFile, contextTokens, config, promptTokenEstimate } = params;
  const threshold = resolveProactiveThreshold({ contextTokens, config });

  // Every "do not compact" outcome that made no estimate differs only in its reason.
  const skip = (
    reason: ProactiveCompactionCheckResult["reason"],
  ): ProactiveCompactionCheckResult => ({
    shouldCompact: false,
    estimatedTokens: 0,
    threshold,
    reason,
  });

  // Without a session file there is nothing to compact.
  try {
    await fs.access(sessionFile);
  } catch {
    return skip("no_session_file");
  }

  try {
    const history = await readSessionMessages(sessionFile);
    if (history.length === 0) {
      return skip("empty_session");
    }

    // Tokens already in the session, plus the prompt about to be sent
    // (roughly 500 tokens when the caller gives no estimate).
    const historyTokens = estimateMessagesTokens(history);
    const incomingTokens = promptTokenEstimate ?? 500;

    // Inflate the sum to allow for estimation inaccuracy.
    const estimatedTokens = Math.floor((historyTokens + incomingTokens) * SAFETY_MARGIN);

    if (estimatedTokens >= threshold) {
      return { shouldCompact: true, estimatedTokens, threshold, reason: "exceeded" };
    }
    return { shouldCompact: false, estimatedTokens, threshold, reason: "below_threshold" };
  } catch (err) {
    // Best effort: a failed check must not block the request itself.
    const detail = err instanceof Error ? err.message : String(err);
    log.debug(`proactive compaction check failed (proceeding without): ${detail}`);
    return skip("check_failed");
  }
}
|
||||||
@ -46,6 +46,7 @@ import { normalizeUsage, type UsageLike } from "../usage.js";
|
|||||||
import { compactEmbeddedPiSessionDirect } from "./compact.js";
|
import { compactEmbeddedPiSessionDirect } from "./compact.js";
|
||||||
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
||||||
import { log } from "./logger.js";
|
import { log } from "./logger.js";
|
||||||
|
import { checkProactiveCompaction } from "./proactive-compaction.js";
|
||||||
import { resolveModel } from "./model.js";
|
import { resolveModel } from "./model.js";
|
||||||
import { runEmbeddedAttempt } from "./run/attempt.js";
|
import { runEmbeddedAttempt } from "./run/attempt.js";
|
||||||
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
|
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
|
||||||
@ -293,9 +294,63 @@ export async function runEmbeddedPiAgent(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let overflowCompactionAttempted = false;
|
let overflowCompactionAttempted = false;
|
||||||
|
let proactiveCompactionAttempted = false;
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
attemptedThinking.add(thinkLevel);
|
attemptedThinking.add(thinkLevel);
|
||||||
|
|
||||||
|
// Proactive compaction: check before sending if near context limit
|
||||||
|
const proactiveEnabled =
|
||||||
|
params.config?.agents?.defaults?.compaction?.proactiveEnabled ?? true;
|
||||||
|
if (proactiveEnabled && !overflowCompactionAttempted && !proactiveCompactionAttempted) {
|
||||||
|
const proactiveCheck = await checkProactiveCompaction({
|
||||||
|
sessionFile: params.sessionFile,
|
||||||
|
contextTokens: ctxInfo.tokens,
|
||||||
|
config: params.config,
|
||||||
|
promptTokenEstimate: Math.ceil(params.prompt.length / 4),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (proactiveCheck.shouldCompact) {
|
||||||
|
log.info(
|
||||||
|
`proactive compaction triggered: estimated=${proactiveCheck.estimatedTokens} ` +
|
||||||
|
`threshold=${proactiveCheck.threshold} for ${provider}/${modelId}`,
|
||||||
|
);
|
||||||
|
proactiveCompactionAttempted = true;
|
||||||
|
|
||||||
|
const compactResult = await compactEmbeddedPiSessionDirect({
|
||||||
|
sessionId: params.sessionId,
|
||||||
|
sessionKey: params.sessionKey,
|
||||||
|
messageChannel: params.messageChannel,
|
||||||
|
messageProvider: params.messageProvider,
|
||||||
|
agentAccountId: params.agentAccountId,
|
||||||
|
authProfileId: lastProfileId,
|
||||||
|
sessionFile: params.sessionFile,
|
||||||
|
workspaceDir: params.workspaceDir,
|
||||||
|
agentDir,
|
||||||
|
config: params.config,
|
||||||
|
skillsSnapshot: params.skillsSnapshot,
|
||||||
|
provider,
|
||||||
|
model: modelId,
|
||||||
|
thinkLevel,
|
||||||
|
reasoningLevel: params.reasoningLevel,
|
||||||
|
bashElevated: params.bashElevated,
|
||||||
|
extraSystemPrompt: params.extraSystemPrompt,
|
||||||
|
ownerNumbers: params.ownerNumbers,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (compactResult.compacted) {
|
||||||
|
log.info(
|
||||||
|
`proactive compaction succeeded: tokensBefore=${compactResult.result?.tokensBefore} ` +
|
||||||
|
`tokensAfter=${compactResult.result?.tokensAfter}`,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
log.warn(
|
||||||
|
`proactive compaction skipped: ${compactResult.reason ?? "nothing to compact"}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await fs.mkdir(resolvedWorkspace, { recursive: true });
|
await fs.mkdir(resolvedWorkspace, { recursive: true });
|
||||||
|
|
||||||
const prompt =
|
const prompt =
|
||||||
|
|||||||
@ -248,6 +248,10 @@ export type AgentCompactionConfig = {
|
|||||||
maxHistoryShare?: number;
|
maxHistoryShare?: number;
|
||||||
/** Pre-compaction memory flush (agentic turn). Default: enabled. */
|
/** Pre-compaction memory flush (agentic turn). Default: enabled. */
|
||||||
memoryFlush?: AgentCompactionMemoryFlushConfig;
|
memoryFlush?: AgentCompactionMemoryFlushConfig;
|
||||||
|
/** Enable proactive compaction before sending requests (default: true). */
|
||||||
|
proactiveEnabled?: boolean;
|
||||||
|
/** Threshold ratio of context window to trigger proactive compaction (0.5–0.95, default 0.85). */
|
||||||
|
proactiveThresholdRatio?: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type AgentCompactionMemoryFlushConfig = {
|
export type AgentCompactionMemoryFlushConfig = {
|
||||||
|
|||||||
@ -100,6 +100,8 @@ export const AgentDefaultsSchema = z
|
|||||||
})
|
})
|
||||||
.strict()
|
.strict()
|
||||||
.optional(),
|
.optional(),
|
||||||
|
proactiveEnabled: z.boolean().optional(),
|
||||||
|
proactiveThresholdRatio: z.number().min(0.5).max(0.95).optional(),
|
||||||
})
|
})
|
||||||
.strict()
|
.strict()
|
||||||
.optional(),
|
.optional(),
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user