From bc141e3e03dc3491c022d54c1c30d3de2aefe315 Mon Sep 17 00:00:00 2001 From: Jhony Sidney Date: Wed, 28 Jan 2026 06:50:09 +0000 Subject: [PATCH] fix(telegram): sanitize update offset + lock polling - Ignore/backup invalid persisted lastUpdateId instead of getting stuck\n- Add a per-bot poll lock to avoid multiple getUpdates loops\n- Wire lock + offset sanitize into telegram monitor --- src/telegram/monitor.ts | 95 +++++---- src/telegram/poll-lock.test.ts | 55 ++++++ src/telegram/poll-lock.ts | 241 +++++++++++++++++++++++ src/telegram/update-offset-store.test.ts | 29 +++ src/telegram/update-offset-store.ts | 29 ++- 5 files changed, 412 insertions(+), 37 deletions(-) create mode 100644 src/telegram/poll-lock.test.ts create mode 100644 src/telegram/poll-lock.ts diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index c3b3a5a2f..cff5b5021 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -11,6 +11,7 @@ import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; import { createTelegramBot } from "./bot.js"; import { isRecoverableTelegramNetworkError } from "./network-errors.js"; import { makeProxyFetch } from "./proxy.js"; +import { acquireTelegramPollLock } from "./poll-lock.js"; import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js"; import { startTelegramWebhook } from "./webhook.js"; @@ -93,6 +94,7 @@ const isNetworkRelatedError = (err: unknown) => { export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const cfg = opts.config ?? loadConfig(); + const runtimeError = opts.runtime?.error ?? console.error; const account = resolveTelegramAccount({ cfg, accountId: opts.accountId, @@ -108,8 +110,28 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { opts.proxyFetch ?? (account.config.proxy ? makeProxyFetch(account.config.proxy as string) : undefined); + let pollLock: { release: () => Promise } | null = null; + try { + pollLock = await acquireTelegramPollLock({ + token, + accountId: account.accountId, + }); + } catch (err) { + runtimeError( + `[telegram] [${account.accountId}] polling lock unavailable; skipping provider start: ${String( + err, + )}`, + ); + return; + } + let lastUpdateId = await readTelegramUpdateOffset({ accountId: account.accountId, + onInvalid: ({ path, backupPath }) => { + runtimeError( + `[telegram] [${account.accountId}] invalid update offset at ${path}; moved to ${backupPath}`, + ); + }, }); const persistUpdateId = async (updateId: number) => { if (lastUpdateId !== null && updateId <= lastUpdateId) return; @@ -151,49 +173,52 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { abortSignal: opts.abortSignal, publicUrl: opts.webhookUrl, }); + await pollLock.release().catch(() => undefined); return; } // Use grammyjs/runner for concurrent update processing let restartAttempts = 0; - while (!opts.abortSignal?.aborted) { - const runner = run(bot, createTelegramRunnerOptions(cfg)); - const stopOnAbort = () => { - if (opts.abortSignal?.aborted) { - void runner.stop(); - } - }; - opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); - try { - // runner.task() returns a promise that resolves when the runner stops - await runner.task(); - return; - } catch (err) { - if (opts.abortSignal?.aborted) { - throw err; - } - const isConflict = isGetUpdatesConflict(err); - const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); - const isNetworkError = isNetworkRelatedError(err); - if (!isConflict && !isRecoverable && !isNetworkError) { - throw err; - } - restartAttempts += 1; - const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); - const reason = isConflict ? "getUpdates conflict" : "network error"; - const errMsg = formatErrorMessage(err); - (opts.runtime?.error ?? console.error)( - `Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`, - ); + try { + while (!opts.abortSignal?.aborted) { + const runner = run(bot, createTelegramRunnerOptions(cfg)); + const stopOnAbort = () => { + if (opts.abortSignal?.aborted) { + void runner.stop(); + } + }; + opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); try { - await sleepWithAbort(delayMs, opts.abortSignal); - } catch (sleepErr) { - if (opts.abortSignal?.aborted) return; - throw sleepErr; + // runner.task() returns a promise that resolves when the runner stops + await runner.task(); + return; + } catch (err) { + if (opts.abortSignal?.aborted) { + throw err; + } + const isConflict = isGetUpdatesConflict(err); + const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); + const isNetworkError = isNetworkRelatedError(err); + if (!isConflict && !isRecoverable && !isNetworkError) { + throw err; + } + restartAttempts += 1; + const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); + const reason = isConflict ? "getUpdates conflict" : "network error"; + const errMsg = formatErrorMessage(err); + runtimeError(`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`); + try { + await sleepWithAbort(delayMs, opts.abortSignal); + } catch (sleepErr) { + if (opts.abortSignal?.aborted) return; + throw sleepErr; + } + } finally { + opts.abortSignal?.removeEventListener("abort", stopOnAbort); } - } finally { - opts.abortSignal?.removeEventListener("abort", stopOnAbort); } + } finally { + await pollLock.release().catch(() => undefined); } } diff --git a/src/telegram/poll-lock.test.ts b/src/telegram/poll-lock.test.ts new file mode 100644 index 000000000..3c8dde600 --- /dev/null +++ b/src/telegram/poll-lock.test.ts @@ -0,0 +1,55 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { describe, expect, it } from "vitest"; + +import { acquireTelegramPollLock, TelegramPollLockError } from "./poll-lock.js"; + +async function withTempStateDir(fn: (dir: string) => Promise) { + const previous = process.env.CLAWDBOT_STATE_DIR; + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-telegram-lock-")); + process.env.CLAWDBOT_STATE_DIR = dir; + try { + return await fn(dir); + } finally { + if (previous === undefined) delete process.env.CLAWDBOT_STATE_DIR; + else process.env.CLAWDBOT_STATE_DIR = previous; + await fs.rm(dir, { recursive: true, force: true }); + } +} + +describe("telegram poll lock", () => { + it("prevents multiple concurrent pollers for same account+token", async () => { + await withTempStateDir(async () => { + const lock1 = await acquireTelegramPollLock({ + token: "123:abc", + accountId: "default", + timeoutMs: 50, + pollIntervalMs: 5, + staleMs: 10_000, + }); + + await expect( + acquireTelegramPollLock({ + token: "123:abc", + accountId: "default", + timeoutMs: 30, + pollIntervalMs: 5, + staleMs: 10_000, + }), + ).rejects.toBeInstanceOf(TelegramPollLockError); + + await lock1.release(); + + const lock2 = await acquireTelegramPollLock({ + token: "123:abc", + accountId: "default", + timeoutMs: 50, + pollIntervalMs: 5, + staleMs: 10_000, + }); + await lock2.release(); + }); + }); +}); diff --git a/src/telegram/poll-lock.ts b/src/telegram/poll-lock.ts new file mode 100644 index 000000000..cc885d61e --- /dev/null +++ b/src/telegram/poll-lock.ts @@ -0,0 +1,241 @@ +import { createHash } from "node:crypto"; +import fs from "node:fs/promises"; +import fsSync from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import { resolveStateDir } from "../config/paths.js"; + +const DEFAULT_TIMEOUT_MS = 5000; +const DEFAULT_POLL_INTERVAL_MS = 100; +const DEFAULT_STALE_MS = 30_000; + +type LockPayload = { + pid: number; + createdAt: string; + accountId: string; + tokenHash: string; + startTime?: number; +}; + +export type TelegramPollLockHandle = { + lockPath: string; + unitKey: string; + release: () => Promise; +}; + +export class TelegramPollLockError extends Error { + constructor( + message: string, + public readonly cause?: unknown, + ) { + super(message); + this.name = "TelegramPollLockError"; + } +} + +function isAlive(pid: number): boolean { + if (!Number.isFinite(pid) || pid <= 0) return false; + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +function parseProcCmdline(raw: string): string[] { + return raw + .split("\0") + .map((entry) => entry.trim()) + .filter(Boolean); +} + +function readLinuxCmdline(pid: number): string[] | null { + try { + const raw = fsSync.readFileSync(`/proc/${pid}/cmdline`, "utf8"); + return parseProcCmdline(raw); + } catch { + return null; + } +} + +function readLinuxStartTime(pid: number): number | null { + try { + const raw = fsSync.readFileSync(`/proc/${pid}/stat`, "utf8").trim(); + const closeParen = raw.lastIndexOf(")"); + if (closeParen < 0) return null; + const rest = raw.slice(closeParen + 1).trim(); + const fields = rest.split(/\s+/); + const startTime = Number.parseInt(fields[19] ?? "", 10); + return Number.isFinite(startTime) ? startTime : null; + } catch { + return null; + } +} + +type LockOwnerStatus = "alive" | "dead" | "unknown"; + +function resolveOwnerStatus( + pid: number, + payload: LockPayload | null, + platform: NodeJS.Platform, +): LockOwnerStatus { + if (!isAlive(pid)) return "dead"; + if (platform !== "linux") return "alive"; + + const payloadStartTime = payload?.startTime; + if (Number.isFinite(payloadStartTime)) { + const currentStartTime = readLinuxStartTime(pid); + if (currentStartTime == null) return "unknown"; + return currentStartTime === payloadStartTime ? "alive" : "dead"; + } + + const args = readLinuxCmdline(pid); + if (!args) return "unknown"; + // Best-effort: still running, but not necessarily a poller; treat unknown as alive unless stale. + return args.length > 0 ? "alive" : "unknown"; +} + +function normalizeAccountId(accountId?: string) { + const trimmed = accountId?.trim(); + if (!trimmed) return "default"; + return trimmed.replace(/[^a-z0-9._-]+/gi, "_"); +} + +function hashToken(token: string) { + return createHash("sha1").update(token).digest("hex").slice(0, 10); +} + +async function readLockPayload(lockPath: string): Promise { + try { + const raw = await fs.readFile(lockPath, "utf8"); + const parsed = JSON.parse(raw) as Partial; + if (typeof parsed.pid !== "number") return null; + if (typeof parsed.createdAt !== "string") return null; + if (typeof parsed.accountId !== "string") return null; + if (typeof parsed.tokenHash !== "string") return null; + const startTime = typeof parsed.startTime === "number" ? parsed.startTime : undefined; + return { + pid: parsed.pid, + createdAt: parsed.createdAt, + accountId: parsed.accountId, + tokenHash: parsed.tokenHash, + startTime, + }; + } catch { + return null; + } +} + +function resolveTelegramPollLockPath(params: { + accountId?: string; + token: string; + env: NodeJS.ProcessEnv; +}) { + const stateDir = resolveStateDir(params.env, os.homedir); + const telegramDir = path.join(stateDir, "telegram"); + const normalized = normalizeAccountId(params.accountId); + const tokenHash = hashToken(params.token); + const unitKey = `${normalized}:${tokenHash}`; + const lockPath = path.join(telegramDir, `poll-lock.${normalized}.${tokenHash}.lock`); + return { lockPath, unitKey, tokenHash }; +} + +export async function acquireTelegramPollLock(opts: { + token: string; + accountId?: string; + env?: NodeJS.ProcessEnv; + timeoutMs?: number; + pollIntervalMs?: number; + staleMs?: number; + platform?: NodeJS.Platform; +}): Promise { + const env = opts.env ?? process.env; + if (env.CLAWDBOT_ALLOW_MULTI_TELEGRAM_POLL === "1") { + return { + lockPath: "", + unitKey: "disabled", + release: async () => undefined, + }; + } + + const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS; + const pollIntervalMs = opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS; + const staleMs = opts.staleMs ?? DEFAULT_STALE_MS; + const platform = opts.platform ?? process.platform; + + const { lockPath, unitKey, tokenHash } = resolveTelegramPollLockPath({ + accountId: opts.accountId, + token: opts.token, + env, + }); + await fs.mkdir(path.dirname(lockPath), { recursive: true, mode: 0o700 }); + + const startedAt = Date.now(); + let lastPayload: LockPayload | null = null; + + while (Date.now() - startedAt < timeoutMs) { + try { + const handle = await fs.open(lockPath, "wx"); + const startTime = platform === "linux" ? readLinuxStartTime(process.pid) : null; + const payload: LockPayload = { + pid: process.pid, + createdAt: new Date().toISOString(), + accountId: normalizeAccountId(opts.accountId), + tokenHash, + }; + if (typeof startTime === "number" && Number.isFinite(startTime)) { + payload.startTime = startTime; + } + await handle.writeFile(JSON.stringify(payload), "utf8"); + return { + lockPath, + unitKey, + release: async () => { + await handle.close().catch(() => undefined); + await fs.rm(lockPath, { force: true }); + }, + }; + } catch (err) { + const code = (err as { code?: unknown }).code; + if (code !== "EEXIST") { + throw new TelegramPollLockError(`failed to acquire telegram poll lock at ${lockPath}`, err); + } + + lastPayload = await readLockPayload(lockPath); + const ownerPid = lastPayload?.pid; + const ownerStatus = ownerPid + ? resolveOwnerStatus(ownerPid, lastPayload, platform) + : "unknown"; + if (ownerStatus === "dead" && ownerPid) { + await fs.rm(lockPath, { force: true }); + continue; + } + if (ownerStatus !== "alive") { + let stale = false; + if (lastPayload?.createdAt) { + const createdAt = Date.parse(lastPayload.createdAt); + stale = Number.isFinite(createdAt) ? Date.now() - createdAt > staleMs : false; + } + if (!stale) { + try { + const st = await fs.stat(lockPath); + stale = Date.now() - st.mtimeMs > staleMs; + } catch { + stale = true; + } + } + if (stale) { + await fs.rm(lockPath, { force: true }); + continue; + } + } + + await new Promise((r) => setTimeout(r, pollIntervalMs)); + } + } + + const owner = lastPayload?.pid ? `pid=${lastPayload.pid}` : "unknown owner"; + throw new TelegramPollLockError(`telegram poll lock timeout (${owner}) for ${unitKey}`); +} diff --git a/src/telegram/update-offset-store.test.ts b/src/telegram/update-offset-store.test.ts index cab586173..c5911a3d1 100644 --- a/src/telegram/update-offset-store.test.ts +++ b/src/telegram/update-offset-store.test.ts @@ -32,4 +32,33 @@ describe("telegram update offset store", () => { expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBe(421); }); }); + + it("ignores invalid persisted update ids", async () => { + await withTempStateDir(async (dir) => { + const filePath = path.join(dir, "telegram", "update-offset-primary.json"); + await fs.mkdir(path.dirname(filePath), { recursive: true }); + + const writeRaw = async (lastUpdateId: unknown) => { + await fs.writeFile( + filePath, + `${JSON.stringify({ version: 1, lastUpdateId }, null, 2)}\n`, + "utf-8", + ); + }; + + await writeRaw(2_147_483_648); + expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull(); + + const backupFiles = (await fs.readdir(path.dirname(filePath))).filter((name) => + name.includes("update-offset-primary.json.bak.invalid."), + ); + expect(backupFiles.length).toBe(1); + + await writeRaw(-1); + expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull(); + + await writeRaw(1.25); + expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull(); + }); + }); }); diff --git a/src/telegram/update-offset-store.ts b/src/telegram/update-offset-store.ts index 303db8c9c..838c4a5e3 100644 --- a/src/telegram/update-offset-store.ts +++ b/src/telegram/update-offset-store.ts @@ -6,12 +6,26 @@ import path from "node:path"; import { resolveStateDir } from "../config/paths.js"; const STORE_VERSION = 1; +const TELEGRAM_UPDATE_ID_MAX = 2_147_483_647; // Telegram Bot API: Integer (32-bit signed) type TelegramUpdateOffsetState = { version: number; lastUpdateId: number | null; }; +function isValidTelegramUpdateId(value: unknown): value is number { + return ( + typeof value === "number" && + Number.isSafeInteger(value) && + value >= 0 && + value <= TELEGRAM_UPDATE_ID_MAX + ); +} + +function formatBackupSuffix() { + return new Date().toISOString().replaceAll(":", "").replaceAll(".", ""); +} + function normalizeAccountId(accountId?: string) { const trimmed = accountId?.trim(); if (!trimmed) return "default"; @@ -31,7 +45,7 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null { try { const parsed = JSON.parse(raw) as TelegramUpdateOffsetState; if (parsed?.version !== STORE_VERSION) return null; - if (parsed.lastUpdateId !== null && typeof parsed.lastUpdateId !== "number") { + if (parsed.lastUpdateId !== null && !isValidTelegramUpdateId(parsed.lastUpdateId)) { return null; } return parsed; @@ -43,12 +57,23 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null { export async function readTelegramUpdateOffset(params: { accountId?: string; env?: NodeJS.ProcessEnv; + onInvalid?: (info: { path: string; backupPath: string }) => void; }): Promise { const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env); try { const raw = await fs.readFile(filePath, "utf-8"); const parsed = safeParseState(raw); - return parsed?.lastUpdateId ?? null; + if (!parsed) { + const backupPath = `${filePath}.bak.invalid.${formatBackupSuffix()}`; + try { + await fs.rename(filePath, backupPath); + params.onInvalid?.({ path: filePath, backupPath }); + } catch { + // Ignore backup failures; treat as missing offset. + } + return null; + } + return parsed.lastUpdateId ?? null; } catch (err) { const code = (err as { code?: string }).code; if (code === "ENOENT") return null;