Gateway: tolerate transient network failures(#4425)

This commit is contained in:
selfboot 2026-01-30 14:50:59 +08:00
parent 4583f88626
commit 7698c29363
4 changed files with 134 additions and 82 deletions

View File

@ -100,6 +100,24 @@ describe("isTransientNetworkError", () => {
expect(isTransientNetworkError(error)).toBe(true); expect(isTransientNetworkError(error)).toBe(true);
}); });
it("returns true for MediaFetchError with fetch_failed code", () => {
const error = Object.assign(new Error("media fetch failed"), {
name: "MediaFetchError",
code: "fetch_failed",
});
expect(isTransientNetworkError(error)).toBe(true);
});
it("returns true for MediaFetchError that wraps a network cause", () => {
const cause = Object.assign(new Error("timeout"), { code: "ETIMEDOUT" });
const error = Object.assign(new Error("media fetch failed"), {
name: "MediaFetchError",
code: "fetch_failed",
cause,
});
expect(isTransientNetworkError(error)).toBe(true);
});
it("returns false for regular errors without network codes", () => { it("returns false for regular errors without network codes", () => {
expect(isTransientNetworkError(new Error("Something went wrong"))).toBe(false); expect(isTransientNetworkError(new Error("Something went wrong"))).toBe(false);
expect(isTransientNetworkError(new TypeError("Cannot read property"))).toBe(false); expect(isTransientNetworkError(new TypeError("Cannot read property"))).toBe(false);

View File

@ -78,6 +78,16 @@ function isConfigError(err: unknown): boolean {
export function isTransientNetworkError(err: unknown): boolean { export function isTransientNetworkError(err: unknown): boolean {
if (!err) return false; if (!err) return false;
if (err && typeof err === "object") {
const name = "name" in err ? String(err.name) : "";
const code = extractErrorCodeWithCause(err);
if (name === "MediaFetchError" && code === "fetch_failed") {
const cause = getErrorCause(err);
if (cause && cause !== err) return isTransientNetworkError(cause);
return true;
}
}
const code = extractErrorCodeWithCause(err); const code = extractErrorCodeWithCause(err);
if (code && TRANSIENT_NETWORK_CODES.has(code)) return true; if (code && TRANSIENT_NETWORK_CODES.has(code)) return true;

View File

@ -13,10 +13,17 @@ export type MediaFetchErrorCode = "max_bytes" | "http_error" | "fetch_failed";
export class MediaFetchError extends Error { export class MediaFetchError extends Error {
readonly code: MediaFetchErrorCode; readonly code: MediaFetchErrorCode;
constructor(code: MediaFetchErrorCode, message: string) { constructor(code: MediaFetchErrorCode, message: string, options?: { cause?: unknown }) {
super(message); super(message);
this.code = code; this.code = code;
this.name = "MediaFetchError"; this.name = "MediaFetchError";
if (options?.cause !== undefined) {
try {
(this as { cause?: unknown }).cause = options.cause;
} catch {
// ignore if cause cannot be assigned
}
}
} }
} }
@ -74,7 +81,9 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise<Fetc
try { try {
res = await fetcher(url); res = await fetcher(url);
} catch (err) { } catch (err) {
throw new MediaFetchError("fetch_failed", `Failed to fetch media from ${url}: ${String(err)}`); throw new MediaFetchError("fetch_failed", `Failed to fetch media from ${url}: ${String(err)}`, {
cause: err,
});
} }
if (!res.ok) { if (!res.ok) {

View File

@ -5,6 +5,7 @@ import { resolveAgentMaxConcurrent } from "../config/agent-limits.js";
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js"; import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
import { formatErrorMessage } from "../infra/errors.js"; import { formatErrorMessage } from "../infra/errors.js";
import { formatDurationMs } from "../infra/format-duration.js"; import { formatDurationMs } from "../infra/format-duration.js";
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
import { resolveTelegramAccount } from "./accounts.js"; import { resolveTelegramAccount } from "./accounts.js";
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
@ -108,92 +109,106 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
opts.proxyFetch ?? opts.proxyFetch ??
(account.config.proxy ? makeProxyFetch(account.config.proxy as string) : undefined); (account.config.proxy ? makeProxyFetch(account.config.proxy as string) : undefined);
let lastUpdateId = await readTelegramUpdateOffset({ let unregisterUnhandled: (() => void) | null = null;
accountId: account.accountId, try {
}); unregisterUnhandled = registerUnhandledRejectionHandler((reason) => {
const persistUpdateId = async (updateId: number) => { if (!isRecoverableTelegramNetworkError(reason)) return false;
if (lastUpdateId !== null && updateId <= lastUpdateId) return; const errMsg = formatErrorMessage(reason);
lastUpdateId = updateId; (opts.runtime?.error ?? console.warn)(
try { `telegram: suppressed unhandled network rejection: ${errMsg}`,
await writeTelegramUpdateOffset({
accountId: account.accountId,
updateId,
});
} catch (err) {
(opts.runtime?.error ?? console.error)(
`telegram: failed to persist update offset: ${String(err)}`,
); );
} return true;
};
const bot = createTelegramBot({
token,
runtime: opts.runtime,
proxyFetch,
config: cfg,
accountId: account.accountId,
updateOffset: {
lastUpdateId,
onUpdateId: persistUpdateId,
},
});
if (opts.useWebhook) {
await startTelegramWebhook({
token,
accountId: account.accountId,
config: cfg,
path: opts.webhookPath,
port: opts.webhookPort,
secret: opts.webhookSecret,
runtime: opts.runtime as RuntimeEnv,
fetch: proxyFetch,
abortSignal: opts.abortSignal,
publicUrl: opts.webhookUrl,
}); });
return;
}
// Use grammyjs/runner for concurrent update processing let lastUpdateId = await readTelegramUpdateOffset({
let restartAttempts = 0; accountId: account.accountId,
});
while (!opts.abortSignal?.aborted) { const persistUpdateId = async (updateId: number) => {
const runner = run(bot, createTelegramRunnerOptions(cfg)); if (lastUpdateId !== null && updateId <= lastUpdateId) return;
const stopOnAbort = () => { lastUpdateId = updateId;
if (opts.abortSignal?.aborted) { try {
void runner.stop(); await writeTelegramUpdateOffset({
accountId: account.accountId,
updateId,
});
} catch (err) {
(opts.runtime?.error ?? console.error)(
`telegram: failed to persist update offset: ${String(err)}`,
);
} }
}; };
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try { const bot = createTelegramBot({
// runner.task() returns a promise that resolves when the runner stops token,
await runner.task(); runtime: opts.runtime,
proxyFetch,
config: cfg,
accountId: account.accountId,
updateOffset: {
lastUpdateId,
onUpdateId: persistUpdateId,
},
});
if (opts.useWebhook) {
await startTelegramWebhook({
token,
accountId: account.accountId,
config: cfg,
path: opts.webhookPath,
port: opts.webhookPort,
secret: opts.webhookSecret,
runtime: opts.runtime as RuntimeEnv,
fetch: proxyFetch,
abortSignal: opts.abortSignal,
publicUrl: opts.webhookUrl,
});
return; return;
} catch (err) {
if (opts.abortSignal?.aborted) {
throw err;
}
const isConflict = isGetUpdatesConflict(err);
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
const isNetworkError = isNetworkRelatedError(err);
if (!isConflict && !isRecoverable && !isNetworkError) {
throw err;
}
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
(opts.runtime?.error ?? console.error)(
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
);
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
if (opts.abortSignal?.aborted) return;
throw sleepErr;
}
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
} }
// Use grammyjs/runner for concurrent update processing
let restartAttempts = 0;
while (!opts.abortSignal?.aborted) {
const runner = run(bot, createTelegramRunnerOptions(cfg));
const stopOnAbort = () => {
if (opts.abortSignal?.aborted) {
void runner.stop();
}
};
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try {
// runner.task() returns a promise that resolves when the runner stops
await runner.task();
return;
} catch (err) {
if (opts.abortSignal?.aborted) {
throw err;
}
const isConflict = isGetUpdatesConflict(err);
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
const isNetworkError = isNetworkRelatedError(err);
if (!isConflict && !isRecoverable && !isNetworkError) {
throw err;
}
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
(opts.runtime?.error ?? console.error)(
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
);
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
if (opts.abortSignal?.aborted) return;
throw sleepErr;
}
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
}
}
} finally {
unregisterUnhandled?.();
} }
} }