fix: Telegram channel auto-reconnect on getUpdates timeout (#4617)
- Add an explicit isGetUpdatesTimeout() check for timeout errors.
- Fix handling of runner.task() completing normally: restart with backoff instead of exiting.
- Enhance error logging to distinguish timeouts from other network errors.
- Add test cases for timeout recovery and for the runner stopping normally.

Fixes #4617
This commit is contained in:
parent
4be5c58c94
commit
d610088f6f
@ -184,4 +184,47 @@ describe("monitorTelegramProvider (grammY)", () => {
|
|||||||
|
|
||||||
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token");
|
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("retries on getUpdates timeout errors", async () => {
|
||||||
|
const timeoutError = Object.assign(
|
||||||
|
new Error("Request to 'getUpdates' timed out after 30 seconds"),
|
||||||
|
{
|
||||||
|
method: "getUpdates",
|
||||||
|
description: "Request timed out",
|
||||||
|
},
|
||||||
|
);
|
||||||
|
runSpy
|
||||||
|
.mockImplementationOnce(() => ({
|
||||||
|
task: () => Promise.reject(timeoutError),
|
||||||
|
stop: vi.fn(),
|
||||||
|
}))
|
||||||
|
.mockImplementationOnce(() => ({
|
||||||
|
task: () => Promise.resolve(),
|
||||||
|
stop: vi.fn(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
await monitorTelegramProvider({ token: "tok" });
|
||||||
|
|
||||||
|
expect(computeBackoff).toHaveBeenCalled();
|
||||||
|
expect(sleepWithAbort).toHaveBeenCalled();
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("restarts when runner exhausts retries and stops normally", async () => {
|
||||||
|
runSpy
|
||||||
|
.mockImplementationOnce(() => ({
|
||||||
|
task: () => Promise.resolve(), // Runner stopped normally after exhausting retries
|
||||||
|
stop: vi.fn(),
|
||||||
|
}))
|
||||||
|
.mockImplementationOnce(() => ({
|
||||||
|
task: () => Promise.resolve(),
|
||||||
|
stop: vi.fn(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
await monitorTelegramProvider({ token: "tok" });
|
||||||
|
|
||||||
|
expect(computeBackoff).toHaveBeenCalled();
|
||||||
|
expect(sleepWithAbort).toHaveBeenCalled();
|
||||||
|
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -74,6 +74,22 @@ const isGetUpdatesConflict = (err: unknown) => {
|
|||||||
return haystack.includes("getupdates");
|
return haystack.includes("getupdates");
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
 * Reports whether `err` looks like a timed-out `getUpdates` request.
 *
 * The `method`, `description`, `message` and `error_description` fields of the
 * error (whichever are strings) are joined and searched case-insensitively.
 * The error qualifies when the text mentions `getUpdates` and any common
 * timeout spelling: "timeout", "time-out", "timed out" or "timedout"
 * (the last also covers `ETIMEDOUT`).
 *
 * @param err - The value caught from the polling loop; may be anything.
 * @returns `true` only for object errors whose text names both `getUpdates`
 *   and a timeout; `false` for non-objects and `null`/`undefined`.
 */
const isGetUpdatesTimeout = (err: unknown) => {
  if (!err || typeof err !== "object") return false;
  const typed = err as {
    description?: string;
    method?: string;
    message?: string;
    error_description?: string;
  };
  const haystack = [typed.method, typed.description, typed.message, typed.error_description]
    .filter((value): value is string => typeof value === "string")
    .join(" ")
    .toLowerCase();

  // A plain includes("timeout") misses the "timed out" wording, e.g.
  // "Request to 'getUpdates' timed out after 30 seconds".
  return haystack.includes("getupdates") && /timed?[\s-]?out/.test(haystack);
};
|
||||||
|
|
||||||
const NETWORK_ERROR_SNIPPETS = [
|
const NETWORK_ERROR_SNIPPETS = [
|
||||||
"fetch failed",
|
"fetch failed",
|
||||||
"network",
|
"network",
|
||||||
@ -168,20 +184,41 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
|||||||
try {
|
try {
|
||||||
// runner.task() returns a promise that resolves when the runner stops
|
// runner.task() returns a promise that resolves when the runner stops
|
||||||
await runner.task();
|
await runner.task();
|
||||||
return;
|
// Runner stopped normally (aborted or exhausted retries).
|
||||||
|
// If not aborted, this is a transient failure - retry with backoff.
|
||||||
|
if (opts.abortSignal?.aborted) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// grammY runner exhausted its maxRetryTime - treat as recoverable network error
|
||||||
|
restartAttempts += 1;
|
||||||
|
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||||
|
(opts.runtime?.error ?? console.error)(
|
||||||
|
`Telegram polling stopped after retry timeout; restarting in ${formatDurationMs(delayMs)}.`,
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||||
|
} catch (sleepErr) {
|
||||||
|
if (opts.abortSignal?.aborted) return;
|
||||||
|
throw sleepErr;
|
||||||
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
if (opts.abortSignal?.aborted) {
|
if (opts.abortSignal?.aborted) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
const isConflict = isGetUpdatesConflict(err);
|
const isConflict = isGetUpdatesConflict(err);
|
||||||
|
const isTimeout = isGetUpdatesTimeout(err);
|
||||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||||
const isNetworkError = isNetworkRelatedError(err);
|
const isNetworkError = isNetworkRelatedError(err);
|
||||||
if (!isConflict && !isRecoverable && !isNetworkError) {
|
if (!isConflict && !isTimeout && !isRecoverable && !isNetworkError) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
restartAttempts += 1;
|
restartAttempts += 1;
|
||||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
const reason = isConflict
|
||||||
|
? "getUpdates conflict"
|
||||||
|
: isTimeout
|
||||||
|
? "getUpdates timeout"
|
||||||
|
: "network error";
|
||||||
const errMsg = formatErrorMessage(err);
|
const errMsg = formatErrorMessage(err);
|
||||||
(opts.runtime?.error ?? console.error)(
|
(opts.runtime?.error ?? console.error)(
|
||||||
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
|
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user