Merge f28dacc11b into 6af205a13a
This commit is contained in:
commit
077166f9aa
186
src/agents/pi-embedded-runner/proactive-compaction.test.ts
Normal file
186
src/agents/pi-embedded-runner/proactive-compaction.test.ts
Normal file
@ -0,0 +1,186 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import os from "node:os";
|
||||
import { describe, expect, it, vi, beforeEach, afterEach } from "vitest";
|
||||
|
||||
import { checkProactiveCompaction, resolveProactiveThreshold } from "./proactive-compaction.js";
|
||||
|
||||
vi.mock("./logger.js", () => ({
|
||||
log: {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
describe("proactive-compaction", () => {
|
||||
describe("resolveProactiveThreshold", () => {
|
||||
it("returns 85% of context by default", () => {
|
||||
const threshold = resolveProactiveThreshold({
|
||||
contextTokens: 200_000,
|
||||
config: undefined,
|
||||
});
|
||||
// 200k * 0.85 = 170k, but reserve-based may be lower
|
||||
expect(threshold).toBeLessThanOrEqual(170_000);
|
||||
expect(threshold).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it("respects custom proactiveThresholdRatio config", () => {
|
||||
const threshold = resolveProactiveThreshold({
|
||||
contextTokens: 200_000,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
proactiveThresholdRatio: 0.7,
|
||||
},
|
||||
},
|
||||
},
|
||||
} as any,
|
||||
});
|
||||
// Should be min(140k ratio, reserve-based)
|
||||
expect(threshold).toBeLessThanOrEqual(140_000);
|
||||
});
|
||||
|
||||
it("accounts for reserveTokensFloor in threshold", () => {
|
||||
const threshold = resolveProactiveThreshold({
|
||||
contextTokens: 50_000,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
reserveTokensFloor: 25_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
} as any,
|
||||
});
|
||||
// (50000 - 25000) / 1.2 = ~20833
|
||||
// vs 50000 * 0.85 = 42500
|
||||
// min = ~20833
|
||||
expect(threshold).toBeLessThanOrEqual(25_000);
|
||||
});
|
||||
|
||||
it("returns 0 for very small context windows", () => {
|
||||
const threshold = resolveProactiveThreshold({
|
||||
contextTokens: 1_000,
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
compaction: {
|
||||
reserveTokensFloor: 20_000,
|
||||
},
|
||||
},
|
||||
},
|
||||
} as any,
|
||||
});
|
||||
expect(threshold).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("checkProactiveCompaction", () => {
|
||||
let tmpDir: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "proactive-compaction-test-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("returns shouldCompact=false for missing session file", async () => {
|
||||
const result = await checkProactiveCompaction({
|
||||
sessionFile: path.join(tmpDir, "nonexistent.jsonl"),
|
||||
contextTokens: 200_000,
|
||||
});
|
||||
expect(result.shouldCompact).toBe(false);
|
||||
expect(result.reason).toBe("no_session_file");
|
||||
});
|
||||
|
||||
it("returns shouldCompact=false for empty session", async () => {
|
||||
const sessionFile = path.join(tmpDir, "empty-session.jsonl");
|
||||
// Create empty session file with just header (no message entries)
|
||||
await fs.writeFile(sessionFile, JSON.stringify({ v: 1, id: "test", cwd: tmpDir }) + "\n");
|
||||
|
||||
const result = await checkProactiveCompaction({
|
||||
sessionFile,
|
||||
contextTokens: 200_000,
|
||||
});
|
||||
expect(result.shouldCompact).toBe(false);
|
||||
expect(result.reason).toBe("empty_session");
|
||||
});
|
||||
|
||||
it("returns shouldCompact=false when tokens below threshold", async () => {
|
||||
const sessionFile = path.join(tmpDir, "small-session.jsonl");
|
||||
// Create session with small messages using correct JSONL format (type: "message")
|
||||
const header = { v: 1, id: "test", cwd: tmpDir };
|
||||
const userEntry = {
|
||||
type: "message",
|
||||
message: { role: "user", content: [{ type: "text", text: "Hello" }] },
|
||||
};
|
||||
const assistantEntry = {
|
||||
type: "message",
|
||||
message: { role: "assistant", content: [{ type: "text", text: "Hi there!" }] },
|
||||
};
|
||||
await fs.writeFile(
|
||||
sessionFile,
|
||||
[JSON.stringify(header), JSON.stringify(userEntry), JSON.stringify(assistantEntry)].join(
|
||||
"\n",
|
||||
) + "\n",
|
||||
);
|
||||
|
||||
const result = await checkProactiveCompaction({
|
||||
sessionFile,
|
||||
contextTokens: 200_000,
|
||||
promptTokenEstimate: 100,
|
||||
});
|
||||
expect(result.shouldCompact).toBe(false);
|
||||
expect(result.reason).toBe("below_threshold");
|
||||
expect(result.estimatedTokens).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it("returns shouldCompact=true when tokens exceed threshold", async () => {
|
||||
const sessionFile = path.join(tmpDir, "large-session.jsonl");
|
||||
// Create session with a very large message
|
||||
const header = { v: 1, id: "test", cwd: tmpDir };
|
||||
const largeText = "x".repeat(100_000); // ~25k tokens at 4 chars/token
|
||||
const userEntry = {
|
||||
type: "message",
|
||||
message: { role: "user", content: [{ type: "text", text: largeText }] },
|
||||
};
|
||||
await fs.writeFile(
|
||||
sessionFile,
|
||||
[JSON.stringify(header), JSON.stringify(userEntry)].join("\n") + "\n",
|
||||
);
|
||||
|
||||
const result = await checkProactiveCompaction({
|
||||
sessionFile,
|
||||
contextTokens: 30_000, // Small context to trigger compaction
|
||||
promptTokenEstimate: 1000,
|
||||
});
|
||||
expect(result.shouldCompact).toBe(true);
|
||||
expect(result.reason).toBe("exceeded");
|
||||
});
|
||||
|
||||
it("handles file with no valid message entries gracefully", async () => {
|
||||
const sessionFile = path.join(tmpDir, "no-messages-session.jsonl");
|
||||
// File exists but has no message-type entries
|
||||
await fs.writeFile(
|
||||
sessionFile,
|
||||
[
|
||||
JSON.stringify({ v: 1, id: "test", cwd: tmpDir }),
|
||||
JSON.stringify({ type: "other", data: "something" }),
|
||||
].join("\n") + "\n",
|
||||
);
|
||||
|
||||
const result = await checkProactiveCompaction({
|
||||
sessionFile,
|
||||
contextTokens: 200_000,
|
||||
});
|
||||
expect(result.shouldCompact).toBe(false);
|
||||
expect(result.reason).toBe("empty_session");
|
||||
});
|
||||
});
|
||||
});
|
||||
137
src/agents/pi-embedded-runner/proactive-compaction.ts
Normal file
137
src/agents/pi-embedded-runner/proactive-compaction.ts
Normal file
@ -0,0 +1,137 @@
|
||||
import fs from "node:fs/promises";
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
|
||||
import type { MoltbotConfig } from "../../config/config.js";
|
||||
import { estimateMessagesTokens, SAFETY_MARGIN } from "../compaction.js";
|
||||
import { resolveCompactionReserveTokensFloor } from "../pi-settings.js";
|
||||
import { log } from "./logger.js";
|
||||
|
||||
export const DEFAULT_PROACTIVE_THRESHOLD_RATIO = 0.85;
|
||||
|
||||
export type ProactiveCompactionCheckResult = {
|
||||
shouldCompact: boolean;
|
||||
estimatedTokens: number;
|
||||
threshold: number;
|
||||
reason?: "no_session_file" | "empty_session" | "check_failed" | "below_threshold" | "exceeded";
|
||||
};
|
||||
|
||||
/**
|
||||
* Resolve the token threshold at which proactive compaction should trigger.
|
||||
* Returns min(ratio-based threshold, reserve-based threshold).
|
||||
*/
|
||||
export function resolveProactiveThreshold(params: {
|
||||
contextTokens: number;
|
||||
config?: MoltbotConfig;
|
||||
}): number {
|
||||
const { contextTokens, config } = params;
|
||||
const reserveTokensFloor = resolveCompactionReserveTokensFloor(config);
|
||||
const thresholdRatio =
|
||||
config?.agents?.defaults?.compaction?.proactiveThresholdRatio ??
|
||||
DEFAULT_PROACTIVE_THRESHOLD_RATIO;
|
||||
|
||||
// Ratio-based: e.g., 85% of context
|
||||
const ratioThreshold = Math.floor(contextTokens * thresholdRatio);
|
||||
|
||||
// Reserve-based: context minus reserve, with safety margin for estimation inaccuracy
|
||||
const reserveThreshold = Math.floor((contextTokens - reserveTokensFloor) / SAFETY_MARGIN);
|
||||
|
||||
return Math.max(0, Math.min(ratioThreshold, reserveThreshold));
|
||||
}
|
||||
|
||||
type SessionEntry = {
|
||||
type?: string;
|
||||
message?: AgentMessage;
|
||||
};
|
||||
|
||||
/**
|
||||
* Read messages from a session JSONL file.
|
||||
* Returns an array of AgentMessages extracted from "message" type entries.
|
||||
*/
|
||||
async function readSessionMessages(sessionFile: string): Promise<AgentMessage[]> {
|
||||
const raw = await fs.readFile(sessionFile, "utf-8");
|
||||
const lines = raw.split(/\r?\n/).filter(Boolean);
|
||||
|
||||
const messages: AgentMessage[] = [];
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const entry = JSON.parse(line) as SessionEntry;
|
||||
if (entry.type === "message" && entry.message) {
|
||||
messages.push(entry.message);
|
||||
}
|
||||
} catch {
|
||||
// Skip malformed lines
|
||||
}
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if proactive compaction should run before sending a request.
|
||||
* Reads the session file to estimate current tokens and compares against threshold.
|
||||
*/
|
||||
export async function checkProactiveCompaction(params: {
|
||||
sessionFile: string;
|
||||
contextTokens: number;
|
||||
config?: MoltbotConfig;
|
||||
promptTokenEstimate?: number;
|
||||
}): Promise<ProactiveCompactionCheckResult> {
|
||||
const threshold = resolveProactiveThreshold({
|
||||
contextTokens: params.contextTokens,
|
||||
config: params.config,
|
||||
});
|
||||
|
||||
// Check if session file exists
|
||||
try {
|
||||
await fs.access(params.sessionFile);
|
||||
} catch {
|
||||
return {
|
||||
shouldCompact: false,
|
||||
estimatedTokens: 0,
|
||||
threshold,
|
||||
reason: "no_session_file",
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
const messages = await readSessionMessages(params.sessionFile);
|
||||
|
||||
if (messages.length === 0) {
|
||||
return {
|
||||
shouldCompact: false,
|
||||
estimatedTokens: 0,
|
||||
threshold,
|
||||
reason: "empty_session",
|
||||
};
|
||||
}
|
||||
|
||||
// Estimate tokens from existing messages
|
||||
const existingTokens = estimateMessagesTokens(messages);
|
||||
|
||||
// Add estimate for the new prompt (default ~500 tokens if not provided)
|
||||
const promptEstimate = params.promptTokenEstimate ?? 500;
|
||||
|
||||
// Total estimated tokens (safety margin is already applied in threshold calculation)
|
||||
const estimatedTotalTokens = existingTokens + promptEstimate;
|
||||
|
||||
const shouldCompact = estimatedTotalTokens >= threshold;
|
||||
|
||||
return {
|
||||
shouldCompact,
|
||||
estimatedTokens: estimatedTotalTokens,
|
||||
threshold,
|
||||
reason: shouldCompact ? "exceeded" : "below_threshold",
|
||||
};
|
||||
} catch (err) {
|
||||
log.debug(
|
||||
`proactive compaction check failed (proceeding without): ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
);
|
||||
return {
|
||||
shouldCompact: false,
|
||||
estimatedTokens: 0,
|
||||
threshold,
|
||||
reason: "check_failed",
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -46,6 +46,7 @@ import { normalizeUsage, type UsageLike } from "../usage.js";
|
||||
import { compactEmbeddedPiSessionDirect } from "./compact.js";
|
||||
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
|
||||
import { log } from "./logger.js";
|
||||
import { checkProactiveCompaction } from "./proactive-compaction.js";
|
||||
import { resolveModel } from "./model.js";
|
||||
import { runEmbeddedAttempt } from "./run/attempt.js";
|
||||
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
|
||||
@ -293,9 +294,63 @@ export async function runEmbeddedPiAgent(
|
||||
}
|
||||
|
||||
let overflowCompactionAttempted = false;
|
||||
let proactiveCompactionAttempted = false;
|
||||
try {
|
||||
while (true) {
|
||||
attemptedThinking.add(thinkLevel);
|
||||
|
||||
// Proactive compaction: check before sending if near context limit
|
||||
const proactiveEnabled =
|
||||
params.config?.agents?.defaults?.compaction?.proactiveEnabled ?? true;
|
||||
if (proactiveEnabled && !overflowCompactionAttempted && !proactiveCompactionAttempted) {
|
||||
const proactiveCheck = await checkProactiveCompaction({
|
||||
sessionFile: params.sessionFile,
|
||||
contextTokens: ctxInfo.tokens,
|
||||
config: params.config,
|
||||
promptTokenEstimate: Math.ceil(params.prompt.length / 4),
|
||||
});
|
||||
|
||||
if (proactiveCheck.shouldCompact) {
|
||||
log.info(
|
||||
`proactive compaction triggered: estimated=${proactiveCheck.estimatedTokens} ` +
|
||||
`threshold=${proactiveCheck.threshold} for ${provider}/${modelId}`,
|
||||
);
|
||||
proactiveCompactionAttempted = true;
|
||||
|
||||
const compactResult = await compactEmbeddedPiSessionDirect({
|
||||
sessionId: params.sessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
messageChannel: params.messageChannel,
|
||||
messageProvider: params.messageProvider,
|
||||
agentAccountId: params.agentAccountId,
|
||||
authProfileId: lastProfileId,
|
||||
sessionFile: params.sessionFile,
|
||||
workspaceDir: params.workspaceDir,
|
||||
agentDir,
|
||||
config: params.config,
|
||||
skillsSnapshot: params.skillsSnapshot,
|
||||
provider,
|
||||
model: modelId,
|
||||
thinkLevel,
|
||||
reasoningLevel: params.reasoningLevel,
|
||||
bashElevated: params.bashElevated,
|
||||
extraSystemPrompt: params.extraSystemPrompt,
|
||||
ownerNumbers: params.ownerNumbers,
|
||||
});
|
||||
|
||||
if (compactResult.compacted) {
|
||||
log.info(
|
||||
`proactive compaction succeeded: tokensBefore=${compactResult.result?.tokensBefore} ` +
|
||||
`tokensAfter=${compactResult.result?.tokensAfter}`,
|
||||
);
|
||||
} else {
|
||||
log.warn(
|
||||
`proactive compaction skipped: ${compactResult.reason ?? "nothing to compact"}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await fs.mkdir(resolvedWorkspace, { recursive: true });
|
||||
|
||||
const prompt =
|
||||
|
||||
@ -248,6 +248,10 @@ export type AgentCompactionConfig = {
|
||||
maxHistoryShare?: number;
|
||||
/** Pre-compaction memory flush (agentic turn). Default: enabled. */
|
||||
memoryFlush?: AgentCompactionMemoryFlushConfig;
|
||||
/** Enable proactive compaction before sending requests (default: true). */
|
||||
proactiveEnabled?: boolean;
|
||||
/** Threshold ratio of context window to trigger proactive compaction (0.5–0.95, default 0.85). */
|
||||
proactiveThresholdRatio?: number;
|
||||
};
|
||||
|
||||
export type AgentCompactionMemoryFlushConfig = {
|
||||
|
||||
@ -100,6 +100,8 @@ export const AgentDefaultsSchema = z
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
proactiveEnabled: z.boolean().optional(),
|
||||
proactiveThresholdRatio: z.number().min(0.5).max(0.95).optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user