This commit is contained in:
Shahil kadia 2026-01-29 19:00:16 +00:00 committed by GitHub
commit e1260ec695
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 103 additions and 1 deletions

View File

@ -367,6 +367,7 @@ export async function runEmbeddedPiAgent(
log.warn(
`context overflow detected; attempting auto-compaction for ${provider}/${modelId}`,
);
await params.onCompactionStart?.();
overflowCompactionAttempted = true;
const compactResult = await compactEmbeddedPiSessionDirect({
sessionId: params.sessionId,

View File

@ -86,6 +86,7 @@ export type RunEmbeddedPiAgentParams = {
onBlockReplyFlush?: () => void | Promise<void>;
blockReplyBreak?: "text_end" | "message_end";
blockReplyChunking?: BlockReplyChunking;
onCompactionStart?: () => void | Promise<void>;
onReasoningStream?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
onToolResult?: (payload: { text?: string; mediaUrls?: string[] }) => void | Promise<void>;
onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;

View File

@ -276,6 +276,13 @@ export async function runAgentTurnWithFallback(params: {
abortSignal: params.opts?.abortSignal,
blockReplyBreak: params.resolvedBlockStreamingBreak,
blockReplyChunking: params.blockReplyChunking,
onCompactionStart: async () => {
if (params.opts?.onPartialReply) {
await params.opts.onPartialReply({
text: "🧹 Compacting context, please wait...",
});
}
},
onPartialReply: allowPartialStream
? async (payload) => {
const textForTyping = await handlePartialForTyping(payload);

View File

@ -5,11 +5,13 @@ import process from "node:process";
import { applyCliProfileEnv, parseCliProfileArgs } from "./cli/profile.js";
import { isTruthyEnvValue } from "./infra/env.js";
import { installGlobalFetchSanitizer } from "./infra/fetch.js";
import { installProcessWarningFilter } from "./infra/warnings.js";
import { attachChildProcessBridge } from "./process/child-process-bridge.js";
process.title = "moltbot";
installProcessWarningFilter();
installGlobalFetchSanitizer();
if (process.argv.includes("--no-color")) {
process.env.NO_COLOR = "1";

View File

@ -4,6 +4,90 @@ type FetchWithPreconnect = typeof fetch & {
type RequestInitWithDuplex = RequestInit & { duplex?: "half" };
function sanitizeHeaderValue(value: string): string {
// fast path for common ascii
let isClean = true;
for (let i = 0; i < value.length; i++) {
if (value.charCodeAt(i) > 255) {
isClean = false;
break;
}
}
if (isClean) return value; // eslint-disable-line no-control-regex
// Node's undici fetch crashes on header values > 255 (ByteString).
// We sanitize them by replacing non-latin1 chars with '?'.
return value.replace(/[^\u0000-\u00ff]/g, "?"); // eslint-disable-line no-control-regex
}
function sanitizeHeaders(init?: RequestInit): RequestInit | undefined {
if (!init || !init.headers) return init;
if (typeof Headers !== "undefined" && init.headers instanceof Headers) {
// Headers object: iterate
const dirtyEntries: [string, string][] = [];
init.headers.forEach((value, key) => {
const sanitized = sanitizeHeaderValue(value);
if (sanitized !== value) {
dirtyEntries.push([key, sanitized]);
}
});
if (dirtyEntries.length > 0) {
const h = new Headers(init.headers);
for (const [k, v] of dirtyEntries) {
h.set(k, v);
}
return { ...init, headers: h };
}
return init; // No changes
}
if (Array.isArray(init.headers)) {
// Array of tuples
const dirtyIndices: [number, string][] = [];
for (let i = 0; i < init.headers.length; i++) {
const entry = init.headers[i];
if (entry.length >= 2) {
const val = entry[1];
const sanitized = sanitizeHeaderValue(val);
if (sanitized !== val) {
dirtyIndices.push([i, sanitized]);
}
}
}
if (dirtyIndices.length > 0) {
const next = [...init.headers];
for (const [i, v] of dirtyIndices) {
next[i] = [next[i][0], v];
}
return { ...init, headers: next };
}
return init;
}
// Record<string, string>
const rec = init.headers as Record<string, string>;
let nextRec: Record<string, string> | undefined;
for (const k in rec) {
const v = rec[k];
if (typeof v === "string") {
const s = sanitizeHeaderValue(v);
if (s !== v) {
if (!nextRec) nextRec = { ...rec };
nextRec[k] = s;
}
}
}
if (nextRec) {
return { ...init, headers: nextRec };
}
return init;
}
function withDuplex(
init: RequestInit | undefined,
input: RequestInfo | URL,
@ -23,7 +107,8 @@ function withDuplex(
export function wrapFetchWithAbortSignal(fetchImpl: typeof fetch): typeof fetch {
const wrapped = ((input: RequestInfo | URL, init?: RequestInit) => {
const patchedInit = withDuplex(init, input);
const sanitizedInit = sanitizeHeaders(init);
const patchedInit = withDuplex(sanitizedInit, input);
const signal = patchedInit?.signal;
if (!signal) return fetchImpl(input, patchedInit);
if (typeof AbortSignal !== "undefined" && signal instanceof AbortSignal) {
@ -65,3 +150,9 @@ export function resolveFetch(fetchImpl?: typeof fetch): typeof fetch | undefined
if (!resolved) return undefined;
return wrapFetchWithAbortSignal(resolved);
}
export function installGlobalFetchSanitizer(): void {
if (globalThis.fetch) {
globalThis.fetch = wrapFetchWithAbortSignal(globalThis.fetch);
}
}