- Enhanced SubagentRunOutcome type with errorType and errorHint fields - Added categorizeError() helper to classify common error patterns: * File system errors (ENOENT, EACCES, etc.) * API/model errors (rate limits, auth failures, invalid requests) * Network errors (connection refused, DNS failures) * Timeout errors * Configuration errors (missing credentials, quota limits) - Updated error emission in agent-runner-execution.ts to categorize errors - Updated subagent-registry.ts to capture and propagate new error fields - Added buildErrorStatusLabel() helper for user-friendly error messages - Error announcements now include error type and remediation hints Example improved messages: - Before: 'failed: unknown error' - After: 'failed (tool error): ENOENT — File or directory not found' This makes subagent failures much easier to understand and debug while maintaining backward compatibility.
509 lines
16 KiB
TypeScript
509 lines
16 KiB
TypeScript
import crypto from "node:crypto";
|
|
import path from "node:path";
|
|
|
|
import { loadConfig } from "../config/config.js";
|
|
import {
|
|
loadSessionStore,
|
|
resolveAgentIdFromSessionKey,
|
|
resolveMainSessionKey,
|
|
resolveStorePath,
|
|
} from "../config/sessions.js";
|
|
import { normalizeMainKey } from "../routing/session-key.js";
|
|
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
|
|
import { callGateway } from "../gateway/call.js";
|
|
import { defaultRuntime } from "../runtime.js";
|
|
import {
|
|
type DeliveryContext,
|
|
deliveryContextFromSession,
|
|
mergeDeliveryContext,
|
|
normalizeDeliveryContext,
|
|
} from "../utils/delivery-context.js";
|
|
import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js";
|
|
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
|
|
import { readLatestAssistantReply } from "./tools/agent-step.js";
|
|
|
|
/**
 * Render a millisecond duration as a compact label such as `1h2m`, `3m12s` or `45s`.
 *
 * The duration is rounded to whole seconds before being split into units.
 * Once the duration reaches an hour the seconds component is omitted.
 *
 * @param valueMs Duration in milliseconds.
 * @returns The compact label, or `undefined` when the value is missing,
 *   non-finite, zero or negative.
 */
function formatDurationShort(valueMs?: number) {
  if (valueMs === undefined || !Number.isFinite(valueMs) || valueMs <= 0) return undefined;

  const wholeSeconds = Math.round(valueMs / 1000);
  const hrs = Math.floor(wholeSeconds / 3600);
  const mins = Math.floor(wholeSeconds / 60) % 60;
  const secs = wholeSeconds % 60;

  if (hrs > 0) return `${hrs}h${mins}m`;
  return mins > 0 ? `${mins}m${secs}s` : `${secs}s`;
}
|
|
|
|
/**
 * Format a token count with a `k` (thousands) or `m` (millions) suffix.
 *
 * Values that would round up across a unit boundary are promoted to the next
 * unit, so the output never reads `1000.0k` (rendered as `1.0m`) or `1000`
 * (rendered as `1.0k`).
 *
 * @param value Token count; missing, zero, or non-finite values render as `"0"`.
 * @returns The formatted count, e.g. `"950"`, `"12.3k"`, `"2.5m"`.
 */
function formatTokenCount(value?: number) {
  if (!value || !Number.isFinite(value)) return "0";
  if (value >= 1_000_000) return `${(value / 1_000_000).toFixed(1)}m`;

  // Decide the unit from the rounded value so 999.5 becomes "1.0k" rather than "1000".
  const rounded = Math.round(value);
  if (rounded >= 1_000) {
    const thousands = (value / 1_000).toFixed(1);
    // toFixed can round 999.95+ up to "1000.0"; switch to millions in that case.
    if (Number(thousands) >= 1_000) return `${(value / 1_000_000).toFixed(1)}m`;
    return `${thousands}k`;
  }
  return String(rounded);
}
|
|
|
|
/**
 * Format a dollar amount for display.
 *
 * Amounts of at least one cent use two decimal places; anything smaller
 * (including zero and negative values) uses four.
 *
 * @param value Amount in US dollars.
 * @returns The formatted amount prefixed with `$`, or `undefined` when the
 *   value is missing or non-finite.
 */
function formatUsd(value?: number) {
  if (value === undefined || !Number.isFinite(value)) return undefined;
  const fractionDigits = value >= 0.01 ? 2 : 4;
  return "$" + value.toFixed(fractionDigits);
}
|
|
|
|
/**
 * Look up the configured cost table for a provider/model pair.
 *
 * Provider and model identifiers are trimmed before lookup. The first model
 * entry whose `id` equals the model identifier decides the result.
 *
 * @returns That entry's `cost`, or `undefined` when either identifier is
 *   blank, the provider has no model list, no entry matches, or the matching
 *   entry has no cost.
 */
function resolveModelCost(params: {
  provider?: string;
  model?: string;
  config: ReturnType<typeof loadConfig>;
}):
  | {
      input: number;
      output: number;
      cacheRead: number;
      cacheWrite: number;
    }
  | undefined {
  const providerId = params.provider?.trim();
  const modelId = params.model?.trim();
  if (!providerId || !modelId) return undefined;

  const catalog = params.config.models?.providers?.[providerId]?.models;
  if (!catalog) return undefined;

  for (const candidate of catalog) {
    if (candidate.id === modelId) return candidate.cost;
  }
  return undefined;
}
|
|
|
|
/**
 * Load the session-store entry for `sessionKey`, polling briefly until token
 * usage has been recorded on it.
 *
 * If the entry does not exist on the first read, returns immediately.
 * Otherwise the store is re-read up to four times, 200 ms apart, until the
 * entry carries a numeric `totalTokens`, `inputTokens` or `outputTokens`.
 *
 * @returns The most recently loaded entry (possibly without usage) and the
 *   resolved store path.
 */
async function waitForSessionUsage(params: { sessionKey: string }) {
  const cfg = loadConfig();
  const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
  const storePath = resolveStorePath(cfg.session?.store, { agentId });

  let entry = loadSessionStore(storePath)[params.sessionKey];
  if (!entry) return { entry, storePath };

  const usageRecorded = () =>
    entry
      ? typeof entry.totalTokens === "number" ||
        typeof entry.inputTokens === "number" ||
        typeof entry.outputTokens === "number"
      : false;

  let retriesLeft = 4;
  while (retriesLeft > 0 && !usageRecorded()) {
    retriesLeft -= 1;
    await new Promise((resolve) => setTimeout(resolve, 200));
    entry = loadSessionStore(storePath)[params.sessionKey];
  }
  return { entry, storePath };
}
|
|
|
|
/** The argument type accepted by `deliveryContextFromSession`, derived from its signature. */
type DeliveryContextSource = Parameters<typeof deliveryContextFromSession>[0];
|
|
|
|
/**
 * Combine the delivery context derived from a session entry with the
 * requester-supplied origin.
 *
 * @param entry Session entry passed to `deliveryContextFromSession`.
 * @param requesterOrigin Origin supplied by the requester, if any.
 * @returns The result of `mergeDeliveryContext` for the two contexts.
 */
function resolveAnnounceOrigin(
  entry?: DeliveryContextSource,
  requesterOrigin?: DeliveryContext,
): DeliveryContext | undefined {
  const sessionOrigin = deliveryContextFromSession(entry);
  return mergeDeliveryContext(sessionOrigin, requesterOrigin);
}
|
|
|
|
/**
 * Deliver a queued announce item by calling the gateway `agent` method.
 *
 * The item's prompt is sent to the item's session with delivery enabled and
 * routed using the item's origin (channel, account, recipient, thread). Each
 * call uses a fresh idempotency key and waits up to 60 seconds for the final
 * response.
 */
async function sendAnnounce(item: AnnounceQueueItem) {
  const { origin } = item;
  const rawThreadId = origin?.threadId;
  // A null, undefined or empty thread id means "no thread"; anything else is sent as a string.
  const threadId = rawThreadId != null && rawThreadId !== "" ? String(rawThreadId) : undefined;

  const requestParams = {
    sessionKey: item.sessionKey,
    message: item.prompt,
    channel: origin?.channel,
    accountId: origin?.accountId,
    to: origin?.to,
    threadId,
    deliver: true,
    idempotencyKey: crypto.randomUUID(),
  };

  await callGateway({
    method: "agent",
    params: requestParams,
    expectFinal: true,
    timeoutMs: 60_000,
  });
}
|
|
|
|
/**
 * Map a requester session key to the key used in the session store.
 *
 * - Blank keys, `"global"`, `"unknown"` and keys already starting with
 *   `agent:` are returned (trimmed) as-is.
 * - `"main"` or the configured main key resolves via `resolveMainSessionKey`.
 * - Anything else is prefixed as `agent:<agentId>:<key>`.
 */
function resolveRequesterStoreKey(
  cfg: ReturnType<typeof loadConfig>,
  requesterSessionKey: string,
): string {
  const key = requesterSessionKey.trim();

  const isPassthrough =
    key === "" || key === "global" || key === "unknown" || key.startsWith("agent:");
  if (isPassthrough) return key;

  const configuredMainKey = normalizeMainKey(cfg.session?.mainKey);
  if (key === "main" || key === configuredMainKey) {
    return resolveMainSessionKey(cfg);
  }

  const owningAgentId = resolveAgentIdFromSessionKey(key);
  return `agent:${owningAgentId}:${key}`;
}
|
|
|
|
/**
 * Load the requester's session entry from the session store.
 *
 * @param requesterSessionKey Requester key in any form accepted by
 *   `resolveRequesterStoreKey`.
 * @returns The loaded config, the store entry found under the canonical key
 *   (if any), and the canonical key itself.
 */
function loadRequesterSessionEntry(requesterSessionKey: string) {
  const cfg = loadConfig();
  const canonicalKey = resolveRequesterStoreKey(cfg, requesterSessionKey);
  const storePath = resolveStorePath(cfg.session?.store, {
    agentId: resolveAgentIdFromSessionKey(canonicalKey),
  });
  const entry = loadSessionStore(storePath)[canonicalKey];
  return { cfg, entry, canonicalKey };
}
|
|
|
|
/**
 * Try to hand a subagent announcement to the requester session without
 * starting a new direct agent call.
 *
 * @param params.requesterSessionKey Session that spawned the subagent.
 * @param params.triggerMessage Prompt to inject into or queue for the requester.
 * @param params.summaryLine Optional short summary stored on the queued item.
 * @param params.requesterOrigin Delivery context supplied by the requester.
 * @returns
 *   - `"steered"` when the queue mode is `steer`/`steer-backlog` and the
 *     message was accepted by `queueEmbeddedPiMessage`;
 *   - `"queued"` when the requester run is active and the announcement was
 *     enqueued for later delivery via `sendAnnounce`;
 *   - `"none"` when the requester has no session id or neither path applied.
 */
async function maybeQueueSubagentAnnounce(params: {
  requesterSessionKey: string;
  triggerMessage: string;
  summaryLine?: string;
  requesterOrigin?: DeliveryContext;
}): Promise<"steered" | "queued" | "none"> {
  // The loader already resolves the canonical store key; reuse it instead of resolving twice.
  const { cfg, entry, canonicalKey } = loadRequesterSessionEntry(params.requesterSessionKey);
  const sessionId = entry?.sessionId;
  if (!sessionId) return "none";

  const queueSettings = resolveQueueSettings({
    cfg,
    channel: entry?.channel ?? entry?.lastChannel,
    sessionEntry: entry,
  });
  const isActive = isEmbeddedPiRunActive(sessionId);

  const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog";
  if (shouldSteer) {
    const steered = queueEmbeddedPiMessage(sessionId, params.triggerMessage);
    if (steered) return "steered";
  }

  const shouldFollowup =
    queueSettings.mode === "followup" ||
    queueSettings.mode === "collect" ||
    queueSettings.mode === "steer-backlog" ||
    queueSettings.mode === "interrupt";
  // A failed steer in plain "steer" mode also falls back to queueing while the run is active.
  if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
    const origin = resolveAnnounceOrigin(entry, params.requesterOrigin);
    enqueueAnnounce({
      key: canonicalKey,
      item: {
        prompt: params.triggerMessage,
        summaryLine: params.summaryLine,
        enqueuedAt: Date.now(),
        sessionKey: canonicalKey,
        origin,
      },
      settings: queueSettings,
      send: sendAnnounce,
    });
    return "queued";
  }

  return "none";
}
|
|
|
|
/**
 * Build a descriptive error status label from outcome data.
 *
 * Shapes produced:
 * - no error type:   `failed: <message>`
 * - with error type: `failed (<type label>): <message>`
 * - with a hint:     `... — <hint>` appended to either form
 *
 * The message falls back to `unknown error` when `outcome.error` is missing
 * or blank. An error type without a dedicated label (e.g. `unknown`) is
 * rendered as `error`.
 *
 * @param outcome Subagent run outcome carrying the optional error details.
 * @returns A single-line, human-readable status label.
 */
function buildErrorStatusLabel(outcome: SubagentRunOutcome): string {
  const typeLabels: Record<string, string> = {
    model: "API error",
    tool: "tool error",
    network: "network error",
    config: "configuration error",
    timeout: "timeout",
  };

  // Always end the prefix with a colon so the untyped form keeps the
  // historical "failed: <message>" shape.
  const prefix = outcome.errorType
    ? `failed (${typeLabels[outcome.errorType] ?? "error"}):`
    : "failed:";

  const message = outcome.error?.trim() || "unknown error";
  const hint = outcome.errorHint?.trim();

  return hint ? `${prefix} ${message} — ${hint}` : `${prefix} ${message}`;
}
|
|
|
|
/**
 * Build the one-line `Stats: ...` summary for a finished subagent session.
 *
 * Segments, joined with a bullet separator, in order:
 * runtime, token usage, estimated cost (when computable), session key,
 * session id (when known) and transcript path (when derivable).
 *
 * The cost estimate uses only the input and output token counts with the
 * configured per-model `input`/`output` rates, divided by 1,000,000; cache
 * rates are not included.
 *
 * @param params.sessionKey Subagent session whose usage is reported.
 * @param params.startedAt Run start timestamp in milliseconds, if known.
 * @param params.endedAt Run end timestamp in milliseconds, if known.
 */
async function buildSubagentStatsLine(params: {
  sessionKey: string;
  startedAt?: number;
  endedAt?: number;
}) {
  const cfg = loadConfig();
  const usage = await waitForSessionUsage({ sessionKey: params.sessionKey });
  const entry = usage.entry;
  const storePath = usage.storePath;

  const sessionId = entry?.sessionId;
  // The transcript is expected next to the store file, named after the session id.
  let transcriptPath: string | undefined;
  if (sessionId && storePath) {
    transcriptPath = path.join(path.dirname(storePath), `${sessionId}.jsonl`);
  }

  const inputTokens = entry?.inputTokens;
  const outputTokens = entry?.outputTokens;
  const totalTokens =
    entry?.totalTokens ??
    (typeof inputTokens === "number" && typeof outputTokens === "number"
      ? inputTokens + outputTokens
      : undefined);

  let runtimeMs: number | undefined;
  if (typeof params.startedAt === "number" && typeof params.endedAt === "number") {
    runtimeMs = Math.max(0, params.endedAt - params.startedAt);
  }

  const rates = resolveModelCost({
    provider: entry?.modelProvider,
    model: entry?.model,
    config: cfg,
  });
  let estimatedCost: number | undefined;
  if (rates && typeof inputTokens === "number" && typeof outputTokens === "number") {
    estimatedCost = (inputTokens * rates.input + outputTokens * rates.output) / 1_000_000;
  }

  const segments: string[] = [`runtime ${formatDurationShort(runtimeMs) ?? "n/a"}`];

  if (typeof totalTokens === "number") {
    const inLabel = typeof inputTokens === "number" ? formatTokenCount(inputTokens) : "n/a";
    const outLabel = typeof outputTokens === "number" ? formatTokenCount(outputTokens) : "n/a";
    segments.push(`tokens ${formatTokenCount(totalTokens)} (in ${inLabel} / out ${outLabel})`);
  } else {
    segments.push("tokens n/a");
  }

  const costLabel = formatUsd(estimatedCost);
  if (costLabel) segments.push(`est ${costLabel}`);

  segments.push(`sessionKey ${params.sessionKey}`);
  if (sessionId) segments.push(`sessionId ${sessionId}`);
  if (transcriptPath) segments.push(`transcript ${transcriptPath}`);

  return `Stats: ${segments.join(" \u2022 ")}`;
}
|
|
|
|
/**
 * Build the Markdown system prompt given to a spawned subagent.
 *
 * The prompt consists of fixed role, rules, output-format and prohibition
 * sections followed by a "Session Context" section. The task text has all
 * whitespace runs collapsed to single spaces; when no non-blank task is
 * given, the literal placeholder `{{TASK_DESCRIPTION}}` is used instead.
 *
 * @param params.requesterSessionKey Session that spawned the subagent (optional line).
 * @param params.requesterOrigin Requester delivery context; only `channel` is shown.
 * @param params.childSessionKey The subagent's own session key (always shown).
 * @param params.label Optional label line.
 * @param params.task Task description.
 * @returns The prompt as newline-joined text ending with a trailing newline.
 */
export function buildSubagentSystemPrompt(params: {
  requesterSessionKey?: string;
  requesterOrigin?: DeliveryContext;
  childSessionKey: string;
  label?: string;
  task?: string;
}) {
  const rawTask = typeof params.task === "string" ? params.task : "";
  const taskText = rawTask.trim() ? rawTask.replace(/\s+/g, " ").trim() : "{{TASK_DESCRIPTION}}";

  // Only the lines whose values are present appear in the session context.
  const sessionContext: string[] = [];
  if (params.label) {
    sessionContext.push(`- Label: ${params.label}`);
  }
  if (params.requesterSessionKey) {
    sessionContext.push(`- Requester session: ${params.requesterSessionKey}.`);
  }
  if (params.requesterOrigin?.channel) {
    sessionContext.push(`- Requester channel: ${params.requesterOrigin.channel}.`);
  }
  sessionContext.push(`- Your session: ${params.childSessionKey}.`);

  const sections: string[] = [
    "# Subagent Context",
    "",
    "You are a **subagent** spawned by the main agent for a specific task.",
    "",
    "## Your Role",
    `- You were created to handle: ${taskText}`,
    "- Complete this task. That's your entire purpose.",
    "- You are NOT the main agent. Don't try to be.",
    "",
    "## Rules",
    "1. **Stay focused** - Do your assigned task, nothing else",
    "2. **Complete the task** - Your final message will be automatically reported to the main agent",
    "3. **Don't initiate** - No heartbeats, no proactive actions, no side quests",
    "4. **Be ephemeral** - You may be terminated after task completion. That's fine.",
    "",
    "## Output Format",
    "When complete, your final response should include:",
    "- What you accomplished or found",
    "- Any relevant details the main agent should know",
    "- Keep it concise but informative",
    "",
    "## What You DON'T Do",
    "- NO user conversations (that's main agent's job)",
    "- NO external messages (email, tweets, etc.) unless explicitly tasked",
    "- NO cron jobs or persistent state",
    "- NO pretending to be the main agent",
    "- NO using the `message` tool directly",
    "",
    "## Session Context",
    ...sessionContext,
    "",
  ];
  return sections.join("\n");
}
|
|
|
|
/** Result of a subagent run as reported in the announcement to the requester. */
export type SubagentRunOutcome = {
  /** Overall result; selects the wording of the announced status. */
  status: "ok" | "error" | "timeout" | "unknown";
  /** Error message text, shown when `status` is `"error"`. */
  error?: string;
  /** Error category used to pick the label shown in parentheses. */
  errorType?: "model" | "tool" | "network" | "config" | "timeout" | "unknown";
  /** Optional hint appended after the error message. */
  errorHint?: string;
};
|
|
|
|
/**
 * Announce a finished subagent run to the session that requested it.
 *
 * Steps:
 * 1. If no reply was supplied and waiting is not disabled, wait for the child
 *    run via the gateway `agent.wait` method (capped at 60 seconds) and derive
 *    the outcome and timing from its response.
 * 2. Read the child's latest assistant reply if none is available yet.
 * 3. Build a trigger message containing status, findings and a stats line.
 * 4. Steer or queue the message into the requester session when possible;
 *    otherwise send it directly through the gateway `agent` method.
 * 5. Always (even on failure) patch the child session label when one is given
 *    and delete the child session when `cleanup` is `"delete"`.
 *
 * Failures while announcing are logged and swallowed so the caller is never
 * broken by a best-effort follow-up. The `params` object is not mutated.
 *
 * @param params.childSessionKey Session key of the subagent.
 * @param params.childRunId Run id passed to `agent.wait`.
 * @param params.requesterSessionKey Session that should receive the announcement.
 * @param params.requesterOrigin Delivery context supplied by the requester.
 * @param params.task Task description, used as the label fallback.
 * @param params.timeoutMs Upper bound for waiting on the child run (capped at 60 s).
 * @param params.cleanup Whether to delete the child session afterwards.
 * @param params.roundOneReply Reply already obtained from the child, if any.
 * @param params.waitForCompletion Set to `false` to skip waiting for the run.
 * @param params.startedAt Known start time; filled from the wait result when absent.
 * @param params.endedAt Known end time; filled from the wait result when absent.
 * @param params.label Label for the announcement and the child session.
 * @param params.outcome Known outcome; replaced when the wait reports a status.
 * @returns `true` when the announcement was steered, queued or sent.
 */
export async function runSubagentAnnounceFlow(params: {
  childSessionKey: string;
  childRunId: string;
  requesterSessionKey: string;
  requesterOrigin?: DeliveryContext;
  requesterDisplayKey: string;
  task: string;
  timeoutMs: number;
  cleanup: "delete" | "keep";
  roundOneReply?: string;
  waitForCompletion?: boolean;
  startedAt?: number;
  endedAt?: number;
  label?: string;
  outcome?: SubagentRunOutcome;
}): Promise<boolean> {
  let didAnnounce = false;
  try {
    const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
    let reply = params.roundOneReply;
    let outcome: SubagentRunOutcome | undefined = params.outcome;
    // Keep timing in locals so the caller's params object is never modified.
    let startedAt = params.startedAt;
    let endedAt = params.endedAt;

    if (!reply && params.waitForCompletion !== false) {
      const waitMs = Math.min(params.timeoutMs, 60_000);
      const wait = (await callGateway({
        method: "agent.wait",
        params: {
          runId: params.childRunId,
          timeoutMs: waitMs,
        },
        // Give the gateway call slightly longer than the wait it performs.
        timeoutMs: waitMs + 2000,
      })) as {
        status?: string;
        error?: string;
        startedAt?: number;
        endedAt?: number;
      };

      if (wait?.status === "timeout") {
        outcome = { status: "timeout" };
      } else if (wait?.status === "error") {
        outcome = { status: "error", error: wait.error };
      } else if (wait?.status === "ok") {
        outcome = { status: "ok" };
      }

      // Caller-provided timestamps win; the wait result only fills gaps.
      if (typeof wait?.startedAt === "number" && !startedAt) {
        startedAt = wait.startedAt;
      }
      if (typeof wait?.endedAt === "number" && !endedAt) {
        endedAt = wait.endedAt;
      }
    }

    if (!reply) {
      reply = await readLatestAssistantReply({
        sessionKey: params.childSessionKey,
      });
    }

    if (!outcome) outcome = { status: "unknown" };

    // Build stats
    const statsLine = await buildSubagentStatsLine({
      sessionKey: params.childSessionKey,
      startedAt,
      endedAt,
    });

    // Build status label
    const statusLabel =
      outcome.status === "ok"
        ? "completed successfully"
        : outcome.status === "timeout"
          ? "timed out"
          : outcome.status === "error"
            ? buildErrorStatusLabel(outcome)
            : "finished with unknown status";

    // Build instructional message for main agent
    const taskLabel = params.label || params.task || "background task";
    const triggerMessage = [
      `A background task "${taskLabel}" just ${statusLabel}.`,
      "",
      "Findings:",
      reply || "(no output)",
      "",
      statsLine,
      "",
      "Summarize this naturally for the user. Keep it brief (1-2 sentences). Flow it into the conversation naturally.",
      "Do not mention technical details like tokens, stats, or that this was a background task.",
      "You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).",
    ].join("\n");

    const queued = await maybeQueueSubagentAnnounce({
      requesterSessionKey: params.requesterSessionKey,
      triggerMessage,
      summaryLine: taskLabel,
      requesterOrigin,
    });
    // Both "steered" and "queued" mean the requester session owns delivery now.
    if (queued !== "none") {
      didAnnounce = true;
      return true;
    }

    // Send to main agent - it will respond in its own voice
    let directOrigin = requesterOrigin;
    if (!directOrigin) {
      const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
      directOrigin = deliveryContextFromSession(entry);
    }
    await callGateway({
      method: "agent",
      params: {
        sessionKey: params.requesterSessionKey,
        message: triggerMessage,
        deliver: true,
        channel: directOrigin?.channel,
        accountId: directOrigin?.accountId,
        to: directOrigin?.to,
        threadId:
          directOrigin?.threadId != null && directOrigin.threadId !== ""
            ? String(directOrigin.threadId)
            : undefined,
        idempotencyKey: crypto.randomUUID(),
      },
      expectFinal: true,
      timeoutMs: 60_000,
    });

    didAnnounce = true;
  } catch (err) {
    defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
    // Best-effort follow-ups; ignore failures to avoid breaking the caller response.
  } finally {
    // Patch label after all writes complete
    if (params.label) {
      try {
        await callGateway({
          method: "sessions.patch",
          params: { key: params.childSessionKey, label: params.label },
          timeoutMs: 10_000,
        });
      } catch {
        // Best-effort
      }
    }
    if (params.cleanup === "delete") {
      try {
        await callGateway({
          method: "sessions.delete",
          params: { key: params.childSessionKey, deleteTranscript: true },
          timeoutMs: 10_000,
        });
      } catch {
        // ignore
      }
    }
  }
  return didAnnounce;
}
|