openclaw/src/agents/pi-embedded-runner/run/attempt.ts
spiceoogway 480fd42b2e fix: repair tool_use/tool_result pairings after history truncation (fixes #4367)
The message processing pipeline had a synchronization bug where
limitHistoryTurns() truncated conversation history AFTER
repairToolUseResultPairing() had already fixed tool_use/tool_result
pairings. This could split assistant messages (with tool_use) from
their corresponding tool_result blocks, creating orphaned tool_result
blocks that the Anthropic API rejects.

This fix calls sanitizeToolUseResultPairing() AFTER limitHistoryTurns()
to repair any pairings broken by truncation, ensuring the transcript
remains valid before being sent to the LLM API.

Changes:
- Added import for sanitizeToolUseResultPairing from session-transcript-repair.js
- Call sanitizeToolUseResultPairing() on the limited message array
- Updated variable name from 'limited' to 'repaired' for clarity
2026-01-30 00:10:57 -05:00

888 lines
34 KiB
TypeScript

import fs from "node:fs/promises";
import os from "node:os";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AssistantMessage, ImageContent } from "@mariozechner/pi-ai";
import { streamSimple } from "@mariozechner/pi-ai";
import { createAgentSession, SessionManager, SettingsManager } from "@mariozechner/pi-coding-agent";
import { resolveHeartbeatPrompt } from "../../../auto-reply/heartbeat.js";
import {
listChannelSupportedActions,
resolveChannelMessageToolHints,
} from "../../channel-tools.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import { getMachineDisplayName } from "../../../infra/machine-name.js";
import { resolveTelegramInlineButtonsScope } from "../../../telegram/inline-buttons.js";
import { resolveTelegramReactionLevel } from "../../../telegram/reaction-level.js";
import { resolveSignalReactionLevel } from "../../../signal/reaction-level.js";
import { normalizeMessageChannel } from "../../../utils/message-channel.js";
import { isReasoningTagProvider } from "../../../utils/provider-utils.js";
import { isSubagentSessionKey } from "../../../routing/session-key.js";
import { resolveUserPath } from "../../../utils.js";
import { createCacheTrace } from "../../cache-trace.js";
import { createAnthropicPayloadLogger } from "../../anthropic-payload-log.js";
import { resolveOpenClawAgentDir } from "../../agent-paths.js";
import { resolveSessionAgentIds } from "../../agent-scope.js";
import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../../bootstrap-files.js";
import { resolveOpenClawDocsPath } from "../../docs-path.js";
import { resolveModelAuthMode } from "../../model-auth.js";
import {
isCloudCodeAssistFormatError,
resolveBootstrapMaxChars,
validateAnthropicTurns,
validateGeminiTurns,
} from "../../pi-embedded-helpers.js";
import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js";
import {
ensurePiCompactionReserveTokens,
resolveCompactionReserveTokensFloor,
} from "../../pi-settings.js";
import { createOpenClawCodingTools } from "../../pi-tools.js";
import { resolveSandboxContext } from "../../sandbox.js";
import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js";
import { resolveTranscriptPolicy } from "../../transcript-policy.js";
import { acquireSessionWriteLock } from "../../session-write-lock.js";
import {
applySkillEnvOverrides,
applySkillEnvOverridesFromSnapshot,
loadWorkspaceSkillEntries,
resolveSkillsPromptForRun,
} from "../../skills.js";
import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js";
import { buildSystemPromptReport } from "../../system-prompt-report.js";
import { resolveDefaultModelForAgent } from "../../model-selection.js";
import { sanitizeToolUseResultPairing } from "../../session-transcript-repair.js";
import { isAbortError } from "../abort.js";
import { buildEmbeddedExtensionPaths } from "../extensions.js";
import { applyExtraParamsToAgent } from "../extra-params.js";
import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js";
import {
logToolSchemasForGoogle,
sanitizeSessionHistory,
sanitizeToolsForGoogle,
} from "../google.js";
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
import { log } from "../logger.js";
import { buildModelAliasLines } from "../model.js";
import {
clearActiveEmbeddedRun,
type EmbeddedPiQueueHandle,
setActiveEmbeddedRun,
} from "../runs.js";
import { buildEmbeddedSandboxInfo } from "../sandbox-info.js";
import { prewarmSessionFile, trackSessionManagerAccess } from "../session-manager-cache.js";
import { prepareSessionManagerForRun } from "../session-manager-init.js";
import { buildEmbeddedSystemPrompt, createSystemPromptOverride } from "../system-prompt.js";
import { splitSdkTools } from "../tool-split.js";
import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js";
import { buildSystemPromptParams } from "../../system-prompt-params.js";
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
import { resolveSandboxRuntimeStatus } from "../../sandbox/runtime-status.js";
import { buildTtsSystemPromptHint } from "../../../tts/tts.js";
import { isTimeoutError } from "../../failover-error.js";
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
import { MAX_IMAGE_BYTES } from "../../../media/constants.js";
import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js";
import { detectAndLoadPromptImages } from "./images.js";
export function injectHistoryImagesIntoMessages(
messages: AgentMessage[],
historyImagesByIndex: Map<number, ImageContent[]>,
): boolean {
if (historyImagesByIndex.size === 0) return false;
let didMutate = false;
for (const [msgIndex, images] of historyImagesByIndex) {
// Bounds check: ensure index is valid before accessing
if (msgIndex < 0 || msgIndex >= messages.length) continue;
const msg = messages[msgIndex];
if (msg && msg.role === "user") {
// Convert string content to array format if needed
if (typeof msg.content === "string") {
msg.content = [{ type: "text", text: msg.content }];
didMutate = true;
}
if (Array.isArray(msg.content)) {
// Check for existing image content to avoid duplicates across turns
const existingImageData = new Set(
msg.content
.filter(
(c): c is ImageContent =>
c != null &&
typeof c === "object" &&
c.type === "image" &&
typeof c.data === "string",
)
.map((c) => c.data),
);
for (const img of images) {
// Only add if this image isn't already in the message
if (!existingImageData.has(img.data)) {
msg.content.push(img);
didMutate = true;
}
}
}
}
}
return didMutate;
}
export async function runEmbeddedAttempt(
params: EmbeddedRunAttemptParams,
): Promise<EmbeddedRunAttemptResult> {
const resolvedWorkspace = resolveUserPath(params.workspaceDir);
const prevCwd = process.cwd();
const runAbortController = new AbortController();
log.debug(
`embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${params.provider} model=${params.modelId} thinking=${params.thinkLevel} messageChannel=${params.messageChannel ?? params.messageProvider ?? "unknown"}`,
);
await fs.mkdir(resolvedWorkspace, { recursive: true });
const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
const sandbox = await resolveSandboxContext({
config: params.config,
sessionKey: sandboxSessionKey,
workspaceDir: resolvedWorkspace,
});
const effectiveWorkspace = sandbox?.enabled
? sandbox.workspaceAccess === "rw"
? resolvedWorkspace
: sandbox.workspaceDir
: resolvedWorkspace;
await fs.mkdir(effectiveWorkspace, { recursive: true });
let restoreSkillEnv: (() => void) | undefined;
process.chdir(effectiveWorkspace);
try {
const shouldLoadSkillEntries = !params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
const skillEntries = shouldLoadSkillEntries
? loadWorkspaceSkillEntries(effectiveWorkspace)
: [];
restoreSkillEnv = params.skillsSnapshot
? applySkillEnvOverridesFromSnapshot({
snapshot: params.skillsSnapshot,
config: params.config,
})
: applySkillEnvOverrides({
skills: skillEntries ?? [],
config: params.config,
});
const skillsPrompt = resolveSkillsPromptForRun({
skillsSnapshot: params.skillsSnapshot,
entries: shouldLoadSkillEntries ? skillEntries : undefined,
config: params.config,
workspaceDir: effectiveWorkspace,
});
const sessionLabel = params.sessionKey ?? params.sessionId;
const { bootstrapFiles: hookAdjustedBootstrapFiles, contextFiles } =
await resolveBootstrapContextForRun({
workspaceDir: effectiveWorkspace,
config: params.config,
sessionKey: params.sessionKey,
sessionId: params.sessionId,
warn: makeBootstrapWarn({ sessionLabel, warn: (message) => log.warn(message) }),
});
const workspaceNotes = hookAdjustedBootstrapFiles.some(
(file) => file.name === DEFAULT_BOOTSTRAP_FILENAME && !file.missing,
)
? ["Reminder: commit your changes in this workspace after edits."]
: undefined;
const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
// Check if the model supports native image input
const modelHasVision = params.model.input?.includes("image") ?? false;
const toolsRaw = params.disableTools
? []
: createOpenClawCodingTools({
exec: {
...params.execOverrides,
elevated: params.bashElevated,
},
sandbox,
messageProvider: params.messageChannel ?? params.messageProvider,
agentAccountId: params.agentAccountId,
messageTo: params.messageTo,
messageThreadId: params.messageThreadId,
groupId: params.groupId,
groupChannel: params.groupChannel,
groupSpace: params.groupSpace,
spawnedBy: params.spawnedBy,
senderId: params.senderId,
senderName: params.senderName,
senderUsername: params.senderUsername,
senderE164: params.senderE164,
sessionKey: params.sessionKey ?? params.sessionId,
agentDir,
workspaceDir: effectiveWorkspace,
config: params.config,
abortSignal: runAbortController.signal,
modelProvider: params.model.provider,
modelId: params.modelId,
modelAuthMode: resolveModelAuthMode(params.model.provider, params.config),
currentChannelId: params.currentChannelId,
currentThreadTs: params.currentThreadTs,
replyToMode: params.replyToMode,
hasRepliedRef: params.hasRepliedRef,
modelHasVision,
});
const tools = sanitizeToolsForGoogle({ tools: toolsRaw, provider: params.provider });
logToolSchemasForGoogle({ tools, provider: params.provider });
const machineName = await getMachineDisplayName();
const runtimeChannel = normalizeMessageChannel(params.messageChannel ?? params.messageProvider);
let runtimeCapabilities = runtimeChannel
? (resolveChannelCapabilities({
cfg: params.config,
channel: runtimeChannel,
accountId: params.agentAccountId,
}) ?? [])
: undefined;
if (runtimeChannel === "telegram" && params.config) {
const inlineButtonsScope = resolveTelegramInlineButtonsScope({
cfg: params.config,
accountId: params.agentAccountId ?? undefined,
});
if (inlineButtonsScope !== "off") {
if (!runtimeCapabilities) runtimeCapabilities = [];
if (
!runtimeCapabilities.some((cap) => String(cap).trim().toLowerCase() === "inlinebuttons")
) {
runtimeCapabilities.push("inlineButtons");
}
}
}
const reactionGuidance =
runtimeChannel && params.config
? (() => {
if (runtimeChannel === "telegram") {
const resolved = resolveTelegramReactionLevel({
cfg: params.config,
accountId: params.agentAccountId ?? undefined,
});
const level = resolved.agentReactionGuidance;
return level ? { level, channel: "Telegram" } : undefined;
}
if (runtimeChannel === "signal") {
const resolved = resolveSignalReactionLevel({
cfg: params.config,
accountId: params.agentAccountId ?? undefined,
});
const level = resolved.agentReactionGuidance;
return level ? { level, channel: "Signal" } : undefined;
}
return undefined;
})()
: undefined;
const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({
sessionKey: params.sessionKey,
config: params.config,
});
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated);
const reasoningTagHint = isReasoningTagProvider(params.provider);
// Resolve channel-specific message actions for system prompt
const channelActions = runtimeChannel
? listChannelSupportedActions({
cfg: params.config,
channel: runtimeChannel,
})
: undefined;
const messageToolHints = runtimeChannel
? resolveChannelMessageToolHints({
cfg: params.config,
channel: runtimeChannel,
accountId: params.agentAccountId,
})
: undefined;
const defaultModelRef = resolveDefaultModelForAgent({
cfg: params.config ?? {},
agentId: sessionAgentId,
});
const defaultModelLabel = `${defaultModelRef.provider}/${defaultModelRef.model}`;
const { runtimeInfo, userTimezone, userTime, userTimeFormat } = buildSystemPromptParams({
config: params.config,
agentId: sessionAgentId,
workspaceDir: effectiveWorkspace,
cwd: process.cwd(),
runtime: {
host: machineName,
os: `${os.type()} ${os.release()}`,
arch: os.arch(),
node: process.version,
model: `${params.provider}/${params.modelId}`,
defaultModel: defaultModelLabel,
channel: runtimeChannel,
capabilities: runtimeCapabilities,
channelActions,
},
});
const isDefaultAgent = sessionAgentId === defaultAgentId;
const promptMode = isSubagentSessionKey(params.sessionKey) ? "minimal" : "full";
const docsPath = await resolveOpenClawDocsPath({
workspaceDir: effectiveWorkspace,
argv1: process.argv[1],
cwd: process.cwd(),
moduleUrl: import.meta.url,
});
const ttsHint = params.config ? buildTtsSystemPromptHint(params.config) : undefined;
const appendPrompt = buildEmbeddedSystemPrompt({
workspaceDir: effectiveWorkspace,
defaultThinkLevel: params.thinkLevel,
reasoningLevel: params.reasoningLevel ?? "off",
extraSystemPrompt: params.extraSystemPrompt,
ownerNumbers: params.ownerNumbers,
reasoningTagHint,
heartbeatPrompt: isDefaultAgent
? resolveHeartbeatPrompt(params.config?.agents?.defaults?.heartbeat?.prompt)
: undefined,
skillsPrompt,
docsPath: docsPath ?? undefined,
ttsHint,
workspaceNotes,
reactionGuidance,
promptMode,
runtimeInfo,
messageToolHints,
sandboxInfo,
tools,
modelAliasLines: buildModelAliasLines(params.config),
userTimezone,
userTime,
userTimeFormat,
contextFiles,
});
const systemPromptReport = buildSystemPromptReport({
source: "run",
generatedAt: Date.now(),
sessionId: params.sessionId,
sessionKey: params.sessionKey,
provider: params.provider,
model: params.modelId,
workspaceDir: effectiveWorkspace,
bootstrapMaxChars: resolveBootstrapMaxChars(params.config),
sandbox: (() => {
const runtime = resolveSandboxRuntimeStatus({
cfg: params.config,
sessionKey: params.sessionKey ?? params.sessionId,
});
return { mode: runtime.mode, sandboxed: runtime.sandboxed };
})(),
systemPrompt: appendPrompt,
bootstrapFiles: hookAdjustedBootstrapFiles,
injectedFiles: contextFiles,
skillsPrompt,
tools,
});
const systemPrompt = createSystemPromptOverride(appendPrompt);
const sessionLock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
});
let sessionManager: ReturnType<typeof guardSessionManager> | undefined;
let session: Awaited<ReturnType<typeof createAgentSession>>["session"] | undefined;
try {
const hadSessionFile = await fs
.stat(params.sessionFile)
.then(() => true)
.catch(() => false);
const transcriptPolicy = resolveTranscriptPolicy({
modelApi: params.model?.api,
provider: params.provider,
modelId: params.modelId,
});
await prewarmSessionFile(params.sessionFile);
sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), {
agentId: sessionAgentId,
sessionKey: params.sessionKey,
allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults,
});
trackSessionManagerAccess(params.sessionFile);
await prepareSessionManagerForRun({
sessionManager,
sessionFile: params.sessionFile,
hadSessionFile,
sessionId: params.sessionId,
cwd: effectiveWorkspace,
});
const settingsManager = SettingsManager.create(effectiveWorkspace, agentDir);
ensurePiCompactionReserveTokens({
settingsManager,
minReserveTokens: resolveCompactionReserveTokensFloor(params.config),
});
const additionalExtensionPaths = buildEmbeddedExtensionPaths({
cfg: params.config,
sessionManager,
provider: params.provider,
modelId: params.modelId,
model: params.model,
});
const { builtInTools, customTools } = splitSdkTools({
tools,
sandboxEnabled: !!sandbox?.enabled,
});
// Add client tools (OpenResponses hosted tools) to customTools
let clientToolCallDetected: { name: string; params: Record<string, unknown> } | null = null;
const clientToolDefs = params.clientTools
? toClientToolDefinitions(params.clientTools, (toolName, toolParams) => {
clientToolCallDetected = { name: toolName, params: toolParams };
})
: [];
const allCustomTools = [...customTools, ...clientToolDefs];
({ session } = await createAgentSession({
cwd: resolvedWorkspace,
agentDir,
authStorage: params.authStorage,
modelRegistry: params.modelRegistry,
model: params.model,
thinkingLevel: mapThinkingLevel(params.thinkLevel),
systemPrompt,
tools: builtInTools,
customTools: allCustomTools,
sessionManager,
settingsManager,
skills: [],
contextFiles: [],
additionalExtensionPaths,
}));
if (!session) {
throw new Error("Embedded agent session missing");
}
const activeSession = session;
const cacheTrace = createCacheTrace({
cfg: params.config,
env: process.env,
runId: params.runId,
sessionId: activeSession.sessionId,
sessionKey: params.sessionKey,
provider: params.provider,
modelId: params.modelId,
modelApi: params.model.api,
workspaceDir: params.workspaceDir,
});
const anthropicPayloadLogger = createAnthropicPayloadLogger({
env: process.env,
runId: params.runId,
sessionId: activeSession.sessionId,
sessionKey: params.sessionKey,
provider: params.provider,
modelId: params.modelId,
modelApi: params.model.api,
workspaceDir: params.workspaceDir,
});
// Force a stable streamFn reference so vitest can reliably mock @mariozechner/pi-ai.
activeSession.agent.streamFn = streamSimple;
applyExtraParamsToAgent(
activeSession.agent,
params.config,
params.provider,
params.modelId,
params.streamParams,
);
if (cacheTrace) {
cacheTrace.recordStage("session:loaded", {
messages: activeSession.messages,
system: systemPrompt,
note: "after session create",
});
activeSession.agent.streamFn = cacheTrace.wrapStreamFn(activeSession.agent.streamFn);
}
if (anthropicPayloadLogger) {
activeSession.agent.streamFn = anthropicPayloadLogger.wrapStreamFn(
activeSession.agent.streamFn,
);
}
try {
const prior = await sanitizeSessionHistory({
messages: activeSession.messages,
modelApi: params.model.api,
modelId: params.modelId,
provider: params.provider,
sessionManager,
sessionId: params.sessionId,
policy: transcriptPolicy,
});
cacheTrace?.recordStage("session:sanitized", { messages: prior });
const validatedGemini = transcriptPolicy.validateGeminiTurns
? validateGeminiTurns(prior)
: prior;
const validated = transcriptPolicy.validateAnthropicTurns
? validateAnthropicTurns(validatedGemini)
: validatedGemini;
const limited = limitHistoryTurns(
validated,
getDmHistoryLimitFromSessionKey(params.sessionKey, params.config),
);
// Fix: Repair tool_use/tool_result pairings AFTER truncation (issue #4367)
const repaired = sanitizeToolUseResultPairing(limited);
cacheTrace?.recordStage("session:limited", { messages: repaired });
if (repaired.length > 0) {
activeSession.agent.replaceMessages(repaired);
}
} catch (err) {
sessionManager.flushPendingToolResults?.();
activeSession.dispose();
throw err;
}
let aborted = Boolean(params.abortSignal?.aborted);
let timedOut = false;
const getAbortReason = (signal: AbortSignal): unknown =>
"reason" in signal ? (signal as { reason?: unknown }).reason : undefined;
const makeTimeoutAbortReason = (): Error => {
const err = new Error("request timed out");
err.name = "TimeoutError";
return err;
};
const makeAbortError = (signal: AbortSignal): Error => {
const reason = getAbortReason(signal);
const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
err.name = "AbortError";
return err;
};
const abortRun = (isTimeout = false, reason?: unknown) => {
aborted = true;
if (isTimeout) timedOut = true;
if (isTimeout) {
runAbortController.abort(reason ?? makeTimeoutAbortReason());
} else {
runAbortController.abort(reason);
}
void activeSession.abort();
};
const abortable = <T>(promise: Promise<T>): Promise<T> => {
const signal = runAbortController.signal;
if (signal.aborted) {
return Promise.reject(makeAbortError(signal));
}
return new Promise<T>((resolve, reject) => {
const onAbort = () => {
signal.removeEventListener("abort", onAbort);
reject(makeAbortError(signal));
};
signal.addEventListener("abort", onAbort, { once: true });
promise.then(
(value) => {
signal.removeEventListener("abort", onAbort);
resolve(value);
},
(err) => {
signal.removeEventListener("abort", onAbort);
reject(err);
},
);
});
};
const subscription = subscribeEmbeddedPiSession({
session: activeSession,
runId: params.runId,
verboseLevel: params.verboseLevel,
reasoningMode: params.reasoningLevel ?? "off",
toolResultFormat: params.toolResultFormat,
shouldEmitToolResult: params.shouldEmitToolResult,
shouldEmitToolOutput: params.shouldEmitToolOutput,
onToolResult: params.onToolResult,
onReasoningStream: params.onReasoningStream,
onBlockReply: params.onBlockReply,
onBlockReplyFlush: params.onBlockReplyFlush,
blockReplyBreak: params.blockReplyBreak,
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAssistantMessageStart: params.onAssistantMessageStart,
onAgentEvent: params.onAgentEvent,
enforceFinalTag: params.enforceFinalTag,
});
const {
assistantTexts,
toolMetas,
unsubscribe,
waitForCompactionRetry,
getMessagingToolSentTexts,
getMessagingToolSentTargets,
didSendViaMessagingTool,
getLastToolError,
} = subscription;
const queueHandle: EmbeddedPiQueueHandle = {
queueMessage: async (text: string) => {
await activeSession.steer(text);
},
isStreaming: () => activeSession.isStreaming,
isCompacting: () => subscription.isCompacting(),
abort: abortRun,
};
setActiveEmbeddedRun(params.sessionId, queueHandle);
let abortWarnTimer: NodeJS.Timeout | undefined;
const isProbeSession = params.sessionId?.startsWith("probe-") ?? false;
const abortTimer = setTimeout(
() => {
if (!isProbeSession) {
log.warn(
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
);
}
abortRun(true);
if (!abortWarnTimer) {
abortWarnTimer = setTimeout(() => {
if (!activeSession.isStreaming) return;
if (!isProbeSession) {
log.warn(
`embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
);
}
}, 10_000);
}
},
Math.max(1, params.timeoutMs),
);
let messagesSnapshot: AgentMessage[] = [];
let sessionIdUsed = activeSession.sessionId;
const onAbort = () => {
const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
const timeout = reason ? isTimeoutError(reason) : false;
abortRun(timeout, reason);
};
if (params.abortSignal) {
if (params.abortSignal.aborted) {
onAbort();
} else {
params.abortSignal.addEventListener("abort", onAbort, {
once: true,
});
}
}
// Get hook runner once for both before_agent_start and agent_end hooks
const hookRunner = getGlobalHookRunner();
let promptError: unknown = null;
try {
const promptStartedAt = Date.now();
// Run before_agent_start hooks to allow plugins to inject context
let effectivePrompt = params.prompt;
if (hookRunner?.hasHooks("before_agent_start")) {
try {
const hookResult = await hookRunner.runBeforeAgentStart(
{
prompt: params.prompt,
messages: activeSession.messages,
},
{
agentId: params.sessionKey?.split(":")[0] ?? "main",
sessionKey: params.sessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
},
);
if (hookResult?.prependContext) {
effectivePrompt = `${hookResult.prependContext}\n\n${params.prompt}`;
log.debug(
`hooks: prepended context to prompt (${hookResult.prependContext.length} chars)`,
);
}
} catch (hookErr) {
log.warn(`before_agent_start hook failed: ${String(hookErr)}`);
}
}
log.debug(`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`);
cacheTrace?.recordStage("prompt:before", {
prompt: effectivePrompt,
messages: activeSession.messages,
});
// Repair orphaned trailing user messages so new prompts don't violate role ordering.
const leafEntry = sessionManager.getLeafEntry();
if (leafEntry?.type === "message" && leafEntry.message.role === "user") {
if (leafEntry.parentId) {
sessionManager.branch(leafEntry.parentId);
} else {
sessionManager.resetLeaf();
}
const sessionContext = sessionManager.buildSessionContext();
activeSession.agent.replaceMessages(sessionContext.messages);
log.warn(
`Removed orphaned user message to prevent consecutive user turns. ` +
`runId=${params.runId} sessionId=${params.sessionId}`,
);
}
try {
// Detect and load images referenced in the prompt for vision-capable models.
// This eliminates the need for an explicit "view" tool call by injecting
// images directly into the prompt when the model supports it.
// Also scans conversation history to enable follow-up questions about earlier images.
const imageResult = await detectAndLoadPromptImages({
prompt: effectivePrompt,
workspaceDir: effectiveWorkspace,
model: params.model,
existingImages: params.images,
historyMessages: activeSession.messages,
maxBytes: MAX_IMAGE_BYTES,
// Enforce sandbox path restrictions when sandbox is enabled
sandboxRoot: sandbox?.enabled ? sandbox.workspaceDir : undefined,
});
// Inject history images into their original message positions.
// This ensures the model sees images in context (e.g., "compare to the first image").
const didMutate = injectHistoryImagesIntoMessages(
activeSession.messages,
imageResult.historyImagesByIndex,
);
if (didMutate) {
// Persist message mutations (e.g., injected history images) so we don't re-scan/reload.
activeSession.agent.replaceMessages(activeSession.messages);
}
cacheTrace?.recordStage("prompt:images", {
prompt: effectivePrompt,
messages: activeSession.messages,
note: `images: prompt=${imageResult.images.length} history=${imageResult.historyImagesByIndex.size}`,
});
const shouldTrackCacheTtl =
params.config?.agents?.defaults?.contextPruning?.mode === "cache-ttl" &&
isCacheTtlEligibleProvider(params.provider, params.modelId);
if (shouldTrackCacheTtl) {
appendCacheTtlTimestamp(sessionManager, {
timestamp: Date.now(),
provider: params.provider,
modelId: params.modelId,
});
}
// Only pass images option if there are actually images to pass
// This avoids potential issues with models that don't expect the images parameter
if (imageResult.images.length > 0) {
await abortable(activeSession.prompt(effectivePrompt, { images: imageResult.images }));
} else {
await abortable(activeSession.prompt(effectivePrompt));
}
} catch (err) {
promptError = err;
} finally {
log.debug(
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
);
}
try {
await waitForCompactionRetry();
} catch (err) {
if (isAbortError(err)) {
if (!promptError) promptError = err;
} else {
throw err;
}
}
messagesSnapshot = activeSession.messages.slice();
sessionIdUsed = activeSession.sessionId;
cacheTrace?.recordStage("session:after", {
messages: messagesSnapshot,
note: promptError ? "prompt error" : undefined,
});
anthropicPayloadLogger?.recordUsage(messagesSnapshot, promptError);
// Run agent_end hooks to allow plugins to analyze the conversation
// This is fire-and-forget, so we don't await
if (hookRunner?.hasHooks("agent_end")) {
hookRunner
.runAgentEnd(
{
messages: messagesSnapshot,
success: !aborted && !promptError,
error: promptError ? describeUnknownError(promptError) : undefined,
durationMs: Date.now() - promptStartedAt,
},
{
agentId: params.sessionKey?.split(":")[0] ?? "main",
sessionKey: params.sessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider ?? undefined,
},
)
.catch((err) => {
log.warn(`agent_end hook failed: ${err}`);
});
}
} finally {
clearTimeout(abortTimer);
if (abortWarnTimer) clearTimeout(abortWarnTimer);
unsubscribe();
clearActiveEmbeddedRun(params.sessionId, queueHandle);
params.abortSignal?.removeEventListener?.("abort", onAbort);
}
const lastAssistant = messagesSnapshot
.slice()
.reverse()
.find((m) => (m as AgentMessage)?.role === "assistant") as AssistantMessage | undefined;
const toolMetasNormalized = toolMetas
.filter(
(entry): entry is { toolName: string; meta?: string } =>
typeof entry.toolName === "string" && entry.toolName.trim().length > 0,
)
.map((entry) => ({ toolName: entry.toolName, meta: entry.meta }));
return {
aborted,
timedOut,
promptError,
sessionIdUsed,
systemPromptReport,
messagesSnapshot,
assistantTexts,
toolMetas: toolMetasNormalized,
lastAssistant,
lastToolError: getLastToolError?.(),
didSendViaMessagingTool: didSendViaMessagingTool(),
messagingToolSentTexts: getMessagingToolSentTexts(),
messagingToolSentTargets: getMessagingToolSentTargets(),
cloudCodeAssistFormatError: Boolean(
lastAssistant?.errorMessage && isCloudCodeAssistFormatError(lastAssistant.errorMessage),
),
// Client tool call detected (OpenResponses hosted tools)
clientToolCall: clientToolCallDetected ?? undefined,
};
} finally {
// Always tear down the session (and release the lock) before we leave this attempt.
sessionManager?.flushPendingToolResults?.();
session?.dispose();
await sessionLock.release();
}
} finally {
restoreSkillEnv?.();
process.chdir(prevCwd);
}
}