diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index 972210226..dd908b6bf 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -1794,4 +1794,126 @@ describe("createTelegramBot", () => { expect.objectContaining({ message_thread_id: 99 }), ); }); + + it("dedupes duplicate message updates by update_id", async () => { + onSpy.mockReset(); + const replySpy = replyModule.__replySpy as unknown as ReturnType< + typeof vi.fn + >; + replySpy.mockReset(); + + loadConfig.mockReturnValue({ + telegram: { dmPolicy: "open", allowFrom: ["*"] }, + }); + + createTelegramBot({ token: "tok" }); + const handler = getOnHandler("message") as ( + ctx: Record, + ) => Promise; + + const ctx = { + update: { update_id: 111 }, + message: { + chat: { id: 123, type: "private" }, + from: { id: 456, username: "testuser" }, + text: "hello", + date: 1736380800, + message_id: 42, + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({ download: async () => new Uint8Array() }), + }; + + await handler(ctx); + await handler(ctx); + + expect(replySpy).toHaveBeenCalledTimes(1); + }); + + it("dedupes duplicate callback_query updates by update_id", async () => { + onSpy.mockReset(); + const replySpy = replyModule.__replySpy as unknown as ReturnType< + typeof vi.fn + >; + replySpy.mockReset(); + + loadConfig.mockReturnValue({ + telegram: { dmPolicy: "open", allowFrom: ["*"] }, + }); + + createTelegramBot({ token: "tok" }); + const handler = getOnHandler("callback_query") as ( + ctx: Record, + ) => Promise; + + const ctx = { + update: { update_id: 222 }, + callbackQuery: { + id: "cb-1", + data: "ping", + from: { id: 789, username: "testuser" }, + message: { + chat: { id: 123, type: "private" }, + date: 1736380800, + message_id: 9001, + }, + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({}), + }; + + await handler(ctx); + await handler(ctx); + + expect(replySpy).toHaveBeenCalledTimes(1); + }); + + it("allows distinct callback_query ids without update_id", async () => { + onSpy.mockReset(); + const replySpy = replyModule.__replySpy as unknown as ReturnType< + typeof vi.fn + >; + replySpy.mockReset(); + + loadConfig.mockReturnValue({ + telegram: { dmPolicy: "open", allowFrom: ["*"] }, + }); + + createTelegramBot({ token: "tok" }); + const handler = getOnHandler("callback_query") as ( + ctx: Record, + ) => Promise; + + await handler({ + callbackQuery: { + id: "cb-1", + data: "ping", + from: { id: 789, username: "testuser" }, + message: { + chat: { id: 123, type: "private" }, + date: 1736380800, + message_id: 9001, + }, + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({}), + }); + + await handler({ + callbackQuery: { + id: "cb-2", + data: "ping", + from: { id: 789, username: "testuser" }, + message: { + chat: { id: 123, type: "private" }, + date: 1736380800, + message_id: 9001, + }, + }, + me: { username: "clawdbot_bot" }, + getFile: async () => ({}), + }); + + expect(replySpy).toHaveBeenCalledTimes(2); + }); }); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 7397f6ab1..c5979ec76 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -71,6 +71,8 @@ const PARSE_ERR_RE = // Media group aggregation - Telegram sends multi-image messages as separate updates // with a shared media_group_id. We buffer them and process as a single message after a short delay. const MEDIA_GROUP_TIMEOUT_MS = 500; +const RECENT_TELEGRAM_UPDATE_TTL_MS = 5 * 60_000; +const RECENT_TELEGRAM_UPDATE_MAX = 2000; type TelegramMessage = Message.CommonMessage; @@ -84,6 +86,62 @@ type MediaGroupEntry = { timer: ReturnType; }; +type TelegramUpdateKeyContext = { + update?: { + update_id?: number; + message?: TelegramMessage; + edited_message?: TelegramMessage; + }; + update_id?: number; + message?: TelegramMessage; + callbackQuery?: { id?: string; message?: TelegramMessage }; +}; + +const buildTelegramUpdateKey = (ctx: TelegramUpdateKeyContext) => { + const updateId = ctx.update?.update_id ?? ctx.update_id; + if (typeof updateId === "number") return `update:${updateId}`; + const callbackId = ctx.callbackQuery?.id; + if (callbackId) return `callback:${callbackId}`; + const msg = + ctx.message ?? + ctx.update?.message ?? + ctx.update?.edited_message ?? + ctx.callbackQuery?.message; + const chatId = msg?.chat?.id; + const messageId = msg?.message_id; + if (typeof chatId !== "undefined" && typeof messageId === "number") { + return `message:${chatId}:${messageId}`; + } + return undefined; +}; + +const shouldSkipTelegramUpdate = ( + cache: Map, + key?: string, +) => { + if (!key) return false; + const now = Date.now(); + const existing = cache.get(key); + if (existing && now - existing.ts < RECENT_TELEGRAM_UPDATE_TTL_MS) { + return true; + } + if (existing) cache.delete(key); + cache.set(key, { ts: now }); + if (cache.size > RECENT_TELEGRAM_UPDATE_MAX) { + for (const [cachedKey, entry] of cache) { + if (now - entry.ts > RECENT_TELEGRAM_UPDATE_TTL_MS) { + cache.delete(cachedKey); + } + } + while (cache.size > RECENT_TELEGRAM_UPDATE_MAX) { + const oldestKey = cache.keys().next().value as string | undefined; + if (!oldestKey) break; + cache.delete(oldestKey); + } + } + return false; +}; + /** Telegram Location object */ interface TelegramLocation { latitude: number; @@ -170,6 +228,16 @@ export function createTelegramBot(opts: TelegramBotOptions) { bot.api.config.use(apiThrottler()); bot.use(sequentialize(getTelegramSequentialKey)); + const recentUpdates = new Map(); + const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => { + const key = buildTelegramUpdateKey(ctx); + const skipped = shouldSkipTelegramUpdate(recentUpdates, key); + if (skipped && key && shouldLogVerbose()) { + logVerbose(`telegram dedupe: skipped ${key}`); + } + return skipped; + }; + const mediaGroupBuffer = new Map(); const cfg = opts.config ?? loadConfig(); @@ -804,6 +872,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { bot.command(command.name, async (ctx) => { const msg = ctx.message; if (!msg) return; + if (shouldSkipUpdate(ctx)) return; const chatId = msg.chat.id; const isGroup = msg.chat.type === "group" || msg.chat.type === "supergroup"; @@ -997,6 +1066,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { bot.on("callback_query", async (ctx) => { const callback = ctx.callbackQuery; if (!callback) return; + if (shouldSkipUpdate(ctx)) return; try { const data = (callback.data ?? "").trim(); const callbackMessage = callback.message; @@ -1032,6 +1102,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { try { const msg = ctx.message; if (!msg) return; + if (shouldSkipUpdate(ctx)) return; const chatId = msg.chat.id; const isGroup =