Compare commits
5 Commits
main
...
fix/model-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8aa2c0229d | ||
|
|
485481895e | ||
|
|
f01784369d | ||
|
|
0a853df6c9 | ||
|
|
25ada49e36 |
@ -5,6 +5,7 @@
|
||||
## Unreleased
|
||||
|
||||
### Fixes
|
||||
- Model: retry fallback on rate-limit/quota errors and unsupported thinking levels. (#223) — thanks @augard
|
||||
- Onboarding: resolve CLI entrypoint when running via `npx` so gateway daemon install works without a build step.
|
||||
- TUI: migrate key handling to the updated pi-tui Key matcher API.
|
||||
- macOS: prefer gateway config reads/writes in local mode (fall back to disk if the gateway is unavailable).
|
||||
|
||||
77
src/agents/pi-embedded-helpers.test.ts
Normal file
77
src/agents/pi-embedded-helpers.test.ts
Normal file
@ -0,0 +1,77 @@
|
||||
import type { AssistantMessage } from "@mariozechner/pi-ai";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import type { ThinkLevel } from "../auto-reply/thinking.js";
|
||||
import {
|
||||
isRateLimitAssistantError,
|
||||
pickFallbackThinkingLevel,
|
||||
} from "./pi-embedded-helpers.js";
|
||||
|
||||
const asAssistant = (overrides: Partial<AssistantMessage>) =>
|
||||
({
|
||||
role: "assistant",
|
||||
stopReason: "error",
|
||||
...overrides,
|
||||
}) as AssistantMessage;
|
||||
|
||||
describe("isRateLimitAssistantError", () => {
|
||||
it("detects 429 rate limit payloads", () => {
|
||||
const msg = asAssistant({
|
||||
errorMessage:
|
||||
'429 {"type":"error","error":{"type":"rate_limit_error","message":"This request would exceed your account\'s rate limit. Please try again later."}}',
|
||||
});
|
||||
expect(isRateLimitAssistantError(msg)).toBe(true);
|
||||
});
|
||||
|
||||
it("detects human-readable rate limit messages", () => {
|
||||
const msg = asAssistant({
|
||||
errorMessage: "Too many requests. Rate limit exceeded.",
|
||||
});
|
||||
expect(isRateLimitAssistantError(msg)).toBe(true);
|
||||
});
|
||||
|
||||
it("detects quota exceeded messages", () => {
|
||||
const msg = asAssistant({
|
||||
errorMessage:
|
||||
"You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.",
|
||||
});
|
||||
expect(isRateLimitAssistantError(msg)).toBe(true);
|
||||
});
|
||||
|
||||
it("returns false for non-error messages", () => {
|
||||
const msg = asAssistant({
|
||||
stopReason: "end_turn",
|
||||
errorMessage: "rate limit",
|
||||
});
|
||||
expect(isRateLimitAssistantError(msg)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("pickFallbackThinkingLevel", () => {
|
||||
it("selects the first supported thinking level", () => {
|
||||
const attempted = new Set<ThinkLevel>(["low"]);
|
||||
const next = pickFallbackThinkingLevel({
|
||||
message:
|
||||
"Unsupported value: 'low' is not supported with the 'gpt-5.2-pro' model. Supported values are: 'medium', 'high', and 'xhigh'.",
|
||||
attempted,
|
||||
});
|
||||
expect(next).toBe("medium");
|
||||
});
|
||||
|
||||
it("skips already attempted levels", () => {
|
||||
const attempted = new Set<ThinkLevel>(["low", "medium"]);
|
||||
const next = pickFallbackThinkingLevel({
|
||||
message: "Supported values are: 'medium', 'high', and 'xhigh'.",
|
||||
attempted,
|
||||
});
|
||||
expect(next).toBe("high");
|
||||
});
|
||||
|
||||
it("returns undefined when no supported values are found", () => {
|
||||
const attempted = new Set<ThinkLevel>(["low"]);
|
||||
const next = pickFallbackThinkingLevel({
|
||||
message: "Request failed.",
|
||||
attempted,
|
||||
});
|
||||
expect(next).toBeUndefined();
|
||||
});
|
||||
});
|
||||
@ -6,6 +6,10 @@ import type {
|
||||
AgentToolResult,
|
||||
} from "@mariozechner/pi-agent-core";
|
||||
import type { AssistantMessage } from "@mariozechner/pi-ai";
|
||||
import {
|
||||
normalizeThinkLevel,
|
||||
type ThinkLevel,
|
||||
} from "../auto-reply/thinking.js";
|
||||
|
||||
import { sanitizeContentBlocksImages } from "./tool-images.js";
|
||||
import type { WorkspaceBootstrapFile } from "./workspace.js";
|
||||
@ -109,3 +113,50 @@ export function formatAssistantErrorText(
|
||||
// Keep it short for WhatsApp.
|
||||
return raw.length > 600 ? `${raw.slice(0, 600)}…` : raw;
|
||||
}
|
||||
|
||||
export function isRateLimitAssistantError(
|
||||
msg: AssistantMessage | undefined,
|
||||
): boolean {
|
||||
if (!msg || msg.stopReason !== "error") return false;
|
||||
const raw = (msg.errorMessage ?? "").toLowerCase();
|
||||
if (!raw) return false;
|
||||
return (
|
||||
/rate[_ ]limit|too many requests|429/.test(raw) ||
|
||||
raw.includes("exceeded your current quota")
|
||||
);
|
||||
}
|
||||
|
||||
function extractSupportedValues(raw: string): string[] {
|
||||
const match =
|
||||
raw.match(/supported values are:\s*([^\n.]+)/i) ??
|
||||
raw.match(/supported values:\s*([^\n.]+)/i);
|
||||
if (!match?.[1]) return [];
|
||||
const fragment = match[1];
|
||||
const quoted = Array.from(fragment.matchAll(/['"]([^'"]+)['"]/g)).map(
|
||||
(entry) => entry[1]?.trim(),
|
||||
);
|
||||
if (quoted.length > 0) {
|
||||
return quoted.filter((entry): entry is string => Boolean(entry));
|
||||
}
|
||||
return fragment
|
||||
.split(/,|\band\b/gi)
|
||||
.map((entry) => entry.replace(/^[^a-zA-Z]+|[^a-zA-Z]+$/g, "").trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
export function pickFallbackThinkingLevel(params: {
|
||||
message?: string;
|
||||
attempted: Set<ThinkLevel>;
|
||||
}): ThinkLevel | undefined {
|
||||
const raw = params.message?.trim();
|
||||
if (!raw) return undefined;
|
||||
const supported = extractSupportedValues(raw);
|
||||
if (supported.length === 0) return undefined;
|
||||
for (const entry of supported) {
|
||||
const normalized = normalizeThinkLevel(entry);
|
||||
if (!normalized) continue;
|
||||
if (params.attempted.has(normalized)) continue;
|
||||
return normalized;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
@ -32,6 +32,8 @@ import {
|
||||
buildBootstrapContextFiles,
|
||||
ensureSessionHeader,
|
||||
formatAssistantErrorText,
|
||||
isRateLimitAssistantError,
|
||||
pickFallbackThinkingLevel,
|
||||
sanitizeSessionMessagesImages,
|
||||
} from "./pi-embedded-helpers.js";
|
||||
import {
|
||||
@ -317,302 +319,350 @@ export async function runEmbeddedPiAgent(params: {
|
||||
const apiKey = await getApiKeyForModel(model, authStorage);
|
||||
authStorage.setRuntimeApiKey(model.provider, apiKey);
|
||||
|
||||
const thinkingLevel = mapThinkingLevel(params.thinkLevel);
|
||||
let thinkLevel = params.thinkLevel ?? "off";
|
||||
const attemptedThinking = new Set<ThinkLevel>();
|
||||
|
||||
log.debug(
|
||||
`embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${provider} model=${modelId} surface=${params.surface ?? "unknown"}`,
|
||||
);
|
||||
|
||||
await fs.mkdir(resolvedWorkspace, { recursive: true });
|
||||
await ensureSessionHeader({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
cwd: resolvedWorkspace,
|
||||
});
|
||||
|
||||
let restoreSkillEnv: (() => void) | undefined;
|
||||
process.chdir(resolvedWorkspace);
|
||||
try {
|
||||
const shouldLoadSkillEntries =
|
||||
!params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
|
||||
const skillEntries = shouldLoadSkillEntries
|
||||
? loadWorkspaceSkillEntries(resolvedWorkspace)
|
||||
: [];
|
||||
const skillsSnapshot =
|
||||
params.skillsSnapshot ??
|
||||
buildWorkspaceSkillSnapshot(resolvedWorkspace, {
|
||||
config: params.config,
|
||||
entries: skillEntries,
|
||||
});
|
||||
const sandboxSessionKey = params.sessionKey?.trim() || params.sessionId;
|
||||
const sandbox = await resolveSandboxContext({
|
||||
config: params.config,
|
||||
sessionKey: sandboxSessionKey,
|
||||
workspaceDir: resolvedWorkspace,
|
||||
});
|
||||
restoreSkillEnv = params.skillsSnapshot
|
||||
? applySkillEnvOverridesFromSnapshot({
|
||||
snapshot: params.skillsSnapshot,
|
||||
config: params.config,
|
||||
})
|
||||
: applySkillEnvOverrides({
|
||||
skills: skillEntries ?? [],
|
||||
config: params.config,
|
||||
});
|
||||
|
||||
const bootstrapFiles =
|
||||
await loadWorkspaceBootstrapFiles(resolvedWorkspace);
|
||||
const contextFiles = buildBootstrapContextFiles(bootstrapFiles);
|
||||
const promptSkills = resolvePromptSkills(skillsSnapshot, skillEntries);
|
||||
// Tool schemas must be provider-compatible (OpenAI requires top-level `type: "object"`).
|
||||
// `createClawdbotCodingTools()` normalizes schemas so the session can pass them through unchanged.
|
||||
const tools = createClawdbotCodingTools({
|
||||
bash: {
|
||||
...params.config?.agent?.bash,
|
||||
elevated: params.bashElevated,
|
||||
},
|
||||
sandbox,
|
||||
surface: params.surface,
|
||||
sessionKey: params.sessionKey ?? params.sessionId,
|
||||
config: params.config,
|
||||
});
|
||||
const machineName = await getMachineDisplayName();
|
||||
const runtimeInfo = {
|
||||
host: machineName,
|
||||
os: `${os.type()} ${os.release()}`,
|
||||
arch: os.arch(),
|
||||
node: process.version,
|
||||
model: `${provider}/${modelId}`,
|
||||
};
|
||||
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox);
|
||||
const reasoningTagHint = provider === "ollama";
|
||||
const systemPrompt = buildSystemPrompt({
|
||||
appendPrompt: buildAgentSystemPromptAppend({
|
||||
workspaceDir: resolvedWorkspace,
|
||||
defaultThinkLevel: params.thinkLevel,
|
||||
extraSystemPrompt: params.extraSystemPrompt,
|
||||
ownerNumbers: params.ownerNumbers,
|
||||
reasoningTagHint,
|
||||
runtimeInfo,
|
||||
sandboxInfo,
|
||||
toolNames: tools.map((tool) => tool.name),
|
||||
}),
|
||||
contextFiles,
|
||||
skills: promptSkills,
|
||||
cwd: resolvedWorkspace,
|
||||
tools,
|
||||
});
|
||||
|
||||
const sessionManager = SessionManager.open(params.sessionFile);
|
||||
const settingsManager = SettingsManager.create(
|
||||
resolvedWorkspace,
|
||||
agentDir,
|
||||
);
|
||||
|
||||
const { session } = await createAgentSession({
|
||||
cwd: resolvedWorkspace,
|
||||
agentDir,
|
||||
authStorage,
|
||||
modelRegistry,
|
||||
model,
|
||||
thinkingLevel,
|
||||
systemPrompt,
|
||||
// Custom tool set: extra bash/process + read image sanitization.
|
||||
tools,
|
||||
sessionManager,
|
||||
settingsManager,
|
||||
skills: promptSkills,
|
||||
contextFiles,
|
||||
});
|
||||
|
||||
const prior = await sanitizeSessionMessagesImages(
|
||||
session.messages,
|
||||
"session:history",
|
||||
);
|
||||
if (prior.length > 0) {
|
||||
session.agent.replaceMessages(prior);
|
||||
}
|
||||
let aborted = Boolean(params.abortSignal?.aborted);
|
||||
const abortRun = () => {
|
||||
aborted = true;
|
||||
void session.abort();
|
||||
};
|
||||
const queueHandle: EmbeddedPiQueueHandle = {
|
||||
queueMessage: async (text: string) => {
|
||||
await session.steer(text);
|
||||
},
|
||||
isStreaming: () => session.isStreaming,
|
||||
abort: abortRun,
|
||||
};
|
||||
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
|
||||
|
||||
const {
|
||||
assistantTexts,
|
||||
toolMetas,
|
||||
unsubscribe,
|
||||
waitForCompactionRetry,
|
||||
} = subscribeEmbeddedPiSession({
|
||||
session,
|
||||
runId: params.runId,
|
||||
verboseLevel: params.verboseLevel,
|
||||
shouldEmitToolResult: params.shouldEmitToolResult,
|
||||
onToolResult: params.onToolResult,
|
||||
onBlockReply: params.onBlockReply,
|
||||
blockReplyBreak: params.blockReplyBreak,
|
||||
blockReplyChunking: params.blockReplyChunking,
|
||||
onPartialReply: params.onPartialReply,
|
||||
onAgentEvent: params.onAgentEvent,
|
||||
enforceFinalTag: params.enforceFinalTag,
|
||||
});
|
||||
|
||||
let abortWarnTimer: NodeJS.Timeout | undefined;
|
||||
const abortTimer = setTimeout(
|
||||
() => {
|
||||
log.warn(
|
||||
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
|
||||
);
|
||||
abortRun();
|
||||
if (!abortWarnTimer) {
|
||||
abortWarnTimer = setTimeout(() => {
|
||||
if (!session.isStreaming) return;
|
||||
log.warn(
|
||||
`embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
}, 10_000);
|
||||
}
|
||||
},
|
||||
Math.max(1, params.timeoutMs),
|
||||
);
|
||||
|
||||
let messagesSnapshot: AgentMessage[] = [];
|
||||
let sessionIdUsed = session.sessionId;
|
||||
const onAbort = () => {
|
||||
abortRun();
|
||||
};
|
||||
if (params.abortSignal) {
|
||||
if (params.abortSignal.aborted) {
|
||||
onAbort();
|
||||
} else {
|
||||
params.abortSignal.addEventListener("abort", onAbort, {
|
||||
once: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
let promptError: unknown = null;
|
||||
try {
|
||||
const promptStartedAt = Date.now();
|
||||
log.debug(
|
||||
`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
try {
|
||||
await session.prompt(params.prompt);
|
||||
} catch (err) {
|
||||
promptError = err;
|
||||
} finally {
|
||||
log.debug(
|
||||
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
|
||||
);
|
||||
}
|
||||
await waitForCompactionRetry();
|
||||
messagesSnapshot = session.messages.slice();
|
||||
sessionIdUsed = session.sessionId;
|
||||
} finally {
|
||||
clearTimeout(abortTimer);
|
||||
if (abortWarnTimer) {
|
||||
clearTimeout(abortWarnTimer);
|
||||
abortWarnTimer = undefined;
|
||||
}
|
||||
unsubscribe();
|
||||
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
|
||||
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
|
||||
notifyEmbeddedRunEnded(params.sessionId);
|
||||
}
|
||||
session.dispose();
|
||||
params.abortSignal?.removeEventListener?.("abort", onAbort);
|
||||
}
|
||||
if (promptError && !aborted) {
|
||||
throw promptError;
|
||||
}
|
||||
|
||||
const lastAssistant = messagesSnapshot
|
||||
.slice()
|
||||
.reverse()
|
||||
.find((m) => (m as AgentMessage)?.role === "assistant") as
|
||||
| AssistantMessage
|
||||
| undefined;
|
||||
|
||||
const usage = lastAssistant?.usage;
|
||||
const agentMeta: EmbeddedPiAgentMeta = {
|
||||
sessionId: sessionIdUsed,
|
||||
provider: lastAssistant?.provider ?? provider,
|
||||
model: lastAssistant?.model ?? model.id,
|
||||
usage: usage
|
||||
? {
|
||||
input: usage.input,
|
||||
output: usage.output,
|
||||
cacheRead: usage.cacheRead,
|
||||
cacheWrite: usage.cacheWrite,
|
||||
total: usage.totalTokens,
|
||||
}
|
||||
: undefined,
|
||||
};
|
||||
|
||||
const replyItems: Array<{ text: string; media?: string[] }> = [];
|
||||
|
||||
const errorText = lastAssistant
|
||||
? formatAssistantErrorText(lastAssistant)
|
||||
: undefined;
|
||||
if (errorText) replyItems.push({ text: errorText });
|
||||
|
||||
const inlineToolResults =
|
||||
params.verboseLevel === "on" &&
|
||||
!params.onPartialReply &&
|
||||
!params.onToolResult &&
|
||||
toolMetas.length > 0;
|
||||
if (inlineToolResults) {
|
||||
for (const { toolName, meta } of toolMetas) {
|
||||
const agg = formatToolAggregate(toolName, meta ? [meta] : []);
|
||||
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(agg);
|
||||
if (cleanedText)
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls });
|
||||
}
|
||||
}
|
||||
|
||||
for (const text of assistantTexts.length
|
||||
? assistantTexts
|
||||
: lastAssistant
|
||||
? [extractAssistantText(lastAssistant)]
|
||||
: []) {
|
||||
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
|
||||
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0)) continue;
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls });
|
||||
}
|
||||
|
||||
const payloads = replyItems
|
||||
.map((item) => ({
|
||||
text: item.text?.trim() ? item.text.trim() : undefined,
|
||||
mediaUrls: item.media?.length ? item.media : undefined,
|
||||
mediaUrl: item.media?.[0],
|
||||
}))
|
||||
.filter(
|
||||
(p) =>
|
||||
p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0),
|
||||
);
|
||||
while (true) {
|
||||
const thinkingLevel = mapThinkingLevel(thinkLevel);
|
||||
attemptedThinking.add(thinkLevel);
|
||||
|
||||
log.debug(
|
||||
`embedded run done: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - started} aborted=${aborted}`,
|
||||
`embedded run start: runId=${params.runId} sessionId=${params.sessionId} provider=${provider} model=${modelId} thinking=${thinkLevel} surface=${params.surface ?? "unknown"}`,
|
||||
);
|
||||
return {
|
||||
payloads: payloads.length ? payloads : undefined,
|
||||
meta: {
|
||||
durationMs: Date.now() - started,
|
||||
agentMeta,
|
||||
aborted,
|
||||
},
|
||||
};
|
||||
} finally {
|
||||
restoreSkillEnv?.();
|
||||
process.chdir(prevCwd);
|
||||
|
||||
await fs.mkdir(resolvedWorkspace, { recursive: true });
|
||||
await ensureSessionHeader({
|
||||
sessionFile: params.sessionFile,
|
||||
sessionId: params.sessionId,
|
||||
cwd: resolvedWorkspace,
|
||||
});
|
||||
|
||||
let restoreSkillEnv: (() => void) | undefined;
|
||||
process.chdir(resolvedWorkspace);
|
||||
try {
|
||||
const shouldLoadSkillEntries =
|
||||
!params.skillsSnapshot || !params.skillsSnapshot.resolvedSkills;
|
||||
const skillEntries = shouldLoadSkillEntries
|
||||
? loadWorkspaceSkillEntries(resolvedWorkspace)
|
||||
: [];
|
||||
const skillsSnapshot =
|
||||
params.skillsSnapshot ??
|
||||
buildWorkspaceSkillSnapshot(resolvedWorkspace, {
|
||||
config: params.config,
|
||||
entries: skillEntries,
|
||||
});
|
||||
const sandboxSessionKey =
|
||||
params.sessionKey?.trim() || params.sessionId;
|
||||
const sandbox = await resolveSandboxContext({
|
||||
config: params.config,
|
||||
sessionKey: sandboxSessionKey,
|
||||
workspaceDir: resolvedWorkspace,
|
||||
});
|
||||
restoreSkillEnv = params.skillsSnapshot
|
||||
? applySkillEnvOverridesFromSnapshot({
|
||||
snapshot: params.skillsSnapshot,
|
||||
config: params.config,
|
||||
})
|
||||
: applySkillEnvOverrides({
|
||||
skills: skillEntries ?? [],
|
||||
config: params.config,
|
||||
});
|
||||
|
||||
const bootstrapFiles =
|
||||
await loadWorkspaceBootstrapFiles(resolvedWorkspace);
|
||||
const contextFiles = buildBootstrapContextFiles(bootstrapFiles);
|
||||
const promptSkills = resolvePromptSkills(
|
||||
skillsSnapshot,
|
||||
skillEntries,
|
||||
);
|
||||
// Tool schemas must be provider-compatible (OpenAI requires top-level `type: "object"`).
|
||||
// `createClawdbotCodingTools()` normalizes schemas so the session can pass them through unchanged.
|
||||
const tools = createClawdbotCodingTools({
|
||||
bash: {
|
||||
...params.config?.agent?.bash,
|
||||
elevated: params.bashElevated,
|
||||
},
|
||||
sandbox,
|
||||
surface: params.surface,
|
||||
sessionKey: params.sessionKey ?? params.sessionId,
|
||||
config: params.config,
|
||||
});
|
||||
const machineName = await getMachineDisplayName();
|
||||
const runtimeInfo = {
|
||||
host: machineName,
|
||||
os: `${os.type()} ${os.release()}`,
|
||||
arch: os.arch(),
|
||||
node: process.version,
|
||||
model: `${provider}/${modelId}`,
|
||||
};
|
||||
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox);
|
||||
const reasoningTagHint = provider === "ollama";
|
||||
const systemPrompt = buildSystemPrompt({
|
||||
appendPrompt: buildAgentSystemPromptAppend({
|
||||
workspaceDir: resolvedWorkspace,
|
||||
defaultThinkLevel: thinkLevel,
|
||||
extraSystemPrompt: params.extraSystemPrompt,
|
||||
ownerNumbers: params.ownerNumbers,
|
||||
reasoningTagHint,
|
||||
runtimeInfo,
|
||||
sandboxInfo,
|
||||
toolNames: tools.map((tool) => tool.name),
|
||||
}),
|
||||
contextFiles,
|
||||
skills: promptSkills,
|
||||
cwd: resolvedWorkspace,
|
||||
tools,
|
||||
});
|
||||
|
||||
const sessionManager = SessionManager.open(params.sessionFile);
|
||||
const settingsManager = SettingsManager.create(
|
||||
resolvedWorkspace,
|
||||
agentDir,
|
||||
);
|
||||
|
||||
const { session } = await createAgentSession({
|
||||
cwd: resolvedWorkspace,
|
||||
agentDir,
|
||||
authStorage,
|
||||
modelRegistry,
|
||||
model,
|
||||
thinkingLevel,
|
||||
systemPrompt,
|
||||
// Custom tool set: extra bash/process + read image sanitization.
|
||||
tools,
|
||||
sessionManager,
|
||||
settingsManager,
|
||||
skills: promptSkills,
|
||||
contextFiles,
|
||||
});
|
||||
|
||||
const prior = await sanitizeSessionMessagesImages(
|
||||
session.messages,
|
||||
"session:history",
|
||||
);
|
||||
if (prior.length > 0) {
|
||||
session.agent.replaceMessages(prior);
|
||||
}
|
||||
let aborted = Boolean(params.abortSignal?.aborted);
|
||||
const abortRun = () => {
|
||||
aborted = true;
|
||||
void session.abort();
|
||||
};
|
||||
const queueHandle: EmbeddedPiQueueHandle = {
|
||||
queueMessage: async (text: string) => {
|
||||
await session.steer(text);
|
||||
},
|
||||
isStreaming: () => session.isStreaming,
|
||||
abort: abortRun,
|
||||
};
|
||||
ACTIVE_EMBEDDED_RUNS.set(params.sessionId, queueHandle);
|
||||
|
||||
const {
|
||||
assistantTexts,
|
||||
toolMetas,
|
||||
unsubscribe,
|
||||
waitForCompactionRetry,
|
||||
} = subscribeEmbeddedPiSession({
|
||||
session,
|
||||
runId: params.runId,
|
||||
verboseLevel: params.verboseLevel,
|
||||
shouldEmitToolResult: params.shouldEmitToolResult,
|
||||
onToolResult: params.onToolResult,
|
||||
onBlockReply: params.onBlockReply,
|
||||
blockReplyBreak: params.blockReplyBreak,
|
||||
blockReplyChunking: params.blockReplyChunking,
|
||||
onPartialReply: params.onPartialReply,
|
||||
onAgentEvent: params.onAgentEvent,
|
||||
enforceFinalTag: params.enforceFinalTag,
|
||||
});
|
||||
|
||||
let abortWarnTimer: NodeJS.Timeout | undefined;
|
||||
const abortTimer = setTimeout(
|
||||
() => {
|
||||
log.warn(
|
||||
`embedded run timeout: runId=${params.runId} sessionId=${params.sessionId} timeoutMs=${params.timeoutMs}`,
|
||||
);
|
||||
abortRun();
|
||||
if (!abortWarnTimer) {
|
||||
abortWarnTimer = setTimeout(() => {
|
||||
if (!session.isStreaming) return;
|
||||
log.warn(
|
||||
`embedded run abort still streaming: runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
}, 10_000);
|
||||
}
|
||||
},
|
||||
Math.max(1, params.timeoutMs),
|
||||
);
|
||||
|
||||
let messagesSnapshot: AgentMessage[] = [];
|
||||
let sessionIdUsed = session.sessionId;
|
||||
const onAbort = () => {
|
||||
abortRun();
|
||||
};
|
||||
if (params.abortSignal) {
|
||||
if (params.abortSignal.aborted) {
|
||||
onAbort();
|
||||
} else {
|
||||
params.abortSignal.addEventListener("abort", onAbort, {
|
||||
once: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
let promptError: unknown = null;
|
||||
try {
|
||||
const promptStartedAt = Date.now();
|
||||
log.debug(
|
||||
`embedded run prompt start: runId=${params.runId} sessionId=${params.sessionId}`,
|
||||
);
|
||||
try {
|
||||
await session.prompt(params.prompt);
|
||||
} catch (err) {
|
||||
promptError = err;
|
||||
} finally {
|
||||
log.debug(
|
||||
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
|
||||
);
|
||||
}
|
||||
await waitForCompactionRetry();
|
||||
messagesSnapshot = session.messages.slice();
|
||||
sessionIdUsed = session.sessionId;
|
||||
} finally {
|
||||
clearTimeout(abortTimer);
|
||||
if (abortWarnTimer) {
|
||||
clearTimeout(abortWarnTimer);
|
||||
abortWarnTimer = undefined;
|
||||
}
|
||||
unsubscribe();
|
||||
if (ACTIVE_EMBEDDED_RUNS.get(params.sessionId) === queueHandle) {
|
||||
ACTIVE_EMBEDDED_RUNS.delete(params.sessionId);
|
||||
notifyEmbeddedRunEnded(params.sessionId);
|
||||
}
|
||||
session.dispose();
|
||||
params.abortSignal?.removeEventListener?.("abort", onAbort);
|
||||
}
|
||||
if (promptError && !aborted) {
|
||||
const fallbackThinking = pickFallbackThinkingLevel({
|
||||
message:
|
||||
promptError instanceof Error
|
||||
? promptError.message
|
||||
: String(promptError),
|
||||
attempted: attemptedThinking,
|
||||
});
|
||||
if (fallbackThinking) {
|
||||
log.warn(
|
||||
`unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`,
|
||||
);
|
||||
thinkLevel = fallbackThinking;
|
||||
continue;
|
||||
}
|
||||
throw promptError;
|
||||
}
|
||||
|
||||
const lastAssistant = messagesSnapshot
|
||||
.slice()
|
||||
.reverse()
|
||||
.find((m) => (m as AgentMessage)?.role === "assistant") as
|
||||
| AssistantMessage
|
||||
| undefined;
|
||||
|
||||
const fallbackThinking = pickFallbackThinkingLevel({
|
||||
message: lastAssistant?.errorMessage,
|
||||
attempted: attemptedThinking,
|
||||
});
|
||||
if (fallbackThinking && !aborted) {
|
||||
log.warn(
|
||||
`unsupported thinking level for ${provider}/${modelId}; retrying with ${fallbackThinking}`,
|
||||
);
|
||||
thinkLevel = fallbackThinking;
|
||||
continue;
|
||||
}
|
||||
|
||||
const fallbackConfigured =
|
||||
(params.config?.agent?.modelFallbacks?.length ?? 0) > 0;
|
||||
if (fallbackConfigured && isRateLimitAssistantError(lastAssistant)) {
|
||||
const message =
|
||||
lastAssistant?.errorMessage?.trim() ||
|
||||
(lastAssistant ? formatAssistantErrorText(lastAssistant) : "") ||
|
||||
"LLM request rate limited.";
|
||||
throw new Error(message);
|
||||
}
|
||||
|
||||
const usage = lastAssistant?.usage;
|
||||
const agentMeta: EmbeddedPiAgentMeta = {
|
||||
sessionId: sessionIdUsed,
|
||||
provider: lastAssistant?.provider ?? provider,
|
||||
model: lastAssistant?.model ?? model.id,
|
||||
usage: usage
|
||||
? {
|
||||
input: usage.input,
|
||||
output: usage.output,
|
||||
cacheRead: usage.cacheRead,
|
||||
cacheWrite: usage.cacheWrite,
|
||||
total: usage.totalTokens,
|
||||
}
|
||||
: undefined,
|
||||
};
|
||||
|
||||
const replyItems: Array<{ text: string; media?: string[] }> = [];
|
||||
|
||||
const errorText = lastAssistant
|
||||
? formatAssistantErrorText(lastAssistant)
|
||||
: undefined;
|
||||
if (errorText) replyItems.push({ text: errorText });
|
||||
|
||||
const inlineToolResults =
|
||||
params.verboseLevel === "on" &&
|
||||
!params.onPartialReply &&
|
||||
!params.onToolResult &&
|
||||
toolMetas.length > 0;
|
||||
if (inlineToolResults) {
|
||||
for (const { toolName, meta } of toolMetas) {
|
||||
const agg = formatToolAggregate(toolName, meta ? [meta] : []);
|
||||
const { text: cleanedText, mediaUrls } =
|
||||
splitMediaFromOutput(agg);
|
||||
if (cleanedText)
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls });
|
||||
}
|
||||
}
|
||||
|
||||
for (const text of assistantTexts.length
|
||||
? assistantTexts
|
||||
: lastAssistant
|
||||
? [extractAssistantText(lastAssistant)]
|
||||
: []) {
|
||||
const { text: cleanedText, mediaUrls } = splitMediaFromOutput(text);
|
||||
if (!cleanedText && (!mediaUrls || mediaUrls.length === 0))
|
||||
continue;
|
||||
replyItems.push({ text: cleanedText, media: mediaUrls });
|
||||
}
|
||||
|
||||
const payloads = replyItems
|
||||
.map((item) => ({
|
||||
text: item.text?.trim() ? item.text.trim() : undefined,
|
||||
mediaUrls: item.media?.length ? item.media : undefined,
|
||||
mediaUrl: item.media?.[0],
|
||||
}))
|
||||
.filter(
|
||||
(p) =>
|
||||
p.text || p.mediaUrl || (p.mediaUrls && p.mediaUrls.length > 0),
|
||||
);
|
||||
|
||||
log.debug(
|
||||
`embedded run done: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - started} aborted=${aborted}`,
|
||||
);
|
||||
return {
|
||||
payloads: payloads.length ? payloads : undefined,
|
||||
meta: {
|
||||
durationMs: Date.now() - started,
|
||||
agentMeta,
|
||||
aborted,
|
||||
},
|
||||
};
|
||||
} finally {
|
||||
restoreSkillEnv?.();
|
||||
process.chdir(prevCwd);
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
import fs from "node:fs";
|
||||
|
||||
import { getEnvApiKey } from "@mariozechner/pi-ai";
|
||||
import { discoverAuthStorage } from "@mariozechner/pi-coding-agent";
|
||||
import { resolveClawdbotAgentDir } from "../../agents/agent-paths.js";
|
||||
import type { ClawdbotConfig } from "../../config/config.js";
|
||||
import { resolveOAuthPath } from "../../config/paths.js";
|
||||
import {
|
||||
type SessionEntry,
|
||||
type SessionScope,
|
||||
@ -12,10 +15,6 @@ import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
||||
import { normalizeE164 } from "../../utils.js";
|
||||
import { resolveHeartbeatSeconds } from "../../web/reconnect.js";
|
||||
import { getWebAuthAgeMs, webAuthExists } from "../../web/session.js";
|
||||
import { resolveClawdbotAgentDir } from "../../agents/agent-paths.js";
|
||||
import { resolveOAuthPath } from "../../config/paths.js";
|
||||
import { getEnvApiKey } from "@mariozechner/pi-ai";
|
||||
import { discoverAuthStorage } from "@mariozechner/pi-coding-agent";
|
||||
import {
|
||||
normalizeGroupActivation,
|
||||
parseActivationCommand,
|
||||
@ -61,7 +60,8 @@ function hasOAuthCredentials(provider: string): boolean {
|
||||
if (!entry) return false;
|
||||
const refresh =
|
||||
entry.refresh ?? entry.refresh_token ?? entry.refreshToken ?? "";
|
||||
const access = entry.access ?? entry.access_token ?? entry.accessToken ?? "";
|
||||
const access =
|
||||
entry.access ?? entry.access_token ?? entry.accessToken ?? "";
|
||||
return Boolean(refresh.trim() && access.trim());
|
||||
} catch {
|
||||
return false;
|
||||
@ -204,6 +204,7 @@ export async function handleCommands(params: {
|
||||
resolvedVerboseLevel,
|
||||
resolvedElevatedLevel,
|
||||
resolveDefaultThinkingLevel,
|
||||
provider,
|
||||
model,
|
||||
contextTokens,
|
||||
isGroup,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user