Add support for extracting and saving generated images from OpenRouter
models that return images (like gpt-5-image-mini).
- Extract base64 images from assistant responses via extractAssistantImages()
- Save generated images using standard media store ("generated" subdir)
- Send generated images to users via message channels
- Support both direct agent responses and subagent image generation
This enables use of OpenRouter's image generation models in Moltbot
conversations and subagent tasks.
537 lines
18 KiB
TypeScript
537 lines
18 KiB
TypeScript
import crypto from "node:crypto";
|
|
import path from "node:path";
|
|
|
|
import { loadConfig } from "../config/config.js";
|
|
import {
|
|
loadSessionStore,
|
|
resolveAgentIdFromSessionKey,
|
|
resolveMainSessionKey,
|
|
resolveStorePath,
|
|
} from "../config/sessions.js";
|
|
import { normalizeMainKey } from "../routing/session-key.js";
|
|
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
|
|
import { callGateway } from "../gateway/call.js";
|
|
import { saveMediaBuffer } from "../media/store.js";
|
|
import { defaultRuntime } from "../runtime.js";
|
|
import {
|
|
type DeliveryContext,
|
|
deliveryContextFromSession,
|
|
mergeDeliveryContext,
|
|
normalizeDeliveryContext,
|
|
} from "../utils/delivery-context.js";
|
|
import { isEmbeddedPiRunActive, queueEmbeddedPiMessage } from "./pi-embedded.js";
|
|
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
|
|
import { readLatestAssistantReplyWithMedia } from "./tools/agent-step.js";
|
|
|
|
/**
 * Decode a base64-encoded image returned by an image-generation model and
 * persist it through the shared media store under the "generated" subdirectory.
 *
 * Best-effort: save failures are reported as `null` instead of being thrown,
 * so one bad image does not abort the caller's flow.
 *
 * @param image - MIME type plus base64 payload. A leading
 *   `data:<mime>;base64,` prefix on the payload is tolerated and stripped.
 * @returns The path reported by the media store, or `null` when the payload
 *   decodes to zero bytes or saving fails.
 */
async function saveGeneratedImage(image: {
  mimeType: string;
  data: string;
}): Promise<string | null> {
  try {
    // A raw base64 string can never start with "data:" (":" is outside the
    // alphabet), so stripping a data-URL header is always safe.
    const payload = image.data.replace(/^data:[^,]*;base64,/i, "");
    const buffer = Buffer.from(payload, "base64");
    // Nothing decoded means there is nothing worth storing or sending.
    if (buffer.length === 0) return null;
    const saved = await saveMediaBuffer(buffer, image.mimeType, "generated");
    return saved.path;
  } catch {
    // Deliberately swallowed: callers treat `null` as "skip this image".
    return null;
  }
}
|
|
|
|
/**
 * Render a millisecond duration as a compact label.
 *
 * The value is rounded to whole seconds, then shown as `XhYm` (seconds
 * dropped), `XmYs`, or `Xs` depending on magnitude.
 *
 * @param valueMs - Duration in milliseconds.
 * @returns The label, or `undefined` when the input is missing, non-finite,
 *   zero, or negative.
 */
function formatDurationShort(valueMs?: number) {
  if (typeof valueMs !== "number" || !Number.isFinite(valueMs) || valueMs <= 0) {
    return undefined;
  }
  const wholeSeconds = Math.round(valueMs / 1000);
  const secs = wholeSeconds % 60;
  const wholeMinutes = (wholeSeconds - secs) / 60;
  const mins = wholeMinutes % 60;
  const hrs = (wholeMinutes - mins) / 60;
  if (hrs > 0) return `${hrs}h${mins}m`;
  return mins > 0 ? `${mins}m${secs}s` : `${secs}s`;
}
|
|
|
|
/**
 * Render a token count as a short human-readable label: a plain integer
 * below one thousand, `X.Yk` for thousands, `X.Ym` for millions.
 *
 * Values that round up to the next unit are promoted to it, so the result is
 * never "1000.0k" or "1000".
 *
 * @param value - Token count; missing, zero, or non-finite values yield "0".
 * @returns The formatted label.
 */
function formatTokenCount(value?: number) {
  if (!value || !Number.isFinite(value)) return "0";
  if (value >= 1_000_000) return `${(value / 1_000_000).toFixed(1)}m`;
  if (value >= 1_000) {
    const thousands = (value / 1_000).toFixed(1);
    // 999_950..999_999 rounds to "1000.0"; show it as one million instead.
    return thousands === "1000.0" ? "1.0m" : `${thousands}k`;
  }
  const rounded = Math.round(value);
  // Fractional inputs just under 1000 round up to 1000; promote to "k".
  return rounded >= 1_000 ? "1.0k" : String(rounded);
}
|
|
|
|
/**
 * Format a dollar amount for display.
 *
 * Amounts of at least one cent use two decimals; anything smaller (including
 * zero and negative values) uses four so tiny estimates stay visible.
 *
 * @param value - Amount in USD.
 * @returns A `$`-prefixed string, or `undefined` when the value is missing or
 *   not finite.
 */
function formatUsd(value?: number) {
  if (typeof value !== "number" || !Number.isFinite(value)) return undefined;
  const decimals = value >= 0.01 ? 2 : 4;
  return `$${value.toFixed(decimals)}`;
}
|
|
|
|
/**
 * Look up the configured cost table for a provider/model pair.
 *
 * Provider and model are trimmed before lookup; the model is matched by
 * exact `id` against `config.models.providers[provider].models`.
 *
 * @param params - Provider id, model id, and the loaded configuration.
 * @returns The matching entry's `cost`, or `undefined` when either id is
 *   blank, the provider has no model list, or no model matches.
 */
function resolveModelCost(params: {
  provider?: string;
  model?: string;
  config: ReturnType<typeof loadConfig>;
}):
  | {
      input: number;
      output: number;
      cacheRead: number;
      cacheWrite: number;
    }
  | undefined {
  const providerId = params.provider?.trim();
  const modelId = params.model?.trim();
  if (!providerId || !modelId) return undefined;

  const providerModels = params.config.models?.providers?.[providerId]?.models;
  if (providerModels == null) return undefined;

  for (const candidate of providerModels) {
    // First match wins.
    if (candidate.id === modelId) return candidate.cost;
  }
  return undefined;
}
|
|
|
|
/**
 * Load the session entry for `sessionKey`, polling briefly until token usage
 * has been recorded on it.
 *
 * If the entry is missing on the first read, or already has any of
 * `totalTokens` / `inputTokens` / `outputTokens` as a number, it is returned
 * immediately. Otherwise the store is re-read up to 4 times, 200 ms apart.
 *
 * @param params - The session key to look up.
 * @returns The latest entry read (possibly still without usage, or
 *   `undefined`) together with the resolved store path.
 */
async function waitForSessionUsage(params: { sessionKey: string }) {
  const { sessionKey } = params;
  const cfg = loadConfig();
  const storePath = resolveStorePath(cfg.session?.store, {
    agentId: resolveAgentIdFromSessionKey(sessionKey),
  });
  const readEntry = () => loadSessionStore(storePath)[sessionKey];
  const usageRecorded = (candidate: ReturnType<typeof readEntry>) =>
    candidate &&
    [candidate.totalTokens, candidate.inputTokens, candidate.outputTokens].some(
      (count) => typeof count === "number",
    );

  let entry = readEntry();
  if (!entry) return { entry, storePath };
  if (usageRecorded(entry)) return { entry, storePath };

  for (let remaining = 4; remaining > 0; remaining -= 1) {
    await new Promise((resolve) => setTimeout(resolve, 200));
    // Re-read from the store each round; the entry may change between reads.
    entry = readEntry();
    if (usageRecorded(entry)) break;
  }
  return { entry, storePath };
}
|
|
|
|
/** Input accepted by `deliveryContextFromSession` (the type of its first parameter). */
type DeliveryContextSource = Parameters<typeof deliveryContextFromSession>[0];
|
|
|
|
/**
 * Combine the delivery context stored on a session entry with the
 * requester-supplied origin.
 *
 * NOTE(review): which side wins on conflicting fields is decided by
 * `mergeDeliveryContext`; confirm there before relying on precedence.
 *
 * @param entry - Session entry to derive a delivery context from.
 * @param requesterOrigin - Delivery context supplied by the requester.
 * @returns The merged context as returned by `mergeDeliveryContext`.
 */
function resolveAnnounceOrigin(
  entry?: DeliveryContextSource,
  requesterOrigin?: DeliveryContext,
): DeliveryContext | undefined {
  const sessionOrigin = deliveryContextFromSession(entry);
  return mergeDeliveryContext(sessionOrigin, requesterOrigin);
}
|
|
|
|
/**
 * Deliver a queued announce item by invoking the gateway `agent` method on
 * the item's session, routed to the item's origin.
 *
 * A null or empty-string thread id is omitted; any other value is sent as a
 * string. Each call uses a fresh idempotency key and a 60 s timeout.
 *
 * @param item - Queued announce item (prompt, session key, optional origin).
 */
async function sendAnnounce(item: AnnounceQueueItem) {
  const { origin } = item;
  const rawThreadId = origin?.threadId;
  const threadId =
    rawThreadId == null || rawThreadId === "" ? undefined : String(rawThreadId);

  const agentParams = {
    sessionKey: item.sessionKey,
    message: item.prompt,
    channel: origin?.channel,
    accountId: origin?.accountId,
    to: origin?.to,
    threadId,
    deliver: true,
    idempotencyKey: crypto.randomUUID(),
  };

  await callGateway({
    method: "agent",
    params: agentParams,
    expectFinal: true,
    timeoutMs: 60_000,
  });
}
|
|
|
|
/**
 * Map a requester session key onto the key used in the session store.
 *
 * - Blank, `"global"`, `"unknown"`, and keys already starting with `agent:`
 *   are returned as-is (after trimming).
 * - `"main"` or the configured main key resolves via `resolveMainSessionKey`.
 * - Anything else becomes `agent:<agentId>:<key>`.
 *
 * @param cfg - Loaded configuration.
 * @param requesterSessionKey - Key as supplied by the requester.
 * @returns The store key.
 */
function resolveRequesterStoreKey(
  cfg: ReturnType<typeof loadConfig>,
  requesterSessionKey: string,
): string {
  const trimmed = requesterSessionKey.trim();
  const isPassthrough =
    trimmed === "" ||
    trimmed === "global" ||
    trimmed === "unknown" ||
    trimmed.startsWith("agent:");
  if (isPassthrough) return trimmed;

  const configuredMainKey = normalizeMainKey(cfg.session?.mainKey);
  if (trimmed === "main" || trimmed === configuredMainKey) {
    return resolveMainSessionKey(cfg);
  }

  const owningAgentId = resolveAgentIdFromSessionKey(trimmed);
  return `agent:${owningAgentId}:${trimmed}`;
}
|
|
|
|
/**
 * Load the requester's session entry from the session store.
 *
 * The key is first mapped to its store form; the store path is resolved for
 * the agent id derived from that key.
 *
 * @param requesterSessionKey - Key as supplied by the requester.
 * @returns The loaded config, the entry (or `undefined` if absent), and the
 *   store key used for the lookup.
 */
function loadRequesterSessionEntry(requesterSessionKey: string) {
  const cfg = loadConfig();
  const canonicalKey = resolveRequesterStoreKey(cfg, requesterSessionKey);
  const storePath = resolveStorePath(cfg.session?.store, {
    agentId: resolveAgentIdFromSessionKey(canonicalKey),
  });
  const entry = loadSessionStore(storePath)[canonicalKey];
  return { cfg, entry, canonicalKey };
}
|
|
|
|
/**
 * Try to hand a subagent announce to the requester session without starting
 * a new agent call, according to the session's queue settings.
 *
 * - In `steer` / `steer-backlog` mode the message is first offered to
 *   `queueEmbeddedPiMessage`; if that reports success the result is
 *   `"steered"`.
 * - Otherwise, if the requester's embedded run is active and the mode is one
 *   of `followup`, `collect`, `steer-backlog`, `interrupt`, or `steer`, the
 *   announce is enqueued for later delivery via `sendAnnounce` and the result
 *   is `"queued"`.
 * - In every other case (including a requester entry with no session id)
 *   the result is `"none"` and the caller must deliver the message itself.
 *
 * @param params - Requester key, message to deliver, optional summary line
 *   and requester origin.
 * @returns How the announce was handled.
 */
async function maybeQueueSubagentAnnounce(params: {
  requesterSessionKey: string;
  triggerMessage: string;
  summaryLine?: string;
  requesterOrigin?: DeliveryContext;
}): Promise<"steered" | "queued" | "none"> {
  // Reuse the key the loader already resolved instead of resolving it twice.
  const { cfg, entry, canonicalKey } = loadRequesterSessionEntry(params.requesterSessionKey);
  const sessionId = entry?.sessionId;
  if (!sessionId) return "none";

  const queueSettings = resolveQueueSettings({
    cfg,
    channel: entry?.channel ?? entry?.lastChannel,
    sessionEntry: entry,
  });
  // Sampled before the steer attempt, matching the order the flow relies on.
  const isActive = isEmbeddedPiRunActive(sessionId);

  const shouldSteer = queueSettings.mode === "steer" || queueSettings.mode === "steer-backlog";
  if (shouldSteer) {
    const steered = queueEmbeddedPiMessage(sessionId, params.triggerMessage);
    if (steered) return "steered";
  }

  const shouldFollowup =
    queueSettings.mode === "followup" ||
    queueSettings.mode === "collect" ||
    queueSettings.mode === "steer-backlog" ||
    queueSettings.mode === "interrupt";
  // A failed steer on an active run falls back to the queue.
  if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
    const origin = resolveAnnounceOrigin(entry, params.requesterOrigin);
    enqueueAnnounce({
      key: canonicalKey,
      item: {
        prompt: params.triggerMessage,
        summaryLine: params.summaryLine,
        enqueuedAt: Date.now(),
        sessionKey: canonicalKey,
        origin,
      },
      settings: queueSettings,
      send: sendAnnounce,
    });
    return "queued";
  }

  return "none";
}
|
|
|
|
/**
 * Build the one-line "Stats: …" summary for a finished subagent run.
 *
 * Segments, joined with a bullet: runtime, token usage, estimated cost (only
 * when a cost table and both input/output counts are available), session
 * key, session id, and transcript path (`<store dir>/<sessionId>.jsonl`).
 *
 * @param params - Child session key and optional start/end timestamps (ms).
 * @returns The formatted stats line.
 */
async function buildSubagentStatsLine(params: {
  sessionKey: string;
  startedAt?: number;
  endedAt?: number;
}) {
  const cfg = loadConfig();
  const { entry, storePath } = await waitForSessionUsage({
    sessionKey: params.sessionKey,
  });
  const sessionId = entry?.sessionId;

  // Runtime is only known when both timestamps are present.
  const { startedAt, endedAt } = params;
  const elapsedMs =
    typeof startedAt === "number" && typeof endedAt === "number"
      ? Math.max(0, endedAt - startedAt)
      : undefined;

  const inputTokens = entry?.inputTokens;
  const outputTokens = entry?.outputTokens;
  // Prefer the recorded total; otherwise derive it when both halves exist.
  const totalTokens =
    entry?.totalTokens ??
    (typeof inputTokens === "number" && typeof outputTokens === "number"
      ? inputTokens + outputTokens
      : undefined);

  const pricing = resolveModelCost({
    provider: entry?.modelProvider,
    model: entry?.model,
    config: cfg,
  });
  let estimatedUsd: number | undefined;
  if (pricing && typeof inputTokens === "number" && typeof outputTokens === "number") {
    // The divisor implies prices are quoted per million tokens.
    estimatedUsd = (inputTokens * pricing.input + outputTokens * pricing.output) / 1_000_000;
  }

  const segments: string[] = [`runtime ${formatDurationShort(elapsedMs) ?? "n/a"}`];

  if (typeof totalTokens === "number") {
    const inLabel = typeof inputTokens === "number" ? formatTokenCount(inputTokens) : "n/a";
    const outLabel = typeof outputTokens === "number" ? formatTokenCount(outputTokens) : "n/a";
    segments.push(`tokens ${formatTokenCount(totalTokens)} (in ${inLabel} / out ${outLabel})`);
  } else {
    segments.push("tokens n/a");
  }

  const usdLabel = formatUsd(estimatedUsd);
  if (usdLabel) segments.push(`est ${usdLabel}`);

  segments.push(`sessionKey ${params.sessionKey}`);
  if (sessionId) {
    segments.push(`sessionId ${sessionId}`);
    if (storePath) {
      const transcriptPath = path.join(path.dirname(storePath), `${sessionId}.jsonl`);
      segments.push(`transcript ${transcriptPath}`);
    }
  }

  return `Stats: ${segments.join(" \u2022 ")}`;
}
|
|
|
|
/**
 * Build the system prompt given to a spawned subagent.
 *
 * The prompt has a fixed preamble (role, rules, output format, prohibitions)
 * followed by a "Session Context" section listing whichever of label,
 * requester session, and requester channel are present, and always the
 * subagent's own session key. The result ends with a trailing newline.
 *
 * @param params - Session keys, optional requester origin, label, and task.
 *   The task has whitespace runs collapsed to single spaces; a missing or
 *   blank task is rendered as the literal `{{TASK_DESCRIPTION}}`.
 * @returns The prompt text.
 */
export function buildSubagentSystemPrompt(params: {
  requesterSessionKey?: string;
  requesterOrigin?: DeliveryContext;
  childSessionKey: string;
  label?: string;
  task?: string;
}) {
  const collapsedTask =
    typeof params.task === "string" ? params.task.replace(/\s+/g, " ").trim() : "";
  const taskText = collapsedTask || "{{TASK_DESCRIPTION}}";

  const preamble: string[] = [
    "# Subagent Context",
    "",
    "You are a **subagent** spawned by the main agent for a specific task.",
    "",
    "## Your Role",
    `- You were created to handle: ${taskText}`,
    "- Complete this task. That's your entire purpose.",
    "- You are NOT the main agent. Don't try to be.",
    "",
    "## Rules",
    "1. **Stay focused** - Do your assigned task, nothing else",
    "2. **Complete the task** - Your final message will be automatically reported to the main agent",
    "3. **Don't initiate** - No heartbeats, no proactive actions, no side quests",
    "4. **Be ephemeral** - You may be terminated after task completion. That's fine.",
    "",
    "## Output Format",
    "When complete, your final response should include:",
    "- What you accomplished or found",
    "- Any relevant details the main agent should know",
    "- Keep it concise but informative",
    "",
    "## What You DON'T Do",
    "- NO user conversations (that's main agent's job)",
    "- NO external messages (email, tweets, etc.) unless explicitly tasked",
    "- NO cron jobs or persistent state",
    "- NO pretending to be the main agent",
    "- NO using the `message` tool directly",
    "",
    "## Session Context",
  ];

  // Optional context lines appear only when their value is present.
  const sessionContext: string[] = [];
  if (params.label) {
    sessionContext.push(`- Label: ${params.label}`);
  }
  if (params.requesterSessionKey) {
    sessionContext.push(`- Requester session: ${params.requesterSessionKey}.`);
  }
  if (params.requesterOrigin?.channel) {
    sessionContext.push(`- Requester channel: ${params.requesterOrigin.channel}.`);
  }
  // The final empty entry produces the trailing newline after joining.
  sessionContext.push(`- Your session: ${params.childSessionKey}.`, "");

  return [...preamble, ...sessionContext].join("\n");
}
|
|
|
|
/** Final state of a subagent run as reported in the announce message. */
export type SubagentRunOutcome = {
  /** How the run ended; `"unknown"` is used when no status could be determined. */
  status: "ok" | "error" | "timeout" | "unknown";
  /** Error description, shown in the announce when `status` is `"error"`. */
  error?: string;
};
|
|
|
|
/**
 * Report a finished subagent run back to the requester session.
 *
 * Steps, in order:
 * 1. Unless a first-round reply was supplied (or waiting is disabled), wait
 *    for the child run via the gateway `agent.wait` method (capped at 60 s)
 *    and read the child's latest assistant reply, including any images.
 * 2. Build the stats line for the child session.
 * 3. Save generated images and, when the requester origin has both a target
 *    and a channel, send them to the user via the gateway `send` method.
 * 4. Compose the trigger message for the main agent. It only claims images
 *    were sent when the send actually succeeded; otherwise the saved paths
 *    are listed so the main agent knows they still need delivering.
 * 5. Hand the message to the requester: steer/queue it if possible,
 *    otherwise call the gateway `agent` method directly.
 * 6. Always (success or failure): patch the child session label if one was
 *    given, and delete the child session when `cleanup` is `"delete"`.
 *
 * Failures in steps 1-5 are logged and swallowed so the caller is never
 * broken by a failed announce.
 *
 * @param params - Child/requester session identifiers, delivery origin, task
 *   description, timing information, and cleanup policy.
 * @returns `true` when the announce was steered, queued, or sent directly;
 *   `false` when the flow failed before that point.
 */
export async function runSubagentAnnounceFlow(params: {
  childSessionKey: string;
  childRunId: string;
  requesterSessionKey: string;
  requesterOrigin?: DeliveryContext;
  requesterDisplayKey: string;
  task: string;
  timeoutMs: number;
  cleanup: "delete" | "keep";
  roundOneReply?: string;
  waitForCompletion?: boolean;
  startedAt?: number;
  endedAt?: number;
  label?: string;
  outcome?: SubagentRunOutcome;
}): Promise<boolean> {
  let didAnnounce = false;
  try {
    const requesterOrigin = normalizeDeliveryContext(params.requesterOrigin);
    let reply = params.roundOneReply;
    let replyImages: Array<{ mimeType: string; data: string }> | undefined;
    let outcome: SubagentRunOutcome | undefined = params.outcome;
    // Track timings locally rather than writing back into the caller's object.
    let startedAt = params.startedAt;
    let endedAt = params.endedAt;

    if (!reply && params.waitForCompletion !== false) {
      const waitMs = Math.min(params.timeoutMs, 60_000);
      const wait = (await callGateway({
        method: "agent.wait",
        params: {
          runId: params.childRunId,
          timeoutMs: waitMs,
        },
        // Slightly longer than the server-side wait so the gateway answers first.
        timeoutMs: waitMs + 2000,
      })) as {
        status?: string;
        error?: string;
        startedAt?: number;
        endedAt?: number;
      };
      if (wait?.status === "timeout") {
        outcome = { status: "timeout" };
      } else if (wait?.status === "error") {
        outcome = { status: "error", error: wait.error };
      } else if (wait?.status === "ok") {
        outcome = { status: "ok" };
      }
      // Caller-supplied timestamps win; the wait result only fills gaps.
      if (typeof wait?.startedAt === "number" && !startedAt) {
        startedAt = wait.startedAt;
      }
      if (typeof wait?.endedAt === "number" && !endedAt) {
        endedAt = wait.endedAt;
      }
      // Read text and generated images from the child's latest assistant reply.
      const replyContent = await readLatestAssistantReplyWithMedia({
        sessionKey: params.childSessionKey,
      });
      reply = replyContent.text;
      replyImages = replyContent.images;
    }

    // Covers the no-wait path (and a first read that returned nothing at all).
    if (!reply && !replyImages) {
      const replyContent = await readLatestAssistantReplyWithMedia({
        sessionKey: params.childSessionKey,
      });
      reply = replyContent.text;
      replyImages = replyContent.images;
    }

    if (!outcome) outcome = { status: "unknown" };

    const statsLine = await buildSubagentStatsLine({
      sessionKey: params.childSessionKey,
      startedAt,
      endedAt,
    });

    // Persist generated images, then try to send them straight to the user.
    const savedImagePaths: string[] = [];
    // True only once the gateway send has actually succeeded.
    let imagesDelivered = false;
    if (replyImages && replyImages.length > 0) {
      for (const img of replyImages) {
        const filePath = await saveGeneratedImage(img);
        if (filePath) {
          savedImagePaths.push(filePath);
        }
      }
      if (savedImagePaths.length > 0 && requesterOrigin?.to && requesterOrigin?.channel) {
        try {
          // NOTE(review): unlike the agent call below, no threadId is passed
          // here — confirm whether the `send` method supports threads.
          await callGateway({
            method: "send",
            params: {
              to: requesterOrigin.to,
              message: "Here's what I generated:", // Non-empty message required
              mediaUrls: savedImagePaths,
              channel: requesterOrigin.channel,
              accountId: requesterOrigin.accountId,
              idempotencyKey: crypto.randomUUID(),
            },
            timeoutMs: 30_000,
          });
          imagesDelivered = true;
          defaultRuntime.log(
            `[subagent] Images sent: ${savedImagePaths.length} image(s) to ${requesterOrigin.to}`,
          );
        } catch (err) {
          defaultRuntime.error?.(`Failed to send subagent images: ${String(err)}`);
        }
      }
    }

    const statusLabel =
      outcome.status === "ok"
        ? "completed successfully"
        : outcome.status === "timeout"
          ? "timed out"
          : outcome.status === "error"
            ? `failed: ${outcome.error || "unknown error"}`
            : "finished with unknown status";

    const taskLabel = params.label || params.task || "background task";

    // Tell the main agent the truth about the images: delivered, or merely
    // saved (no delivery target, or the send failed).
    let imageNote = "";
    if (imagesDelivered) {
      imageNote = `\n[${savedImagePaths.length} image(s) were generated and sent to the user]`;
    } else if (savedImagePaths.length > 0) {
      imageNote = `\n[${savedImagePaths.length} image(s) were generated but could not be sent to the user automatically. Saved at: ${savedImagePaths.join(", ")}]`;
    }

    const triggerMessage = [
      `A background task "${taskLabel}" just ${statusLabel}.`,
      "",
      "Findings:",
      reply || "(no output)",
      imageNote,
      "",
      statsLine,
      "",
      "Summarize this naturally for the user. Keep it brief (1-2 sentences). Flow it into the conversation naturally.",
      "Do not mention technical details like tokens, stats, or that this was a background task.",
      imagesDelivered
        ? "The generated image(s) have already been sent to the user. Just acknowledge the completion naturally."
        : "You can respond with NO_REPLY if no announcement is needed (e.g., internal task with no user-facing result).",
    ].join("\n");

    const queued = await maybeQueueSubagentAnnounce({
      requesterSessionKey: params.requesterSessionKey,
      triggerMessage,
      summaryLine: taskLabel,
      requesterOrigin,
    });
    if (queued === "steered" || queued === "queued") {
      didAnnounce = true;
      return true;
    }

    // Not steered or queued: send to the main agent directly so it can
    // respond in its own voice.
    let directOrigin = requesterOrigin;
    if (!directOrigin) {
      const { entry } = loadRequesterSessionEntry(params.requesterSessionKey);
      directOrigin = deliveryContextFromSession(entry);
    }
    await callGateway({
      method: "agent",
      params: {
        sessionKey: params.requesterSessionKey,
        message: triggerMessage,
        deliver: true,
        channel: directOrigin?.channel,
        accountId: directOrigin?.accountId,
        to: directOrigin?.to,
        threadId:
          directOrigin?.threadId != null && directOrigin.threadId !== ""
            ? String(directOrigin.threadId)
            : undefined,
        idempotencyKey: crypto.randomUUID(),
      },
      expectFinal: true,
      timeoutMs: 60_000,
    });

    didAnnounce = true;
  } catch (err) {
    defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
    // Best-effort follow-ups; ignore failures to avoid breaking the caller response.
  } finally {
    // Patch label after all writes complete
    if (params.label) {
      try {
        await callGateway({
          method: "sessions.patch",
          params: { key: params.childSessionKey, label: params.label },
          timeoutMs: 10_000,
        });
      } catch {
        // Best-effort
      }
    }
    if (params.cleanup === "delete") {
      try {
        await callGateway({
          method: "sessions.delete",
          params: { key: params.childSessionKey, deleteTranscript: true },
          timeoutMs: 10_000,
        });
      } catch {
        // Best-effort: a failed cleanup must not affect the announce result.
      }
    }
  }
  return didAnnounce;
}
|