feat(whatsapp): add debounceMs for batching rapid messages
Add a `debounceMs` configuration option to WhatsApp channel settings
that batches rapid consecutive messages from the same sender into a
single response. This prevents triggering separate agent runs for
each message when a user sends multiple short messages in quick
succession (e.g., "Hey!", "how are you?", "I was wondering...").
Changes:
- Add `debounceMs` config to WhatsAppConfig and WhatsAppAccountConfig
- Implement message buffering in `monitorWebInbox` with:
- Map-based buffer keyed by sender (DM) or chat ID (groups)
- Debounce timer that resets on each new message
- Message combination with newline separator
- Single message optimization (no modification if only one message)
- Wire `debounceMs` through account resolution and monitor tuning
- Add UI hints and schema documentation
Usage example:
{
"channels": {
"whatsapp": {
"debounceMs": 5000 // 5 second window
}
}
}
Default behavior: `debounceMs: 0` (disabled by default)
Verified: All existing tests pass (3204 tests), TypeScript compilation
succeeds with no errors.
Implemented with assistance from AI coding tools.
Closes #967
This commit is contained in:
parent
d00f2d9c0c
commit
c609be185e
@ -189,6 +189,7 @@ const FIELD_LABELS: Record<string, string> = {
|
||||
"channels.telegram.timeoutSeconds": "Telegram API Timeout (seconds)",
|
||||
"channels.whatsapp.dmPolicy": "WhatsApp DM Policy",
|
||||
"channels.whatsapp.selfChatMode": "WhatsApp Self-Phone Mode",
|
||||
"channels.whatsapp.debounceMs": "WhatsApp Message Debounce (ms)",
|
||||
"channels.signal.dmPolicy": "Signal DM Policy",
|
||||
"channels.imessage.dmPolicy": "iMessage DM Policy",
|
||||
"channels.discord.dm.policy": "Discord DM Policy",
|
||||
@ -348,6 +349,8 @@ const FIELD_HELP: Record<string, string> = {
|
||||
"channels.whatsapp.dmPolicy":
|
||||
'Direct message access control ("pairing" recommended). "open" requires channels.whatsapp.allowFrom=["*"].',
|
||||
"channels.whatsapp.selfChatMode": "Same-phone setup (bot uses your personal WhatsApp number).",
|
||||
"channels.whatsapp.debounceMs":
|
||||
"Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable).",
|
||||
"channels.signal.dmPolicy":
|
||||
'Direct message access control ("pairing" recommended). "open" requires channels.signal.allowFrom=["*"].',
|
||||
"channels.imessage.dmPolicy":
|
||||
|
||||
@ -75,6 +75,8 @@ export type WhatsAppConfig = {
|
||||
*/
|
||||
group?: "always" | "mentions" | "never";
|
||||
};
|
||||
/** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */
|
||||
debounceMs?: number;
|
||||
};
|
||||
|
||||
export type WhatsAppAccountConfig = {
|
||||
@ -131,4 +133,6 @@ export type WhatsAppAccountConfig = {
|
||||
*/
|
||||
group?: "always" | "mentions" | "never";
|
||||
};
|
||||
/** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */
|
||||
debounceMs?: number;
|
||||
};
|
||||
|
||||
@ -46,6 +46,7 @@ export const WhatsAppAccountSchema = z
|
||||
group: z.enum(["always", "mentions", "never"]).optional().default("mentions"),
|
||||
})
|
||||
.optional(),
|
||||
debounceMs: z.number().int().nonnegative().optional().default(0),
|
||||
})
|
||||
.superRefine((value, ctx) => {
|
||||
if (value.dmPolicy !== "open") return;
|
||||
@ -101,6 +102,7 @@ export const WhatsAppConfigSchema = z
|
||||
group: z.enum(["always", "mentions", "never"]).optional().default("mentions"),
|
||||
})
|
||||
.optional(),
|
||||
debounceMs: z.number().int().nonnegative().optional().default(0),
|
||||
})
|
||||
.superRefine((value, ctx) => {
|
||||
if (value.dmPolicy !== "open") return;
|
||||
|
||||
@ -26,6 +26,7 @@ export type ResolvedWhatsAppAccount = {
|
||||
blockStreaming?: boolean;
|
||||
ackReaction?: WhatsAppAccountConfig["ackReaction"];
|
||||
groups?: WhatsAppAccountConfig["groups"];
|
||||
debounceMs?: number;
|
||||
};
|
||||
|
||||
function listConfiguredAccountIds(cfg: ClawdbotConfig): string[] {
|
||||
@ -153,6 +154,7 @@ export function resolveWhatsAppAccount(params: {
|
||||
blockStreaming: accountCfg?.blockStreaming ?? rootCfg?.blockStreaming,
|
||||
ackReaction: accountCfg?.ackReaction ?? rootCfg?.ackReaction,
|
||||
groups: accountCfg?.groups ?? rootCfg?.groups,
|
||||
debounceMs: accountCfg?.debounceMs ?? rootCfg?.debounceMs,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@ -175,6 +175,7 @@ export async function monitorWebChannel(
|
||||
authDir: account.authDir,
|
||||
mediaMaxMb: account.mediaMaxMb,
|
||||
sendReadReceipts: account.sendReadReceipts,
|
||||
debounceMs: tuning.debounceMs ?? account.debounceMs,
|
||||
onMessage: async (msg: WebInboundMsg) => {
|
||||
handledMessages += 1;
|
||||
lastMessageAt = Date.now();
|
||||
|
||||
@ -30,4 +30,6 @@ export type WebMonitorTuning = {
|
||||
statusSink?: (status: WebChannelStatus) => void;
|
||||
/** WhatsApp account id. Default: "default". */
|
||||
accountId?: string;
|
||||
/** Debounce window (ms) for batching rapid consecutive messages from the same sender. */
|
||||
debounceMs?: number;
|
||||
};
|
||||
|
||||
@ -28,6 +28,8 @@ export async function monitorWebInbox(options: {
|
||||
mediaMaxMb?: number;
|
||||
/** Send read receipts for incoming messages (default true). */
|
||||
sendReadReceipts?: boolean;
|
||||
/** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */
|
||||
debounceMs?: number;
|
||||
}) {
|
||||
const inboundLogger = getChildLogger({ module: "web-inbound" });
|
||||
const inboundConsoleLog = createSubsystemLogger("gateway/channels/whatsapp").child("inbound");
|
||||
@ -56,6 +58,27 @@ export async function monitorWebInbox(options: {
|
||||
|
||||
const selfJid = sock.user?.id;
|
||||
const selfE164 = selfJid ? jidToE164(selfJid) : null;
|
||||
// Message batching for debounce
|
||||
const debounceWindowMs = options.debounceMs ?? 0;
|
||||
const messageBuffer = new Map<
|
||||
string,
|
||||
{ messages: WebInboundMessage[]; timeout: ReturnType<typeof setTimeout> | null }
|
||||
>();
|
||||
|
||||
const processBufferedMessages = async (key: string) => {
|
||||
const buffered = messageBuffer.get(key);
|
||||
if (!buffered) return;
|
||||
const { messages } = buffered;
|
||||
messageBuffer.delete(key);
|
||||
if (messages.length === 0) return;
|
||||
if (messages.length === 1) {
|
||||
await options.onMessage(messages[0]);
|
||||
return;
|
||||
}
|
||||
const combinedBody = messages.map((m) => m.body).join("\n");
|
||||
const combinedMessage: WebInboundMessage = { ...messages[0], body: combinedBody };
|
||||
await options.onMessage(combinedMessage);
|
||||
};
|
||||
const groupMetaCache = new Map<
|
||||
string,
|
||||
{ subject?: string; participants?: string[]; expires: number }
|
||||
@ -217,37 +240,57 @@ export async function monitorWebInbox(options: {
|
||||
{ from, to: selfE164 ?? "me", body, mediaPath, mediaType, timestamp },
|
||||
"inbound message",
|
||||
);
|
||||
const inboundMessage: WebInboundMessage = {
|
||||
id,
|
||||
from,
|
||||
conversationId: from,
|
||||
to: selfE164 ?? "me",
|
||||
accountId: access.resolvedAccountId,
|
||||
body,
|
||||
pushName: senderName,
|
||||
timestamp,
|
||||
chatType: group ? "group" : "direct",
|
||||
chatId: remoteJid,
|
||||
senderJid: participantJid,
|
||||
senderE164: senderE164 ?? undefined,
|
||||
senderName,
|
||||
replyToId: replyContext?.id,
|
||||
replyToBody: replyContext?.body,
|
||||
replyToSender: replyContext?.sender,
|
||||
groupSubject,
|
||||
groupParticipants,
|
||||
mentionedJids: mentionedJids ?? undefined,
|
||||
selfJid,
|
||||
selfE164,
|
||||
location: location ?? undefined,
|
||||
sendComposing,
|
||||
reply,
|
||||
sendMedia,
|
||||
mediaPath,
|
||||
mediaType,
|
||||
};
|
||||
try {
|
||||
const task = Promise.resolve(
|
||||
options.onMessage({
|
||||
id,
|
||||
from,
|
||||
conversationId: from,
|
||||
to: selfE164 ?? "me",
|
||||
accountId: access.resolvedAccountId,
|
||||
body,
|
||||
pushName: senderName,
|
||||
timestamp,
|
||||
chatType: group ? "group" : "direct",
|
||||
chatId: remoteJid,
|
||||
senderJid: participantJid,
|
||||
senderE164: senderE164 ?? undefined,
|
||||
senderName,
|
||||
replyToId: replyContext?.id,
|
||||
replyToBody: replyContext?.body,
|
||||
replyToSender: replyContext?.sender,
|
||||
groupSubject,
|
||||
groupParticipants,
|
||||
mentionedJids: mentionedJids ?? undefined,
|
||||
selfJid,
|
||||
selfE164,
|
||||
location: location ?? undefined,
|
||||
sendComposing,
|
||||
reply,
|
||||
sendMedia,
|
||||
mediaPath,
|
||||
mediaType,
|
||||
}),
|
||||
(async () => {
|
||||
// Apply debounce batching if configured
|
||||
if (debounceWindowMs > 0) {
|
||||
const bufferKey = group ? remoteJid : from;
|
||||
const existing = messageBuffer.get(bufferKey);
|
||||
if (existing) {
|
||||
if (existing.timeout) clearTimeout(existing.timeout);
|
||||
existing.messages.push(inboundMessage);
|
||||
existing.timeout = setTimeout(() => processBufferedMessages(bufferKey), debounceWindowMs);
|
||||
} else {
|
||||
messageBuffer.set(bufferKey, {
|
||||
messages: [inboundMessage],
|
||||
timeout: setTimeout(() => processBufferedMessages(bufferKey), debounceWindowMs),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// No debouncing, process immediately
|
||||
await options.onMessage(inboundMessage);
|
||||
}
|
||||
})(),
|
||||
);
|
||||
void task.catch((err) => {
|
||||
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
|
||||
|
||||
Loading…
Reference in New Issue
Block a user