This commit is contained in:
Jhony Sidney 2026-01-30 15:29:26 +01:00 committed by GitHub
commit 66e83f8dc4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 412 additions and 37 deletions

View File

@ -11,6 +11,7 @@ import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
import { createTelegramBot } from "./bot.js";
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
import { makeProxyFetch } from "./proxy.js";
import { acquireTelegramPollLock } from "./poll-lock.js";
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
import { startTelegramWebhook } from "./webhook.js";
@ -93,6 +94,7 @@ const isNetworkRelatedError = (err: unknown) => {
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const cfg = opts.config ?? loadConfig();
const runtimeError = opts.runtime?.error ?? console.error;
const account = resolveTelegramAccount({
cfg,
accountId: opts.accountId,
@ -108,8 +110,28 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
opts.proxyFetch ??
(account.config.proxy ? makeProxyFetch(account.config.proxy as string) : undefined);
let pollLock: { release: () => Promise<void> } | null = null;
try {
pollLock = await acquireTelegramPollLock({
token,
accountId: account.accountId,
});
} catch (err) {
runtimeError(
`[telegram] [${account.accountId}] polling lock unavailable; skipping provider start: ${String(
err,
)}`,
);
return;
}
let lastUpdateId = await readTelegramUpdateOffset({
accountId: account.accountId,
onInvalid: ({ path, backupPath }) => {
runtimeError(
`[telegram] [${account.accountId}] invalid update offset at ${path}; moved to ${backupPath}`,
);
},
});
const persistUpdateId = async (updateId: number) => {
if (lastUpdateId !== null && updateId <= lastUpdateId) return;
@ -151,49 +173,52 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
abortSignal: opts.abortSignal,
publicUrl: opts.webhookUrl,
});
await pollLock.release().catch(() => undefined);
return;
}
// Use grammyjs/runner for concurrent update processing
let restartAttempts = 0;
while (!opts.abortSignal?.aborted) {
const runner = run(bot, createTelegramRunnerOptions(cfg));
const stopOnAbort = () => {
if (opts.abortSignal?.aborted) {
void runner.stop();
}
};
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try {
// runner.task() returns a promise that resolves when the runner stops
await runner.task();
return;
} catch (err) {
if (opts.abortSignal?.aborted) {
throw err;
}
const isConflict = isGetUpdatesConflict(err);
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
const isNetworkError = isNetworkRelatedError(err);
if (!isConflict && !isRecoverable && !isNetworkError) {
throw err;
}
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
(opts.runtime?.error ?? console.error)(
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`,
);
try {
while (!opts.abortSignal?.aborted) {
const runner = run(bot, createTelegramRunnerOptions(cfg));
const stopOnAbort = () => {
if (opts.abortSignal?.aborted) {
void runner.stop();
}
};
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
if (opts.abortSignal?.aborted) return;
throw sleepErr;
// runner.task() returns a promise that resolves when the runner stops
await runner.task();
return;
} catch (err) {
if (opts.abortSignal?.aborted) {
throw err;
}
const isConflict = isGetUpdatesConflict(err);
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
const isNetworkError = isNetworkRelatedError(err);
if (!isConflict && !isRecoverable && !isNetworkError) {
throw err;
}
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
runtimeError(`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationMs(delayMs)}.`);
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
if (opts.abortSignal?.aborted) return;
throw sleepErr;
}
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
}
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
}
} finally {
await pollLock.release().catch(() => undefined);
}
}

View File

@ -0,0 +1,55 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { acquireTelegramPollLock, TelegramPollLockError } from "./poll-lock.js";
/**
 * Runs `fn` while CLAWDBOT_STATE_DIR points at a freshly created temp directory.
 *
 * The previous value of the variable (or its absence) is restored and the
 * directory is deleted once `fn` settles, whether it resolves or rejects.
 *
 * @param fn Callback that receives the absolute path of the temp directory.
 * @returns Whatever `fn` resolves to.
 */
async function withTempStateDir<T>(fn: (dir: string) => Promise<T>) {
  const savedValue = process.env.CLAWDBOT_STATE_DIR;
  const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-telegram-lock-"));
  process.env.CLAWDBOT_STATE_DIR = tempDir;
  try {
    return await fn(tempDir);
  } finally {
    // Put the environment back exactly as it was before touching the disk again.
    if (savedValue !== undefined) {
      process.env.CLAWDBOT_STATE_DIR = savedValue;
    } else {
      delete process.env.CLAWDBOT_STATE_DIR;
    }
    await fs.rm(tempDir, { recursive: true, force: true });
  }
}
describe("telegram poll lock", () => {
  it("prevents multiple concurrent pollers for same account+token", async () => {
    // Every attempt uses the same token/account so all of them contend for one lock file.
    const sharedOpts = {
      token: "123:abc",
      accountId: "default",
      pollIntervalMs: 5,
      staleMs: 10_000,
    };

    await withTempStateDir(async () => {
      const firstLock = await acquireTelegramPollLock({ ...sharedOpts, timeoutMs: 50 });

      // While the first lock is held, a second acquisition must time out.
      await expect(
        acquireTelegramPollLock({ ...sharedOpts, timeoutMs: 30 }),
      ).rejects.toBeInstanceOf(TelegramPollLockError);

      // After release the lock can be taken again.
      await firstLock.release();
      const secondLock = await acquireTelegramPollLock({ ...sharedOpts, timeoutMs: 50 });
      await secondLock.release();
    });
  });
});

241
src/telegram/poll-lock.ts Normal file
View File

@ -0,0 +1,241 @@
import { createHash } from "node:crypto";
import fs from "node:fs/promises";
import fsSync from "node:fs";
import os from "node:os";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
/** Default total time, in milliseconds, that acquireTelegramPollLock keeps retrying before it throws a timeout error. */
const DEFAULT_TIMEOUT_MS = 5000;
/** Default pause, in milliseconds, between acquisition attempts while the lock file is held by someone else. */
const DEFAULT_POLL_INTERVAL_MS = 100;
/** Default age, in milliseconds, after which a lock whose owner is not confirmed alive is treated as stale and removed. */
const DEFAULT_STALE_MS = 30_000;
/** JSON document written into the lock file to identify the process holding the lock. */
type LockPayload = {
/** PID of the process that created the lock file. */
pid: number;
/** ISO-8601 timestamp taken when the lock was written; consulted first when checking staleness. */
createdAt: string;
/** Normalized account id the lock belongs to. */
accountId: string;
/** Truncated hash of the bot token; the payload stores this hash rather than the token itself. */
tokenHash: string;
/**
 * Owner's start time as read from /proc/<pid>/stat. Only recorded on Linux; it is compared
 * with the live process to tell whether the PID still refers to the same process.
 */
startTime?: number;
};
/** Handle returned by acquireTelegramPollLock; the caller must call `release` when polling stops. */
export type TelegramPollLockHandle = {
/** Path of the lock file, or an empty string when locking is disabled via the environment. */
lockPath: string;
/** Identifier of the locked unit: "<normalized account id>:<token hash>", or "disabled". */
unitKey: string;
/** Closes the lock file handle and removes the lock file (a no-op when locking is disabled). */
release: () => Promise<void>;
};
/**
 * Error thrown when the Telegram poll lock cannot be acquired, either because
 * the lock file could not be created or because acquisition timed out.
 */
export class TelegramPollLockError extends Error {
  /** Underlying error that triggered this failure, when there is one. */
  public readonly cause?: unknown;

  /**
   * @param message Human-readable description of the failure.
   * @param cause Optional lower-level error being wrapped.
   */
  constructor(message: string, cause?: unknown) {
    super(message);
    this.cause = cause;
    this.name = "TelegramPollLockError";
  }
}
/**
 * Reports whether a process with the given PID currently exists.
 *
 * Uses `process.kill(pid, 0)`, which performs the existence/permission check
 * without delivering a signal.
 *
 * @param pid Process id to probe; non-finite or non-positive values are never alive.
 * @returns `true` when the process exists, `false` otherwise.
 */
function isAlive(pid: number): boolean {
  if (!Number.isFinite(pid) || pid <= 0) return false;
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    // EPERM means the process exists but belongs to another user. Treating it as dead
    // would let us delete the lock of a live poller, so only other errors count as "gone".
    return (err as { code?: unknown } | null)?.code === "EPERM";
  }
}
/**
 * Splits the NUL-separated contents of a `/proc/<pid>/cmdline` file into arguments.
 *
 * @param raw File contents as a string.
 * @returns The trimmed, non-empty arguments in their original order.
 */
function parseProcCmdline(raw: string): string[] {
  const args: string[] = [];
  for (const piece of raw.split("\0")) {
    const cleaned = piece.trim();
    // Trailing NULs and whitespace-only pieces produce empty strings; drop them.
    if (cleaned) args.push(cleaned);
  }
  return args;
}
/**
 * Reads the command line of a process from `/proc/<pid>/cmdline`.
 *
 * @param pid Process id to inspect.
 * @returns The parsed argument list, or `null` when the file cannot be read.
 */
function readLinuxCmdline(pid: number): string[] | null {
  try {
    const procFile = `/proc/${pid}/cmdline`;
    return parseProcCmdline(fsSync.readFileSync(procFile, "utf8"));
  } catch {
    // Any read failure is reported as "no information" rather than thrown.
    return null;
  }
}
/**
 * Reads a process's start time from `/proc/<pid>/stat`.
 *
 * Parsing begins after the last ")" in the line, so a command name that itself
 * contains spaces or parentheses cannot shift the field positions.
 *
 * @param pid Process id to inspect.
 * @returns The start-time field as an integer, or `null` when the file cannot be
 *   read or the field cannot be parsed.
 */
function readLinuxStartTime(pid: number): number | null {
  try {
    const statLine = fsSync.readFileSync(`/proc/${pid}/stat`, "utf8").trim();
    const commEnd = statLine.lastIndexOf(")");
    if (commEnd === -1) return null;
    const afterComm = statLine
      .slice(commEnd + 1)
      .trim()
      .split(/\s+/);
    // Index 19 of the fields following the command name holds the start time.
    const parsed = Number.parseInt(afterComm[19] ?? "", 10);
    if (!Number.isFinite(parsed)) return null;
    return parsed;
  } catch {
    return null;
  }
}
/** Result of probing the process recorded in a lock file. */
type LockOwnerStatus = "alive" | "dead" | "unknown";

/**
 * Decides whether the process recorded in a lock file is still the lock owner.
 *
 * @param pid PID taken from the lock payload.
 * @param payload Parsed lock payload, or `null` when none is available.
 * @param platform Platform identifier; the `/proc` checks only run for "linux".
 * @returns "dead" when the PID is gone or its start time no longer matches,
 *   "alive" when the owner is confirmed (or merely exists on non-Linux platforms),
 *   and "unknown" when `/proc` could not give an answer.
 */
function resolveOwnerStatus(
  pid: number,
  payload: LockPayload | null,
  platform: NodeJS.Platform,
): LockOwnerStatus {
  if (!isAlive(pid)) return "dead";
  if (platform !== "linux") return "alive";

  const recordedStart = payload?.startTime;
  if (!Number.isFinite(recordedStart)) {
    // No start time on record: fall back to checking that the process has a command line.
    // Best-effort only; the process is running but is not necessarily a poller.
    const cmdline = readLinuxCmdline(pid);
    if (!cmdline || cmdline.length === 0) return "unknown";
    return "alive";
  }

  // A differing start time means the PID now refers to a different process.
  const actualStart = readLinuxStartTime(pid);
  if (actualStart == null) return "unknown";
  if (actualStart === recordedStart) return "alive";
  return "dead";
}
/**
 * Converts an account id into a token that is safe to embed in a file name.
 *
 * @param accountId Raw account id; may be missing or blank.
 * @returns "default" for a missing/blank id, otherwise the trimmed id with every
 *   run of characters outside `[a-zA-Z0-9._-]` collapsed into a single "_".
 */
function normalizeAccountId(accountId?: string) {
  const candidate = (accountId ?? "").trim();
  return candidate === "" ? "default" : candidate.replace(/[^a-z0-9._-]+/gi, "_");
}
/**
 * Derives a short identifier from a bot token so the token itself never appears
 * in file names or lock payloads.
 *
 * @param token Bot token to fingerprint.
 * @returns The first 10 hex characters of the token's SHA-1 digest.
 */
function hashToken(token: string) {
  const hasher = createHash("sha1");
  hasher.update(token);
  const hexDigest = hasher.digest("hex");
  return hexDigest.substring(0, 10);
}
/**
 * Loads and validates the JSON payload stored in a lock file.
 *
 * @param lockPath Path of the lock file to read.
 * @returns The payload when the file holds JSON with a numeric `pid` and string
 *   `createdAt`, `accountId` and `tokenHash`; `null` when the file is missing,
 *   unreadable, not valid JSON, or missing a required field. A non-numeric
 *   `startTime` is dropped rather than rejected.
 */
async function readLockPayload(lockPath: string): Promise<LockPayload | null> {
  try {
    const candidate = JSON.parse(await fs.readFile(lockPath, "utf8")) as Partial<LockPayload>;
    const { pid, createdAt, accountId, tokenHash, startTime } = candidate;
    if (
      typeof pid !== "number" ||
      typeof createdAt !== "string" ||
      typeof accountId !== "string" ||
      typeof tokenHash !== "string"
    ) {
      return null;
    }
    return {
      pid,
      createdAt,
      accountId,
      tokenHash,
      startTime: typeof startTime === "number" ? startTime : undefined,
    };
  } catch {
    // Read and parse failures are both reported as "no usable payload".
    return null;
  }
}
/**
 * Computes where the poll lock for an account/token pair lives on disk.
 *
 * @param params.accountId Raw account id; normalized before use.
 * @param params.token Bot token; only its short hash ends up in the path.
 * @param params.env Environment passed to `resolveStateDir` to locate the state directory.
 * @returns `lockPath` (a file inside `<stateDir>/telegram`), `unitKey`
 *   ("<normalized account id>:<token hash>") and the `tokenHash` itself.
 */
function resolveTelegramPollLockPath(params: {
  accountId?: string;
  token: string;
  env: NodeJS.ProcessEnv;
}) {
  const stateDir = resolveStateDir(params.env, os.homedir);
  const accountKey = normalizeAccountId(params.accountId);
  const tokenHash = hashToken(params.token);
  const lockFileName = `poll-lock.${accountKey}.${tokenHash}.lock`;
  return {
    lockPath: path.join(stateDir, "telegram", lockFileName),
    unitKey: `${accountKey}:${tokenHash}`,
    tokenHash,
  };
}
/**
 * Acquires the cross-process lock that keeps a single process polling Telegram
 * for a given account/token pair.
 *
 * The lock is a file created with the exclusive "wx" flag that holds a JSON
 * payload describing the owner. While the file exists, acquisition is retried
 * every `pollIntervalMs` until `timeoutMs` has elapsed. A lock is removed and
 * retried immediately when its owner is found dead, or when the owner's status
 * cannot be confirmed and the lock is older than `staleMs`.
 *
 * Setting `CLAWDBOT_ALLOW_MULTI_TELEGRAM_POLL=1` in the environment disables
 * locking: a handle with a no-op `release` is returned without touching disk.
 *
 * @param opts.token Bot token; only its hash is used for the file name and payload.
 * @param opts.accountId Account id (normalized; blank means "default").
 * @param opts.env Environment to read; defaults to `process.env`.
 * @param opts.timeoutMs Total time to keep retrying (default 5000).
 * @param opts.pollIntervalMs Pause between attempts (default 100).
 * @param opts.staleMs Age after which an unconfirmed lock is removed (default 30000).
 * @param opts.platform Platform override; defaults to `process.platform`.
 * @returns A handle whose `release` closes and deletes the lock file.
 * @throws TelegramPollLockError when the lock file cannot be created or written,
 *   or when the timeout elapses while another owner holds the lock.
 */
export async function acquireTelegramPollLock(opts: {
  token: string;
  accountId?: string;
  env?: NodeJS.ProcessEnv;
  timeoutMs?: number;
  pollIntervalMs?: number;
  staleMs?: number;
  platform?: NodeJS.Platform;
}): Promise<TelegramPollLockHandle> {
  const env = opts.env ?? process.env;
  if (env.CLAWDBOT_ALLOW_MULTI_TELEGRAM_POLL === "1") {
    return {
      lockPath: "",
      unitKey: "disabled",
      release: async () => undefined,
    };
  }
  const timeoutMs = opts.timeoutMs ?? DEFAULT_TIMEOUT_MS;
  const pollIntervalMs = opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS;
  const staleMs = opts.staleMs ?? DEFAULT_STALE_MS;
  const platform = opts.platform ?? process.platform;
  const { lockPath, unitKey, tokenHash } = resolveTelegramPollLockPath({
    accountId: opts.accountId,
    token: opts.token,
    env,
  });
  await fs.mkdir(path.dirname(lockPath), { recursive: true, mode: 0o700 });
  const startedAt = Date.now();
  let lastPayload: LockPayload | null = null;
  while (Date.now() - startedAt < timeoutMs) {
    try {
      // "wx" creates the file exclusively and fails with EEXIST when it already exists.
      const handle = await fs.open(lockPath, "wx");
      try {
        const startTime = platform === "linux" ? readLinuxStartTime(process.pid) : null;
        const payload: LockPayload = {
          pid: process.pid,
          createdAt: new Date().toISOString(),
          accountId: normalizeAccountId(opts.accountId),
          tokenHash,
        };
        if (typeof startTime === "number" && Number.isFinite(startTime)) {
          payload.startTime = startTime;
        }
        await handle.writeFile(JSON.stringify(payload), "utf8");
      } catch (writeErr) {
        // We created the file but could not fill it in. Close the handle and remove the
        // empty lock so it neither leaks a descriptor nor blocks later acquisitions;
        // the outer catch wraps the error in TelegramPollLockError.
        await handle.close().catch(() => undefined);
        await fs.rm(lockPath, { force: true }).catch(() => undefined);
        throw writeErr;
      }
      return {
        lockPath,
        unitKey,
        release: async () => {
          await handle.close().catch(() => undefined);
          await fs.rm(lockPath, { force: true });
        },
      };
    } catch (err) {
      const code = (err as { code?: unknown }).code;
      if (code !== "EEXIST") {
        throw new TelegramPollLockError(`failed to acquire telegram poll lock at ${lockPath}`, err);
      }
      // Someone else holds the lock file: find out whether that owner still exists.
      lastPayload = await readLockPayload(lockPath);
      const ownerPid = lastPayload?.pid;
      const ownerStatus = ownerPid
        ? resolveOwnerStatus(ownerPid, lastPayload, platform)
        : "unknown";
      if (ownerStatus === "dead" && ownerPid) {
        // Owner is gone: drop its lock and retry right away.
        await fs.rm(lockPath, { force: true });
        continue;
      }
      if (ownerStatus !== "alive") {
        // Owner could not be confirmed: fall back to age, first from the payload
        // timestamp and then from the file's modification time.
        let stale = false;
        if (lastPayload?.createdAt) {
          const createdAt = Date.parse(lastPayload.createdAt);
          stale = Number.isFinite(createdAt) ? Date.now() - createdAt > staleMs : false;
        }
        if (!stale) {
          try {
            const st = await fs.stat(lockPath);
            stale = Date.now() - st.mtimeMs > staleMs;
          } catch {
            // The file vanished (or cannot be inspected) between attempts; retry immediately.
            stale = true;
          }
        }
        if (stale) {
          await fs.rm(lockPath, { force: true });
          continue;
        }
      }
      await new Promise((r) => setTimeout(r, pollIntervalMs));
    }
  }
  const owner = lastPayload?.pid ? `pid=${lastPayload.pid}` : "unknown owner";
  throw new TelegramPollLockError(`telegram poll lock timeout (${owner}) for ${unitKey}`);
}

View File

@ -32,4 +32,33 @@ describe("telegram update offset store", () => {
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBe(421);
});
});
// Verifies that out-of-range, negative and fractional update ids stored on disk are rejected.
it("ignores invalid persisted update ids", async () => {
await withTempStateDir(async (dir) => {
const filePath = path.join(dir, "telegram", "update-offset-primary.json");
await fs.mkdir(path.dirname(filePath), { recursive: true });
// Writes a version-1 state file with an arbitrary (possibly invalid) lastUpdateId.
const writeRaw = async (lastUpdateId: unknown) => {
await fs.writeFile(
filePath,
`${JSON.stringify({ version: 1, lastUpdateId }, null, 2)}\n`,
"utf-8",
);
};
// One above the 32-bit signed maximum (2_147_483_647), so it must be rejected.
await writeRaw(2_147_483_648);
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
// The rejected file is expected to have been moved aside to a ".bak.invalid.<suffix>" backup.
const backupFiles = (await fs.readdir(path.dirname(filePath))).filter((name) =>
name.includes("update-offset-primary.json.bak.invalid."),
);
expect(backupFiles.length).toBe(1);
// Negative ids are invalid.
await writeRaw(-1);
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
// Non-integer ids are invalid.
await writeRaw(1.25);
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
});
});
});

View File

@ -6,12 +6,26 @@ import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
const STORE_VERSION = 1;
const TELEGRAM_UPDATE_ID_MAX = 2_147_483_647; // Telegram Bot API: Integer (32-bit signed)
type TelegramUpdateOffsetState = {
version: number;
lastUpdateId: number | null;
};
/**
 * Type guard for update ids that may be persisted and replayed.
 *
 * @param value Candidate value, typically taken from parsed JSON.
 * @returns `true` only for safe integers in the range 0..TELEGRAM_UPDATE_ID_MAX inclusive.
 */
function isValidTelegramUpdateId(value: unknown): value is number {
  if (typeof value !== "number") return false;
  if (!Number.isSafeInteger(value)) return false;
  return 0 <= value && value <= TELEGRAM_UPDATE_ID_MAX;
}
/**
 * Builds a timestamp suffix for backup file names.
 *
 * @returns The current time in ISO-8601 form with every ":" and "." removed.
 */
function formatBackupSuffix() {
  const isoNow = new Date().toISOString();
  return isoNow.replace(/[:.]/g, "");
}
function normalizeAccountId(accountId?: string) {
const trimmed = accountId?.trim();
if (!trimmed) return "default";
@ -31,7 +45,7 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null {
try {
const parsed = JSON.parse(raw) as TelegramUpdateOffsetState;
if (parsed?.version !== STORE_VERSION) return null;
if (parsed.lastUpdateId !== null && typeof parsed.lastUpdateId !== "number") {
if (parsed.lastUpdateId !== null && !isValidTelegramUpdateId(parsed.lastUpdateId)) {
return null;
}
return parsed;
@ -43,12 +57,23 @@ function safeParseState(raw: string): TelegramUpdateOffsetState | null {
export async function readTelegramUpdateOffset(params: {
accountId?: string;
env?: NodeJS.ProcessEnv;
onInvalid?: (info: { path: string; backupPath: string }) => void;
}): Promise<number | null> {
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
try {
const raw = await fs.readFile(filePath, "utf-8");
const parsed = safeParseState(raw);
return parsed?.lastUpdateId ?? null;
if (!parsed) {
const backupPath = `${filePath}.bak.invalid.${formatBackupSuffix()}`;
try {
await fs.rename(filePath, backupPath);
params.onInvalid?.({ path: filePath, backupPath });
} catch {
// Ignore backup failures; treat as missing offset.
}
return null;
}
return parsed.lastUpdateId ?? null;
} catch (err) {
const code = (err as { code?: string }).code;
if (code === "ENOENT") return null;