fix(telegram): sanitize update offset + lock polling
- Ignore/backup invalid persisted lastUpdateId instead of getting stuck\n- Add a per-bot poll lock to avoid multiple getUpdates loops\n- Wire lock + offset sanitize into telegram monitor
This commit is contained in:
parent
6044bf3637
commit
bc141e3e03
@ -11,6 +11,7 @@ import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
|
||||
import { createTelegramBot } from "./bot.js";
|
||||
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
||||
import { makeProxyFetch } from "./proxy.js";
|
||||
import { acquireTelegramPollLock } from "./poll-lock.js";
|
||||
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
|
||||
import { startTelegramWebhook } from "./webhook.js";
|
||||
|
||||
@ -93,6 +94,7 @@ const isNetworkRelatedError = (err: unknown) => {
|
||||
|
||||
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const runtimeError = opts.runtime?.error ?? console.error;
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
accountId: opts.accountId,
|
||||
@ -108,8 +110,28 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
opts.proxyFetch ??
|
||||
(account.config.proxy ? makeProxyFetch(account.config.proxy as string) : undefined);
|
||||
|
||||
let pollLock: { release: () => Promise<void> } | null = null;
|
||||
try {
|
||||
pollLock = await acquireTelegramPollLock({
|
||||
token,
|
||||
accountId: account.accountId,
|
||||
});
|
||||
} catch (err) {
|
||||
runtimeError(
|
||||
`[telegram] [${account.accountId}] polling lock unavailable; skipping provider start: ${String(
|
||||
err,
|
||||
)}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let lastUpdateId = await readTelegramUpdateOffset({
|
||||
accountId: account.accountId,
|
||||
onInvalid: ({ path, backupPath }) => {
|
||||
runtimeError(
|
||||
`[telegram] [${account.accountId}] invalid update offset at ${path}; moved to ${backupPath}`,
|
||||
);
|
||||
},
|
||||
});
|
||||
const persistUpdateId = async (updateId: number) => {
|
||||
if (lastUpdateId !== null && updateId <= lastUpdateId) return;
|
||||
@ -151,49 +173,52 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
abortSignal: opts.abortSignal,
|
||||
publicUrl: opts.webhookUrl,
|
||||
});
|
||||
await pollLock.release().catch(() => undefined);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use grammyjs/runner for concurrent update processing
|
||||
let restartAttempts = 0;
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
const runner = run(bot, createTelegramRunnerOptions(cfg));
|
||||
const stopOnAbort = () => {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
void runner.stop();
|
||||
}
|
||||
};
|
||||
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
||||
try {
|
||||
// runner.task() returns a promise that resolves when the runner stops
|
||||
await runner.task();
|
||||
return;
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
throw err;
|
||||
}
|
||||
const isConflict = isGetUpdatesConflict(err);
|
||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
const isNetworkError = isNetworkRelatedError(err);
|
||||
if (!isConflict && !isRecoverable && !isNetworkError) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
||||
const errMsg = formatErrorMessage(err);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
|
||||
);
|
||||
try {
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
const runner = run(bot, createTelegramRunnerOptions(cfg));
|
||||
const stopOnAbort = () => {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
void runner.stop();
|
||||
}
|
||||
};
|
||||
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) return;
|
||||
throw sleepErr;
|
||||
// runner.task() returns a promise that resolves when the runner stops
|
||||
await runner.task();
|
||||
return;
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
throw err;
|
||||
}
|
||||
const isConflict = isGetUpdatesConflict(err);
|
||||
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
|
||||
const isNetworkError = isNetworkRelatedError(err);
|
||||
if (!isConflict && !isRecoverable && !isNetworkError) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
const reason = isConflict ? "getUpdates conflict" : "network error";
|
||||
const errMsg = formatErrorMessage(err);
|
||||
runtimeError(`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) return;
|
||||
throw sleepErr;
|
||||
}
|
||||
} finally {
|
||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||
}
|
||||
} finally {
|
||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||
}
|
||||
} finally {
|
||||
await pollLock.release().catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
55
src/telegram/poll-lock.test.ts
Normal file
55
src/telegram/poll-lock.test.ts
Normal file
@ -0,0 +1,55 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { acquireTelegramPollLock, TelegramPollLockError } from "./poll-lock.js";
|
||||
|
||||
async function withTempStateDir<T>(fn: (dir: string) => Promise<T>) {
|
||||
const previous = process.env.CLAWDBOT_STATE_DIR;
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-telegram-lock-"));
|
||||
process.env.CLAWDBOT_STATE_DIR = dir;
|
||||
try {
|
||||
return await fn(dir);
|
||||
} finally {
|
||||
if (previous === undefined) delete process.env.CLAWDBOT_STATE_DIR;
|
||||
else process.env.CLAWDBOT_STATE_DIR = previous;
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
describe("telegram poll lock", () => {
|
||||
it("prevents multiple concurrent pollers for same account+token", async () => {
|
||||
await withTempStateDir(async () => {
|
||||
const lock1 = await acquireTelegramPollLock({
|
||||
token: "123:abc",
|
||||
accountId: "default",
|
||||
timeoutMs: 50,
|
||||
pollIntervalMs: 5,
|
||||
staleMs: 10_000,
|
||||
});
|
||||
|
||||
await expect(
|
||||
acquireTelegramPollLock({
|
||||
token: "123:abc",
|
||||
accountId: "default",
|
||||
timeoutMs: 30,
|
||||
pollIntervalMs: 5,
|
||||
staleMs: 10_000,
|
||||
}),
|
||||
).rejects.toBeInstanceOf(TelegramPollLockError);
|
||||
|
||||
await lock1.release();
|
||||
|
||||
const lock2 = await acquireTelegramPollLock({
|
||||
token: "123:abc",
|
||||
accountId: "default",
|
||||
timeoutMs: 50,
|
||||
pollIntervalMs: 5,
|
||||
staleMs: 10_000,
|
||||
});
|
||||
await lock2.release();
|
||||
});
|
||||
});
|
||||
});
|
||||
241
src/telegram/poll-lock.ts
Normal file
241
src/telegram/poll-lock.ts
Normal file
@ -0,0 +1,241 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import fsSync from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
|
||||
const DEFAULT_TIMEOUT_MS = 5000;
|
||||
const DEFAULT_POLL_INTERVAL_MS = 100;
|
||||
const DEFAULT_STALE_MS = 30_000;
|
||||
|
||||
type LockPayload = {
|
||||
pid: number;
|
||||
createdAt: string;
|
||||
accountId: string;
|
||||
tokenHash: string;
|
||||
startTime?: number;
|
||||
};
|
||||
|
||||
export type TelegramPollLockHandle = {
|
||||
lockPath: string;
|
||||
unitKey: string;
|
||||
release: () => Promise<void>;
|
||||
};
|
||||
|
||||
export class TelegramPollLockError extends Error {
|
||||
constructor(
|
||||
message: string,
|
||||
public readonly cause?: unknown,
|
||||
) {
|
||||
super(message);
|
||||
this.name = "TelegramPollLockError";
|
||||
}
|
||||
}
|
||||
|
||||
function isAlive(pid: number): boolean {
|
||||
if (!Number.isFinite(pid) || pid <= 0) return false;
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function parseProcCmdline(raw: string): string[] {
|
||||
return raw
|
||||
.split("\0")
|
||||
.map((entry) => entry.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function readLinuxCmdline(pid: number): string[] | null {
|
||||
try {
|
||||
const raw = fsSync.readFileSync(`/proc/${pid}/cmdline`, "utf8");
|
||||
return parseProcCmdline(raw);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function readLinuxStartTime(pid: number): number | null {
|
||||
try {
|
||||
const raw = fsSync.readFileSync(`/proc/${pid}/stat`, "utf8").trim();
|
||||
const closeParen = raw.lastIndexOf(")");
|
||||
if (closeParen < 0) return null;
|
||||
const rest = raw.slice(closeParen + 1).trim();
|
||||
const fields = rest.split(/\s+/);
|
||||
const startTime = Number.parseInt(fields[19] ?? "", 10);
|
||||
return Number.isFinite(startTime) ? startTime : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
type LockOwnerStatus = "alive" | "dead" | "unknown";
|
||||
|
||||
function resolveOwnerStatus(
|
||||
pid: number,
|
||||
payload: LockPayload | null,
|
||||
platform: NodeJS.Platform,
|
||||
): LockOwnerStatus {
|
||||
if (!isAlive(pid)) return "dead";
|
||||
if (platform !== "linux") return "alive";
|
||||
|
||||
const payloadStartTime = payload?.startTime;
|
||||
if (Number.isFinite(payloadStartTime)) {
|
||||
const currentStartTime = readLinuxStartTime(pid);
|
||||
if (currentStartTime == null) return "unknown";
|
||||
return currentStartTime === payloadStartTime ? "alive" : "dead";
|
||||
}
|
||||
|
||||
const args = readLinuxCmdline(pid);
|
||||
if (!args) return "unknown";
|
||||
// Best-effort: still running, but not necessarily a poller; treat unknown as alive unless stale.
|
||||
return args.length > 0 ? "alive" : "unknown";
|
||||
}
|
||||
|
||||
function normalizeAccountId(accountId?: string) {
|
||||
const trimmed = accountId?.trim();
|
||||
if (!trimmed) return "default";
|
||||
return trimmed.replace(/[^a-z0-9._-]+/gi, "_");
|
||||
}
|
||||
|
||||
function hashToken(token: string) {
|
||||
return createHash("sha1").update(token).digest("hex").slice(0, 10);
|
||||
}
|
||||
|
||||
async function readLockPayload(lockPath: string): Promise<LockPayload | null> {
|
||||
try {
|
||||
const raw = await fs.readFile(lockPath, "utf8");
|
||||
const parsed = JSON.parse(raw) as Partial<LockPayload>;
|
||||
if (typeof parsed.pid !== "number") return null;
|
||||
if (typeof parsed.createdAt !== "string") return null;
|
||||
if (typeof parsed.accountId !== "string") return null;
|
||||
if (typeof parsed.tokenHash !== "string") return null;
|
||||
const startTime = typeof parsed.startTime === "number" ? parsed.startTime : undefined;
|
||||
return {
|
||||
pid: parsed.pid,
|
||||
createdAt: parsed.createdAt,
|
||||
accountId: parsed.accountId,
|
||||
tokenHash: parsed.tokenHash,
|
||||
startTime,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function resolveTelegramPollLockPath(params: {
|
||||
accountId?: string;
|
||||
token: string;
|
||||
env: NodeJS.ProcessEnv;
|
||||
}) {
|
||||
const stateDir = resolveStateDir(params.env, os.homedir);
|
||||
const telegramDir = path.join(stateDir, "telegram");
|
||||
const normalized = normalizeAccountId(params.accountId);
|
||||
const tokenHash = hashToken(params.token);
|
||||
const unitKey = `${normalized}:${tokenHash}`;
|
||||
const lockPath = path.join(telegramDir, `poll-lock.${normalized}.${tokenHash}.lock`);
|
||||
return { lockPath, unitKey, tokenHash };
|
||||
}
|
||||
|
||||
export async function acquireTelegramPollLock(opts: {
|
||||
token: string;
|
||||
accountId?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
timeoutMs?: number;
|
||||
pollIntervalMs?: number;
|
||||
staleMs?: number;
|
||||
platform?: NodeJS.Platform;
|
||||
}): Promise<TelegramPollLockHandle> {
|
||||
const env = opts.env ?? process.env;
|
||||
if (env.CLAWDBOT_ALLOW_MULTI_TELEGRAM_POLL === "1") {
|
||||
return {
|
||||
lockPath: "",
|
||||
unitKey: "disabled",
|
||||
release: async () => undefined,
|
||||
};
|
||||
}
|
||||
|
||||
const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
|
||||
const pollIntervalMs = opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
|
||||
const staleMs = opts.staleMs ?? DEFAULT_STALE_MS;
|
||||
const platform = opts.platform ?? process.platform;
|
||||
|
||||
const { lockPath, unitKey, tokenHash } = resolveTelegramPollLockPath({
|
||||
accountId: opts.accountId,
|
||||
token: opts.token,
|
||||
env,
|
||||
});
|
||||
await fs.mkdir(path.dirname(lockPath), { recursive: true, mode: 0o700 });
|
||||
|
||||
const startedAt = Date.now();
|
||||
let lastPayload: LockPayload | null = null;
|
||||
|
||||
while (Date.now() - startedAt < timeoutMs) {
|
||||
try {
|
||||
const handle = await fs.open(lockPath, "wx");
|
||||
const startTime = platform === "linux" ? readLinuxStartTime(process.pid) : null;
|
||||
const payload: LockPayload = {
|
||||
pid: process.pid,
|
||||
createdAt: new Date().toISOString(),
|
||||
accountId: normalizeAccountId(opts.accountId),
|
||||
tokenHash,
|
||||
};
|
||||
if (typeof startTime === "number" && Number.isFinite(startTime)) {
|
||||
payload.startTime = startTime;
|
||||
}
|
||||
await handle.writeFile(JSON.stringify(payload), "utf8");
|
||||
return {
|
||||
lockPath,
|
||||
unitKey,
|
||||
release: async () => {
|
||||
await handle.close().catch(() => undefined);
|
||||
await fs.rm(lockPath, { force: true });
|
||||
},
|
||||
};
|
||||
} catch (err) {
|
||||
const code = (err as { code?: unknown }).code;
|
||||
if (code !== "EEXIST") {
|
||||
throw new TelegramPollLockError(`failed to acquire telegram poll lock at ${lockPath}`, err);
|
||||
}
|
||||
|
||||
lastPayload = await readLockPayload(lockPath);
|
||||
const ownerPid = lastPayload?.pid;
|
||||
const ownerStatus = ownerPid
|
||||
? resolveOwnerStatus(ownerPid, lastPayload, platform)
|
||||
: "unknown";
|
||||
if (ownerStatus === "dead" && ownerPid) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
continue;
|
||||
}
|
||||
if (ownerStatus !== "alive") {
|
||||
let stale = false;
|
||||
if (lastPayload?.createdAt) {
|
||||
const createdAt = Date.parse(lastPayload.createdAt);
|
||||
stale = Number.isFinite(createdAt) ? Date.now() - createdAt > staleMs : false;
|
||||
}
|
||||
if (!stale) {
|
||||
try {
|
||||
const st = await fs.stat(lockPath);
|
||||
stale = Date.now() - st.mtimeMs > staleMs;
|
||||
} catch {
|
||||
stale = true;
|
||||
}
|
||||
}
|
||||
if (stale) {
|
||||
await fs.rm(lockPath, { force: true });
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
await new Promise((r) => setTimeout(r, pollIntervalMs));
|
||||
}
|
||||
}
|
||||
|
||||
const owner = lastPayload?.pid ? `pid=${lastPayload.pid}` : "unknown owner";
|
||||
throw new TelegramPollLockError(`telegram poll lock timeout (${owner}) for ${unitKey}`);
|
||||
}
|
||||
@ -32,4 +32,33 @@ describe("telegram update offset store", () => {
|
||||
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBe(421);
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores invalid persisted update ids", async () => {
|
||||
await withTempStateDir(async (dir) => {
|
||||
const filePath = path.join(dir, "telegram", "update-offset-primary.json");
|
||||
await fs.mkdir(path.dirname(filePath), { recursive: true });
|
||||
|
||||
const writeRaw = async (lastUpdateId: unknown) => {
|
||||
await fs.writeFile(
|
||||
filePath,
|
||||
`${JSON.stringify({ version: 1, lastUpdateId }, null, 2)}\n`,
|
||||
"utf-8",
|
||||
);
|
||||
};
|
||||
|
||||
await writeRaw(2_147_483_648);
|
||||
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
|
||||
|
||||
const backupFiles = (await fs.readdir(path.dirname(filePath))).filter((name) =>
|
||||
name.includes("update-offset-primary.json.bak.invalid."),
|
||||
);
|
||||
expect(backupFiles.length).toBe(1);
|
||||
|
||||
await writeRaw(-1);
|
||||
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
|
||||
|
||||
await writeRaw(1.25);
|
||||
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -6,12 +6,26 @@ import path from "node:path";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
|
||||
const STORE_VERSION = 1;
|
||||
const TELEGRAM_UPDATE_ID_MAX = 2_147_483_647; // Telegram Bot API: Integer (32-bit signed)
|
||||
|
||||
type TelegramUpdateOffsetState = {
|
||||
version: number;
|
||||
lastUpdateId: number | null;
|
||||
};
|
||||
|
||||
function isValidTelegramUpdateId(value: unknown): value is number {
|
||||
return (
|
||||
typeof value === "number" &&
|
||||
Number.isSafeInteger(value) &&
|
||||
value >= 0 &&
|
||||
value <= TELEGRAM_UPDATE_ID_MAX
|
||||
);
|
||||
}
|
||||
|
||||
function formatBackupSuffix() {
|
||||
return new Date().toISOString().replaceAll(":", "").replaceAll(".", "");
|
||||
}
|
||||
|
||||
function normalizeAccountId(accountId?: string) {
|
||||
const trimmed = accountId?.trim();
|
||||
if (!trimmed) return "default";
|
||||
@ -31,7 +45,7 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null {
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as TelegramUpdateOffsetState;
|
||||
if (parsed?.version !== STORE_VERSION) return null;
|
||||
if (parsed.lastUpdateId !== null && typeof parsed.lastUpdateId !== "number") {
|
||||
if (parsed.lastUpdateId !== null && !isValidTelegramUpdateId(parsed.lastUpdateId)) {
|
||||
return null;
|
||||
}
|
||||
return parsed;
|
||||
@ -43,12 +57,23 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null {
|
||||
export async function readTelegramUpdateOffset(params: {
|
||||
accountId?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
onInvalid?: (info: { path: string; backupPath: string }) => void;
|
||||
}): Promise<number | null> {
|
||||
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
|
||||
try {
|
||||
const raw = await fs.readFile(filePath, "utf-8");
|
||||
const parsed = safeParseState(raw);
|
||||
return parsed?.lastUpdateId ?? null;
|
||||
if (!parsed) {
|
||||
const backupPath = `${filePath}.bak.invalid.${formatBackupSuffix()}`;
|
||||
try {
|
||||
await fs.rename(filePath, backupPath);
|
||||
params.onInvalid?.({ path: filePath, backupPath });
|
||||
} catch {
|
||||
// Ignore backup failures; treat as missing offset.
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return parsed.lastUpdateId ?? null;
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
if (code === "ENOENT") return null;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user