From c609be185efcc1989e8939bd51683927b3e5e00f Mon Sep 17 00:00:00 2001 From: juanpablodlc <92012363+juanpablodlc@users.noreply.github.com> Date: Thu, 15 Jan 2026 10:59:21 -0800 Subject: [PATCH] feat(whatsapp): add debounceMs for batching rapid messages Add a `debounceMs` configuration option to WhatsApp channel settings that batches rapid consecutive messages from the same sender into a single response. This prevents triggering separate agent runs for each message when a user sends multiple short messages in quick succession (e.g., "Hey!", "how are you?", "I was wondering..."). Changes: - Add `debounceMs` config to WhatsAppConfig and WhatsAppAccountConfig - Implement message buffering in `monitorWebInbox` with: - Map-based buffer keyed by sender (DM) or chat ID (groups) - Debounce timer that resets on each new message - Message combination with newline separator - Single message optimization (no modification if only one message) - Wire `debounceMs` through account resolution and monitor tuning - Add UI hints and schema documentation Usage example: { "channels": { "whatsapp": { "debounceMs": 5000 // 5 second window } } } Default behavior: `debounceMs: 0` (disabled by default) Verified: All existing tests pass (3204 tests), TypeScript compilation succeeds with no errors. Implemented with assistance from AI coding tools. Closes #967 --- src/config/schema.ts | 3 + src/config/types.whatsapp.ts | 4 + src/config/zod-schema.providers-whatsapp.ts | 2 + src/web/accounts.ts | 2 + src/web/auto-reply/monitor.ts | 1 + src/web/auto-reply/types.ts | 2 + src/web/inbound/monitor.ts | 101 ++++++++++++++------ 7 files changed, 86 insertions(+), 29 deletions(-) diff --git a/src/config/schema.ts b/src/config/schema.ts index 4b84c154e..4dabbe405 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -189,6 +189,7 @@ const FIELD_LABELS: Record = { "channels.telegram.timeoutSeconds": "Telegram API Timeout (seconds)", "channels.whatsapp.dmPolicy": "WhatsApp DM Policy", "channels.whatsapp.selfChatMode": "WhatsApp Self-Phone Mode", + "channels.whatsapp.debounceMs": "WhatsApp Message Debounce (ms)", "channels.signal.dmPolicy": "Signal DM Policy", "channels.imessage.dmPolicy": "iMessage DM Policy", "channels.discord.dm.policy": "Discord DM Policy", @@ -348,6 +349,8 @@ const FIELD_HELP: Record = { "channels.whatsapp.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires channels.whatsapp.allowFrom=["*"].', "channels.whatsapp.selfChatMode": "Same-phone setup (bot uses your personal WhatsApp number).", + "channels.whatsapp.debounceMs": + "Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable).", "channels.signal.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires channels.signal.allowFrom=["*"].', "channels.imessage.dmPolicy": diff --git a/src/config/types.whatsapp.ts b/src/config/types.whatsapp.ts index c28178f06..28ed34c56 100644 --- a/src/config/types.whatsapp.ts +++ b/src/config/types.whatsapp.ts @@ -75,6 +75,8 @@ export type WhatsAppConfig = { */ group?: "always" | "mentions" | "never"; }; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */ + debounceMs?: number; }; export type WhatsAppAccountConfig = { @@ -131,4 +133,6 @@ export type WhatsAppAccountConfig = { */ group?: "always" | "mentions" | "never"; }; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */ + debounceMs?: number; }; diff --git a/src/config/zod-schema.providers-whatsapp.ts b/src/config/zod-schema.providers-whatsapp.ts index 962bb5619..171a7c476 100644 --- a/src/config/zod-schema.providers-whatsapp.ts +++ b/src/config/zod-schema.providers-whatsapp.ts @@ -46,6 +46,7 @@ export const WhatsAppAccountSchema = z group: z.enum(["always", "mentions", "never"]).optional().default("mentions"), }) .optional(), + debounceMs: z.number().int().nonnegative().optional().default(0), }) .superRefine((value, ctx) => { if (value.dmPolicy !== "open") return; @@ -101,6 +102,7 @@ export const WhatsAppConfigSchema = z group: z.enum(["always", "mentions", "never"]).optional().default("mentions"), }) .optional(), + debounceMs: z.number().int().nonnegative().optional().default(0), }) .superRefine((value, ctx) => { if (value.dmPolicy !== "open") return; diff --git a/src/web/accounts.ts b/src/web/accounts.ts index 618b57d2d..7a4467390 100644 --- a/src/web/accounts.ts +++ b/src/web/accounts.ts @@ -26,6 +26,7 @@ export type ResolvedWhatsAppAccount = { blockStreaming?: boolean; ackReaction?: WhatsAppAccountConfig["ackReaction"]; groups?: WhatsAppAccountConfig["groups"]; + debounceMs?: number; }; function listConfiguredAccountIds(cfg: ClawdbotConfig): string[] { @@ -153,6 +154,7 @@ export function resolveWhatsAppAccount(params: { blockStreaming: accountCfg?.blockStreaming ?? rootCfg?.blockStreaming, ackReaction: accountCfg?.ackReaction ?? rootCfg?.ackReaction, groups: accountCfg?.groups ?? rootCfg?.groups, + debounceMs: accountCfg?.debounceMs ?? rootCfg?.debounceMs, }; } diff --git a/src/web/auto-reply/monitor.ts b/src/web/auto-reply/monitor.ts index c8d139032..771d5ade0 100644 --- a/src/web/auto-reply/monitor.ts +++ b/src/web/auto-reply/monitor.ts @@ -175,6 +175,7 @@ export async function monitorWebChannel( authDir: account.authDir, mediaMaxMb: account.mediaMaxMb, sendReadReceipts: account.sendReadReceipts, + debounceMs: tuning.debounceMs ?? account.debounceMs, onMessage: async (msg: WebInboundMsg) => { handledMessages += 1; lastMessageAt = Date.now(); diff --git a/src/web/auto-reply/types.ts b/src/web/auto-reply/types.ts index db135f4c8..cb6ce4ce4 100644 --- a/src/web/auto-reply/types.ts +++ b/src/web/auto-reply/types.ts @@ -30,4 +30,6 @@ export type WebMonitorTuning = { statusSink?: (status: WebChannelStatus) => void; /** WhatsApp account id. Default: "default". */ accountId?: string; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender. */ + debounceMs?: number; }; diff --git a/src/web/inbound/monitor.ts b/src/web/inbound/monitor.ts index 3c92fe4a2..a2f7991ff 100644 --- a/src/web/inbound/monitor.ts +++ b/src/web/inbound/monitor.ts @@ -28,6 +28,8 @@ export async function monitorWebInbox(options: { mediaMaxMb?: number; /** Send read receipts for incoming messages (default true). */ sendReadReceipts?: boolean; + /** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */ + debounceMs?: number; }) { const inboundLogger = getChildLogger({ module: "web-inbound" }); const inboundConsoleLog = createSubsystemLogger("gateway/channels/whatsapp").child("inbound"); @@ -56,6 +58,27 @@ export async function monitorWebInbox(options: { const selfJid = sock.user?.id; const selfE164 = selfJid ? jidToE164(selfJid) : null; + // Message batching for debounce + const debounceWindowMs = options.debounceMs ?? 0; + const messageBuffer = new Map< + string, + { messages: WebInboundMessage[]; timeout: ReturnType | null } + >(); + + const processBufferedMessages = async (key: string) => { + const buffered = messageBuffer.get(key); + if (!buffered) return; + const { messages } = buffered; + messageBuffer.delete(key); + if (messages.length === 0) return; + if (messages.length === 1) { + await options.onMessage(messages[0]); + return; + } + const combinedBody = messages.map((m) => m.body).join("\n"); + const combinedMessage: WebInboundMessage = { ...messages[0], body: combinedBody }; + await options.onMessage(combinedMessage); + }; const groupMetaCache = new Map< string, { subject?: string; participants?: string[]; expires: number } @@ -217,37 +240,57 @@ export async function monitorWebInbox(options: { { from, to: selfE164 ?? "me", body, mediaPath, mediaType, timestamp }, "inbound message", ); + const inboundMessage: WebInboundMessage = { + id, + from, + conversationId: from, + to: selfE164 ?? "me", + accountId: access.resolvedAccountId, + body, + pushName: senderName, + timestamp, + chatType: group ? "group" : "direct", + chatId: remoteJid, + senderJid: participantJid, + senderE164: senderE164 ?? undefined, + senderName, + replyToId: replyContext?.id, + replyToBody: replyContext?.body, + replyToSender: replyContext?.sender, + groupSubject, + groupParticipants, + mentionedJids: mentionedJids ?? undefined, + selfJid, + selfE164, + location: location ?? undefined, + sendComposing, + reply, + sendMedia, + mediaPath, + mediaType, + }; try { const task = Promise.resolve( - options.onMessage({ - id, - from, - conversationId: from, - to: selfE164 ?? "me", - accountId: access.resolvedAccountId, - body, - pushName: senderName, - timestamp, - chatType: group ? "group" : "direct", - chatId: remoteJid, - senderJid: participantJid, - senderE164: senderE164 ?? undefined, - senderName, - replyToId: replyContext?.id, - replyToBody: replyContext?.body, - replyToSender: replyContext?.sender, - groupSubject, - groupParticipants, - mentionedJids: mentionedJids ?? undefined, - selfJid, - selfE164, - location: location ?? undefined, - sendComposing, - reply, - sendMedia, - mediaPath, - mediaType, - }), + (async () => { + // Apply debounce batching if configured + if (debounceWindowMs > 0) { + const bufferKey = group ? remoteJid : from; + const existing = messageBuffer.get(bufferKey); + if (existing) { + if (existing.timeout) clearTimeout(existing.timeout); + existing.messages.push(inboundMessage); + existing.timeout = setTimeout(() => processBufferedMessages(bufferKey), debounceWindowMs); + } else { + messageBuffer.set(bufferKey, { + messages: [inboundMessage], + timeout: setTimeout(() => processBufferedMessages(bufferKey), debounceWindowMs), + }); + } + } else { + // No debouncing, process immediately + await options.onMessage(inboundMessage); + } + })(), ); void task.catch((err) => { inboundLogger.error({ error: String(err) }, "failed handling inbound web message");