Pi runner: avoid global lock for compaction

Co-authored-by: kuna <47035140+1kuna@users.noreply.github.com>
This commit is contained in:
Alyx 2026-01-29 21:08:45 -05:00 committed by 1alyx
parent 4583f88626
commit 9776fd3661
8 changed files with 552 additions and 521 deletions

View File

@ -100,6 +100,7 @@ export type CompactEmbeddedPiSessionParams = {
enqueue?: typeof enqueueCommand; enqueue?: typeof enqueueCommand;
extraSystemPrompt?: string; extraSystemPrompt?: string;
ownerNumbers?: string[]; ownerNumbers?: string[];
skipSkillEnvOverrides?: boolean;
}; };
/** /**
@ -110,7 +111,6 @@ export async function compactEmbeddedPiSessionDirect(
params: CompactEmbeddedPiSessionParams, params: CompactEmbeddedPiSessionParams,
): Promise<EmbeddedPiCompactResult> { ): Promise<EmbeddedPiCompactResult> {
const resolvedWorkspace = resolveUserPath(params.workspaceDir); const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const provider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; const provider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER;
const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL;
@ -179,13 +179,14 @@ export async function compactEmbeddedPiSessionDirect(
cwd: effectiveWorkspace, cwd: effectiveWorkspace,
}); });
const shouldApplySkillEnvOverrides = !params.skipSkillEnvOverrides;
let restoreSkillEnv: (() => void) | undefined; let restoreSkillEnv: (() => void) | undefined;
process.chdir(effectiveWorkspace);
try { try {
const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills; const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(effectiveWorkspace) ? loadWorkspaceSkillEntries(effectiveWorkspace)
: []; : [];
if (shouldApplySkillEnvOverrides) {
restoreSkillEnv = params.skillsSnapshot restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({ ? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot, snapshot: params.skillsSnapshot,
@ -195,6 +196,7 @@ export async function compactEmbeddedPiSessionDirect(
skills: skillEntries ?? [], skills: skillEntries ?? [],
config: params.config, config: params.config,
}); });
}
const skillsPrompt = resolveSkillsPromptForRun({ const skillsPrompt = resolveSkillsPromptForRun({
skillsSnapshot: params.skillsSnapshot, skillsSnapshot: params.skillsSnapshot,
entries: shouldLoadSkillEntries ? skillEntries : undefined, entries: shouldLoadSkillEntries ? skillEntries : undefined,
@ -318,7 +320,7 @@ export async function compactEmbeddedPiSessionDirect(
const docsPath = await resolveMoltbotDocsPath({ const docsPath = await resolveMoltbotDocsPath({
workspaceDir: effectiveWorkspace, workspaceDir: effectiveWorkspace,
argv1: process.argv[1], argv1: process.argv[1],
cwd: process.cwd(), cwd: effectiveWorkspace,
moduleUrl: import.meta.url, moduleUrl: import.meta.url,
}); });
const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined; const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined;
@ -466,7 +468,6 @@ export async function compactEmbeddedPiSessionDirect(
}; };
} finally { } finally {
restoreSkillEnv?.(); restoreSkillEnv?.();
process.chdir(prevCwd);
} }
} }

View File

@ -168,6 +168,7 @@ function makeAttemptResult(
didSendViaMessagingTool: false, didSendViaMessagingTool: false,
messagingToolSentTexts: [], messagingToolSentTexts: [],
messagingToolSentTargets: [], messagingToolSentTargets: [],
autoCompactionAttempts: 0,
cloudCodeAssistFormatError: false, cloudCodeAssistFormatError: false,
...overrides, ...overrides,
}; };
@ -220,6 +221,20 @@ describe("overflow compaction in run loop", () => {
expect(result.meta.error).toBeUndefined(); expect(result.meta.error).toBeUndefined();
}); });
it("skips manual compaction when auto-compaction already ran", async () => {
const overflowError = new Error("request_too_large: Request size exceeds model context window");
mockedRunEmbeddedAttempt.mockResolvedValueOnce(
makeAttemptResult({ promptError: overflowError, autoCompactionAttempts: 1 }),
);
const result = await runEmbeddedPiAgent(baseParams);
expect(mockedCompactDirect).not.toHaveBeenCalled();
expect(mockedRunEmbeddedAttempt).toHaveBeenCalledTimes(1);
expect(result.meta.error?.kind).toBe("context_overflow");
});
it("returns error if compaction fails", async () => { it("returns error if compaction fails", async () => {
const overflowError = new Error("request_too_large: Request size exceeds model context window"); const overflowError = new Error("request_too_large: Request size exceeds model context window");

View File

@ -86,8 +86,6 @@ export async function runEmbeddedPiAgent(
: "markdown"); : "markdown");
const isProbeSession = params.sessionId?.startsWith("probe-") ?? false; const isProbeSession = params.sessionId?.startsWith("probe-") ?? false;
return enqueueSession(() =>
enqueueGlobal(async () => {
const started = Date.now(); const started = Date.now();
const resolvedWorkspace = resolveUserPath(params.workspaceDir); const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd(); const prevCwd = process.cwd();
@ -95,8 +93,7 @@ export async function runEmbeddedPiAgent(
const provider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; const provider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER;
const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; const modelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL;
const agentDir = params.agentDir ?? resolveMoltbotAgentDir(); const agentDir = params.agentDir ?? resolveMoltbotAgentDir();
const fallbackConfigured = const fallbackConfigured = (params.config?.agents?.defaults?.model?.fallbacks?.length ?? 0) > 0;
(params.config?.agents?.defaults?.model?.fallbacks?.length ?? 0) > 0;
await ensureMoltbotModelsJson(params.config, agentDir); await ensureMoltbotModelsJson(params.config, agentDir);
const { model, error, authStorage, modelRegistry } = resolveModel( const { model, error, authStorage, modelRegistry } = resolveModel(
@ -229,8 +226,7 @@ export async function runEmbeddedPiAgent(
return; return;
} }
if (model.provider === "github-copilot") { if (model.provider === "github-copilot") {
const { resolveCopilotApiToken } = const { resolveCopilotApiToken } = await import("../../providers/github-copilot-token.js");
await import("../../providers/github-copilot-token.js");
const copilotToken = await resolveCopilotApiToken({ const copilotToken = await resolveCopilotApiToken({
githubToken: apiKeyInfo.apiKey, githubToken: apiKeyInfo.apiKey,
}); });
@ -267,11 +263,7 @@ export async function runEmbeddedPiAgent(
try { try {
while (profileIndex < profileCandidates.length) { while (profileIndex < profileCandidates.length) {
const candidate = profileCandidates[profileIndex]; const candidate = profileCandidates[profileIndex];
if ( if (candidate && candidate !== lockedProfileId && isProfileInCooldown(authStore, candidate)) {
candidate &&
candidate !== lockedProfileId &&
isProfileInCooldown(authStore, candidate)
) {
profileIndex += 1; profileIndex += 1;
continue; continue;
} }
@ -301,7 +293,9 @@ export async function runEmbeddedPiAgent(
const prompt = const prompt =
provider === "anthropic" ? scrubAnthropicRefusalMagic(params.prompt) : params.prompt; provider === "anthropic" ? scrubAnthropicRefusalMagic(params.prompt) : params.prompt;
const attempt = await runEmbeddedAttempt({ const attempt = await enqueueSession(() =>
enqueueGlobal(async () =>
runEmbeddedAttempt({
sessionId: params.sessionId, sessionId: params.sessionId,
sessionKey: params.sessionKey, sessionKey: params.sessionKey,
messageChannel: params.messageChannel, messageChannel: params.messageChannel,
@ -354,21 +348,35 @@ export async function runEmbeddedPiAgent(
streamParams: params.streamParams, streamParams: params.streamParams,
ownerNumbers: params.ownerNumbers, ownerNumbers: params.ownerNumbers,
enforceFinalTag: params.enforceFinalTag, enforceFinalTag: params.enforceFinalTag,
}); }),
),
);
const { aborted, promptError, timedOut, sessionIdUsed, lastAssistant } = attempt; const {
aborted,
promptError,
timedOut,
sessionIdUsed,
lastAssistant,
autoCompactionAttempts,
} = attempt;
if (promptError && !aborted) { if (promptError && !aborted) {
const errorText = describeUnknownError(promptError); const errorText = describeUnknownError(promptError);
if (isContextOverflowError(errorText)) { if (isContextOverflowError(errorText)) {
const isCompactionFailure = isCompactionFailureError(errorText); const isCompactionFailure = isCompactionFailureError(errorText);
// Attempt auto-compaction on context overflow (not compaction_failure) // Attempt auto-compaction on context overflow (not compaction_failure)
if (!isCompactionFailure && !overflowCompactionAttempted) { if (
!isCompactionFailure &&
!overflowCompactionAttempted &&
autoCompactionAttempts === 0
) {
log.warn( log.warn(
`context overflow detected; attempting auto-compaction for ${provider}/${modelId}`, `context overflow detected; attempting auto-compaction for ${provider}/${modelId}`,
); );
overflowCompactionAttempted = true; overflowCompactionAttempted = true;
const compactResult = await compactEmbeddedPiSessionDirect({ const compactResult = await enqueueSession(() =>
compactEmbeddedPiSessionDirect({
sessionId: params.sessionId, sessionId: params.sessionId,
sessionKey: params.sessionKey, sessionKey: params.sessionKey,
messageChannel: params.messageChannel, messageChannel: params.messageChannel,
@ -387,7 +395,9 @@ export async function runEmbeddedPiAgent(
bashElevated: params.bashElevated, bashElevated: params.bashElevated,
extraSystemPrompt: params.extraSystemPrompt, extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers, ownerNumbers: params.ownerNumbers,
}); skipSkillEnvOverrides: true,
}),
);
if (compactResult.compacted) { if (compactResult.compacted) {
log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`); log.info(`auto-compaction succeeded for ${provider}/${modelId}; retrying prompt`);
continue; continue;
@ -674,6 +684,4 @@ export async function runEmbeddedPiAgent(
} finally { } finally {
process.chdir(prevCwd); process.chdir(prevCwd);
} }
}),
);
} }

View File

@ -619,6 +619,7 @@ export async function runEmbeddedAttempt(
toolMetas, toolMetas,
unsubscribe, unsubscribe,
waitForCompactionRetry, waitForCompactionRetry,
getCompactionAttempts,
getMessagingToolSentTexts, getMessagingToolSentTexts,
getMessagingToolSentTargets, getMessagingToolSentTargets,
didSendViaMessagingTool, didSendViaMessagingTool,
@ -865,6 +866,7 @@ export async function runEmbeddedAttempt(
didSendViaMessagingTool: didSendViaMessagingTool(), didSendViaMessagingTool: didSendViaMessagingTool(),
messagingToolSentTexts: getMessagingToolSentTexts(), messagingToolSentTexts: getMessagingToolSentTexts(),
messagingToolSentTargets: getMessagingToolSentTargets(), messagingToolSentTargets: getMessagingToolSentTargets(),
autoCompactionAttempts: getCompactionAttempts(),
cloudCodeAssistFormatError: Boolean( cloudCodeAssistFormatError: Boolean(
lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage), lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
), ),

View File

@ -102,6 +102,7 @@ export type EmbeddedRunAttemptResult = {
didSendViaMessagingTool: boolean; didSendViaMessagingTool: boolean;
messagingToolSentTexts: string[]; messagingToolSentTexts: string[];
messagingToolSentTargets: MessagingToolSend[]; messagingToolSentTargets: MessagingToolSend[];
autoCompactionAttempts: number;
cloudCodeAssistFormatError: boolean; cloudCodeAssistFormatError: boolean;
/** Client tool call detected (OpenResponses hosted tools). */ /** Client tool call detected (OpenResponses hosted tools). */
clientToolCall?: { name: string; params: Record<string, unknown> }; clientToolCall?: { name: string; params: Record<string, unknown> };

View File

@ -22,6 +22,7 @@ export function handleAgentStart(ctx: EmbeddedPiSubscribeContext) {
export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) { export function handleAutoCompactionStart(ctx: EmbeddedPiSubscribeContext) {
ctx.state.compactionInFlight = true; ctx.state.compactionInFlight = true;
ctx.state.compactionAttempts += 1;
ctx.ensureCompactionPromise(); ctx.ensureCompactionPromise();
ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`); ctx.log.debug(`embedded run compaction start: runId=${ctx.params.runId}`);
emitAgentEvent({ emitAgentEvent({

View File

@ -49,6 +49,7 @@ export type EmbeddedPiSubscribeState = {
lastReasoningSent?: string; lastReasoningSent?: string;
compactionInFlight: boolean; compactionInFlight: boolean;
compactionAttempts: number;
pendingCompactionRetry: number; pendingCompactionRetry: number;
compactionRetryResolve?: () => void; compactionRetryResolve?: () => void;
compactionRetryPromise: Promise<void> | null; compactionRetryPromise: Promise<void> | null;

View File

@ -57,6 +57,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
suppressBlockChunks: false, // Avoid late chunk inserts after final text merge. suppressBlockChunks: false, // Avoid late chunk inserts after final text merge.
lastReasoningSent: undefined, lastReasoningSent: undefined,
compactionInFlight: false, compactionInFlight: false,
compactionAttempts: 0,
pendingCompactionRetry: 0, pendingCompactionRetry: 0,
compactionRetryResolve: undefined, compactionRetryResolve: undefined,
compactionRetryPromise: null, compactionRetryPromise: null,
@ -472,6 +473,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
toolMetas, toolMetas,
unsubscribe, unsubscribe,
isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0, isCompacting: () => state.compactionInFlight || state.pendingCompactionRetry > 0,
getCompactionAttempts: () => state.compactionAttempts,
getMessagingToolSentTexts: () => messagingToolSentTexts.slice(), getMessagingToolSentTexts: () => messagingToolSentTexts.slice(),
getMessagingToolSentTargets: () => messagingToolSentTargets.slice(), getMessagingToolSentTargets: () => messagingToolSentTargets.slice(),
// Returns true if any messaging tool successfully sent a message. // Returns true if any messaging tool successfully sent a message.