diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 870453f38..a3ea34154 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -367,6 +367,7 @@ export async function runEmbeddedPiAgent( log.warn( `context overflow detected; attempting auto-compaction for ${provider}/${modelId}`, ); + await params.onCompactionStart?.(); overflowCompactionAttempted = true; const compactResult = await compactEmbeddedPiSessionDirect({ sessionId: params.sessionId, diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index b21a5e3fc..4151075e3 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -86,6 +86,7 @@ export type RunEmbeddedPiAgentParams = { onBlockReplyFlush?: () => void | Promise; blockReplyBreak?: "text_end" | "message_end"; blockReplyChunking?: BlockReplyChunking; + onCompactionStart?: () => void | Promise; onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise; onAgentEvent?: (evt: { stream: string; data: Record }) => void; diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index f86ecb8a9..c7a2b86ff 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -276,6 +276,13 @@ export async function runAgentTurnWithFallback(params: { abortSignal: params.opts?.abortSignal, blockReplyBreak: params.resolvedBlockStreamingBreak, blockReplyChunking: params.blockReplyChunking, + onCompactionStart: async () => { + if (params.opts?.onPartialReply) { + await params.opts.onPartialReply({ + text: "🧹 Compacting context, please wait...", + }); + } + }, onPartialReply: allowPartialStream ? async (payload) => { const textForTyping = await handlePartialForTyping(payload); diff --git a/src/entry.ts b/src/entry.ts index 8cd1c5060..0e5a55fae 100644 --- a/src/entry.ts +++ b/src/entry.ts @@ -5,11 +5,13 @@ import process from "node:process"; import { applyCliProfileEnv, parseCliProfileArgs } from "./cli/profile.js"; import { isTruthyEnvValue } from "./infra/env.js"; +import { installGlobalFetchSanitizer } from "./infra/fetch.js"; import { installProcessWarningFilter } from "./infra/warnings.js"; import { attachChildProcessBridge } from "./process/child-process-bridge.js"; process.title = "moltbot"; installProcessWarningFilter(); +installGlobalFetchSanitizer(); if (process.argv.includes("--no-color")) { process.env.NO_COLOR = "1"; diff --git a/src/infra/fetch.ts b/src/infra/fetch.ts index 61012e485..9997d868e 100644 --- a/src/infra/fetch.ts +++ b/src/infra/fetch.ts @@ -4,6 +4,90 @@ type FetchWithPreconnect = typeof fetch & { type RequestInitWithDuplex = RequestInit & { duplex?: "half" }; +function sanitizeHeaderValue(value: string): string { + // fast path for common ascii + let isClean = true; + for (let i = 0; i < value.length; i++) { + if (value.charCodeAt(i) > 255) { + isClean = false; + break; + } + } + if (isClean) return value; // eslint-disable-line no-control-regex + + // Node's undici fetch crashes on header values > 255 (ByteString). + // We sanitize them by replacing non-latin1 chars with '?'. + return value.replace(/[^\u0000-\u00ff]/g, "?"); // eslint-disable-line no-control-regex +} + +function sanitizeHeaders(init?: RequestInit): RequestInit | undefined { + if (!init || !init.headers) return init; + + if (typeof Headers !== "undefined" && init.headers instanceof Headers) { + // Headers object: iterate + const dirtyEntries: [string, string][] = []; + init.headers.forEach((value, key) => { + const sanitized = sanitizeHeaderValue(value); + if (sanitized !== value) { + dirtyEntries.push([key, sanitized]); + } + }); + + if (dirtyEntries.length > 0) { + const h = new Headers(init.headers); + for (const [k, v] of dirtyEntries) { + h.set(k, v); + } + return { ...init, headers: h }; + } + return init; // No changes + } + + if (Array.isArray(init.headers)) { + // Array of tuples + const dirtyIndices: [number, string][] = []; + for (let i = 0; i < init.headers.length; i++) { + const entry = init.headers[i]; + if (entry.length >= 2) { + const val = entry[1]; + const sanitized = sanitizeHeaderValue(val); + if (sanitized !== val) { + dirtyIndices.push([i, sanitized]); + } + } + } + if (dirtyIndices.length > 0) { + const next = [...init.headers]; + for (const [i, v] of dirtyIndices) { + next[i] = [next[i][0], v]; + } + return { ...init, headers: next }; + } + return init; + } + + // Record + const rec = init.headers as Record; + let nextRec: Record | undefined; + + for (const k in rec) { + const v = rec[k]; + if (typeof v === "string") { + const s = sanitizeHeaderValue(v); + if (s !== v) { + if (!nextRec) nextRec = { ...rec }; + nextRec[k] = s; + } + } + } + + if (nextRec) { + return { ...init, headers: nextRec }; + } + + return init; +} + function withDuplex( init: RequestInit | undefined, input: RequestInfo | URL, @@ -23,7 +107,8 @@ function withDuplex( export function wrapFetchWithAbortSignal(fetchImpl: typeof fetch): typeof fetch { const wrapped = ((input: RequestInfo | URL, init?: RequestInit) => { - const patchedInit = withDuplex(init, input); + const sanitizedInit = sanitizeHeaders(init); + const patchedInit = withDuplex(sanitizedInit, input); const signal = patchedInit?.signal; if (!signal) return fetchImpl(input, patchedInit); if (typeof AbortSignal !== "undefined" && signal instanceof AbortSignal) { @@ -65,3 +150,9 @@ export function resolveFetch(fetchImpl?: typeof fetch): typeof fetch | undefined if (!resolved) return undefined; return wrapFetchWithAbortSignal(resolved); } + +export function installGlobalFetchSanitizer(): void { + if (globalThis.fetch) { + globalThis.fetch = wrapFetchWithAbortSignal(globalThis.fetch); + } +}