From 7698c29363a326b5ac99fda543778603880b0602 Mon Sep 17 00:00:00 2001 From: selfboot Date: Fri, 30 Jan 2026 14:50:59 +0800 Subject: [PATCH] Gateway: tolerate transient network failures(#4425) --- src/infra/unhandled-rejections.test.ts | 18 +++ src/infra/unhandled-rejections.ts | 10 ++ src/media/fetch.ts | 13 +- src/telegram/monitor.ts | 175 ++++++++++++++----------- 4 files changed, 134 insertions(+), 82 deletions(-) diff --git a/src/infra/unhandled-rejections.test.ts b/src/infra/unhandled-rejections.test.ts index 1ec144ba1..d3656fea4 100644 --- a/src/infra/unhandled-rejections.test.ts +++ b/src/infra/unhandled-rejections.test.ts @@ -100,6 +100,24 @@ describe("isTransientNetworkError", () => { expect(isTransientNetworkError(error)).toBe(true); }); + it("returns true for MediaFetchError with fetch_failed code", () => { + const error = Object.assign(new Error("media fetch failed"), { + name: "MediaFetchError", + code: "fetch_failed", + }); + expect(isTransientNetworkError(error)).toBe(true); + }); + + it("returns true for MediaFetchError that wraps a network cause", () => { + const cause = Object.assign(new Error("timeout"), { code: "ETIMEDOUT" }); + const error = Object.assign(new Error("media fetch failed"), { + name: "MediaFetchError", + code: "fetch_failed", + cause, + }); + expect(isTransientNetworkError(error)).toBe(true); + }); + it("returns false for regular errors without network codes", () => { expect(isTransientNetworkError(new Error("Something went wrong"))).toBe(false); expect(isTransientNetworkError(new TypeError("Cannot read property"))).toBe(false); diff --git a/src/infra/unhandled-rejections.ts b/src/infra/unhandled-rejections.ts index d186c6a78..4439cd245 100644 --- a/src/infra/unhandled-rejections.ts +++ b/src/infra/unhandled-rejections.ts @@ -78,6 +78,16 @@ function isConfigError(err: unknown): boolean { export function isTransientNetworkError(err: unknown): boolean { if (!err) return false; + if (err && typeof err === "object") { + const name = "name" in err ? String(err.name) : ""; + const code = extractErrorCodeWithCause(err); + if (name === "MediaFetchError" && code === "fetch_failed") { + const cause = getErrorCause(err); + if (cause && cause !== err) return isTransientNetworkError(cause); + return true; + } + } + const code = extractErrorCodeWithCause(err); if (code && TRANSIENT_NETWORK_CODES.has(code)) return true; diff --git a/src/media/fetch.ts b/src/media/fetch.ts index 727ab7a5d..f5777892f 100644 --- a/src/media/fetch.ts +++ b/src/media/fetch.ts @@ -13,10 +13,17 @@ export type MediaFetchErrorCode = "max_bytes" | "http_error" | "fetch_failed"; export class MediaFetchError extends Error { readonly code: MediaFetchErrorCode; - constructor(code: MediaFetchErrorCode, message: string) { + constructor(code: MediaFetchErrorCode, message: string, options?: { cause?: unknown }) { super(message); this.code = code; this.name = "MediaFetchError"; + if (options?.cause !== undefined) { + try { + (this as { cause?: unknown }).cause = options.cause; + } catch { + // ignore if cause cannot be assigned + } + } } } @@ -74,7 +81,9 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise { - if (lastUpdateId !== null && updateId <= lastUpdateId) return; - lastUpdateId = updateId; - try { - await writeTelegramUpdateOffset({ - accountId: account.accountId, - updateId, - }); - } catch (err) { - (opts.runtime?.error ?? console.error)( - `telegram: failed to persist update offset: ${String(err)}`, + let unregisterUnhandled: (() => void) | null = null; + try { + unregisterUnhandled = registerUnhandledRejectionHandler((reason) => { + if (!isRecoverableTelegramNetworkError(reason)) return false; + const errMsg = formatErrorMessage(reason); + (opts.runtime?.error ?? console.warn)( + `telegram: suppressed unhandled network rejection: ${errMsg}`, ); - } - }; - - const bot = createTelegramBot({ - token, - runtime: opts.runtime, - proxyFetch, - config: cfg, - accountId: account.accountId, - updateOffset: { - lastUpdateId, - onUpdateId: persistUpdateId, - }, - }); - - if (opts.useWebhook) { - await startTelegramWebhook({ - token, - accountId: account.accountId, - config: cfg, - path: opts.webhookPath, - port: opts.webhookPort, - secret: opts.webhookSecret, - runtime: opts.runtime as RuntimeEnv, - fetch: proxyFetch, - abortSignal: opts.abortSignal, - publicUrl: opts.webhookUrl, + return true; }); - return; - } - // Use grammyjs/runner for concurrent update processing - let restartAttempts = 0; - - while (!opts.abortSignal?.aborted) { - const runner = run(bot, createTelegramRunnerOptions(cfg)); - const stopOnAbort = () => { - if (opts.abortSignal?.aborted) { - void runner.stop(); + let lastUpdateId = await readTelegramUpdateOffset({ + accountId: account.accountId, + }); + const persistUpdateId = async (updateId: number) => { + if (lastUpdateId !== null && updateId <= lastUpdateId) return; + lastUpdateId = updateId; + try { + await writeTelegramUpdateOffset({ + accountId: account.accountId, + updateId, + }); + } catch (err) { + (opts.runtime?.error ?? console.error)( + `telegram: failed to persist update offset: ${String(err)}`, + ); } }; - opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); - try { - // runner.task() returns a promise that resolves when the runner stops - await runner.task(); + + const bot = createTelegramBot({ + token, + runtime: opts.runtime, + proxyFetch, + config: cfg, + accountId: account.accountId, + updateOffset: { + lastUpdateId, + onUpdateId: persistUpdateId, + }, + }); + + if (opts.useWebhook) { + await startTelegramWebhook({ + token, + accountId: account.accountId, + config: cfg, + path: opts.webhookPath, + port: opts.webhookPort, + secret: opts.webhookSecret, + runtime: opts.runtime as RuntimeEnv, + fetch: proxyFetch, + abortSignal: opts.abortSignal, + publicUrl: opts.webhookUrl, + }); return; - } catch (err) { - if (opts.abortSignal?.aborted) { - throw err; - } - const isConflict = isGetUpdatesConflict(err); - const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); - const isNetworkError = isNetworkRelatedError(err); - if (!isConflict && !isRecoverable && !isNetworkError) { - throw err; - } - restartAttempts += 1; - const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); - const reason = isConflict ? "getUpdates conflict" : "network error"; - const errMsg = formatErrorMessage(err); - (opts.runtime?.error ?? console.error)( - `Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`, - ); - try { - await sleepWithAbort(delayMs, opts.abortSignal); - } catch (sleepErr) { - if (opts.abortSignal?.aborted) return; - throw sleepErr; - } - } finally { - opts.abortSignal?.removeEventListener("abort", stopOnAbort); } + + // Use grammyjs/runner for concurrent update processing + let restartAttempts = 0; + + while (!opts.abortSignal?.aborted) { + const runner = run(bot, createTelegramRunnerOptions(cfg)); + const stopOnAbort = () => { + if (opts.abortSignal?.aborted) { + void runner.stop(); + } + }; + opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); + try { + // runner.task() returns a promise that resolves when the runner stops + await runner.task(); + return; + } catch (err) { + if (opts.abortSignal?.aborted) { + throw err; + } + const isConflict = isGetUpdatesConflict(err); + const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); + const isNetworkError = isNetworkRelatedError(err); + if (!isConflict && !isRecoverable && !isNetworkError) { + throw err; + } + restartAttempts += 1; + const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); + const reason = isConflict ? "getUpdates conflict" : "network error"; + const errMsg = formatErrorMessage(err); + (opts.runtime?.error ?? console.error)( + `Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`, + ); + try { + await sleepWithAbort(delayMs, opts.abortSignal); + } catch (sleepErr) { + if (opts.abortSignal?.aborted) return; + throw sleepErr; + } + } finally { + opts.abortSignal?.removeEventListener("abort", stopOnAbort); + } + } + } finally { + unregisterUnhandled?.(); } }