openclaw/src/web/inbound/monitor.ts
Nick Sullivan 79f799256d fix(whatsapp): update reaction docs + suppress pairing for reactions
- Update docs: reaction removals now emit events with isRemoval=true
- Pass no-op sendMessage to access control to prevent pairing messages
  being sent when unknown users react (pairing is for messages, not reactions)

(Cursor review)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 12:53:31 -06:00

489 lines
18 KiB
TypeScript

import type { AnyMessageContent, proto, WAMessage } from "@whiskeysockets/baileys";
import { DisconnectReason, isJidGroup } from "@whiskeysockets/baileys";
import { formatLocationText } from "../../channels/location.js";
import { logVerbose, shouldLogVerbose } from "../../globals.js";
import { recordChannelActivity } from "../../infra/channel-activity.js";
import { getChildLogger } from "../../logging/logger.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { saveMediaBuffer } from "../../media/store.js";
import { createInboundDebouncer } from "../../auto-reply/inbound-debounce.js";
import { jidToE164, resolveJidToE164 } from "../../utils.js";
import { createWaSocket, getStatusCode, waitForWaConnection } from "../session.js";
import { checkInboundAccessControl } from "./access-control.js";
import { isRecentInboundMessage } from "./dedupe.js";
import {
describeReplyContext,
extractLocationData,
extractMediaPlaceholder,
extractMentionedJids,
extractText,
} from "./extract.js";
import { downloadInboundMedia } from "./media.js";
import { createWebSendApi } from "./send-api.js";
import type { WebInboundMessage, WebInboundReaction, WebListenerCloseReason } from "./types.js";
export async function monitorWebInbox(options: {
verbose: boolean;
accountId: string;
authDir: string;
onMessage: (msg: WebInboundMessage) => Promise<void>;
/** Called when a reaction is received on a message. */
onReaction?: (reaction: WebInboundReaction) => void;
mediaMaxMb?: number;
/** Send read receipts for incoming messages (default true). */
sendReadReceipts?: boolean;
/** Debounce window (ms) for batching rapid consecutive messages from the same sender (0 to disable). */
debounceMs?: number;
/** Optional debounce gating predicate. */
shouldDebounce?: (msg: WebInboundMessage) => boolean;
}) {
const inboundLogger = getChildLogger({ module: "web-inbound" });
const inboundConsoleLog = createSubsystemLogger("gateway/channels/whatsapp").child("inbound");
const sock = await createWaSocket(false, options.verbose, {
authDir: options.authDir,
});
await waitForWaConnection(sock);
const connectedAtMs = Date.now();
let onCloseResolve: ((reason: WebListenerCloseReason) => void) | null = null;
const onClose = new Promise<WebListenerCloseReason>((resolve) => {
onCloseResolve = resolve;
});
const resolveClose = (reason: WebListenerCloseReason) => {
if (!onCloseResolve) return;
const resolver = onCloseResolve;
onCloseResolve = null;
resolver(reason);
};
try {
await sock.sendPresenceUpdate("available");
if (shouldLogVerbose()) logVerbose("Sent global 'available' presence on connect");
} catch (err) {
logVerbose(`Failed to send 'available' presence on connect: ${String(err)}`);
}
const selfJid = sock.user?.id;
const selfE164 = selfJid ? jidToE164(selfJid) : null;
const debouncer = createInboundDebouncer<WebInboundMessage>({
debounceMs: options.debounceMs ?? 0,
buildKey: (msg) => {
const senderKey =
msg.chatType === "group"
? (msg.senderJid ?? msg.senderE164 ?? msg.senderName ?? msg.from)
: msg.from;
if (!senderKey) return null;
const conversationKey = msg.chatType === "group" ? msg.chatId : msg.from;
return `${msg.accountId}:${conversationKey}:${senderKey}`;
},
shouldDebounce: options.shouldDebounce,
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) return;
if (entries.length === 1) {
await options.onMessage(last);
return;
}
const mentioned = new Set<string>();
for (const entry of entries) {
for (const jid of entry.mentionedJids ?? []) mentioned.add(jid);
}
const combinedBody = entries
.map((entry) => entry.body)
.filter(Boolean)
.join("\n");
const combinedMessage: WebInboundMessage = {
...last,
body: combinedBody,
mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined,
};
await options.onMessage(combinedMessage);
},
onError: (err) => {
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
},
});
const groupMetaCache = new Map<
string,
{ subject?: string; participants?: string[]; expires: number }
>();
const GROUP_META_TTL_MS = 5 * 60 * 1000; // 5 minutes
const lidLookup = sock.signalRepository?.lidMapping;
const resolveInboundJid = async (jid: string | null | undefined): Promise<string | null> =>
resolveJidToE164(jid, { authDir: options.authDir, lidLookup });
const getGroupMeta = async (jid: string) => {
const cached = groupMetaCache.get(jid);
if (cached && cached.expires > Date.now()) return cached;
try {
const meta = await sock.groupMetadata(jid);
const participants =
(
await Promise.all(
meta.participants?.map(async (p) => {
const mapped = await resolveInboundJid(p.id);
return mapped ?? p.id;
}) ?? [],
)
).filter(Boolean) ?? [];
const entry = {
subject: meta.subject,
participants,
expires: Date.now() + GROUP_META_TTL_MS,
};
groupMetaCache.set(jid, entry);
return entry;
} catch (err) {
logVerbose(`Failed to fetch group metadata for ${jid}: ${String(err)}`);
return { expires: Date.now() + GROUP_META_TTL_MS };
}
};
const handleMessagesUpsert = async (upsert: { type?: string; messages?: Array<WAMessage> }) => {
if (upsert.type !== "notify" && upsert.type !== "append") return;
for (const msg of upsert.messages ?? []) {
recordChannelActivity({
channel: "whatsapp",
accountId: options.accountId,
direction: "inbound",
});
const id = msg.key?.id ?? undefined;
const remoteJid = msg.key?.remoteJid;
if (!remoteJid) continue;
if (remoteJid.endsWith("@status") || remoteJid.endsWith("@broadcast")) continue;
const group = isJidGroup(remoteJid) === true;
if (id) {
const dedupeKey = `${options.accountId}:${remoteJid}:${id}`;
if (isRecentInboundMessage(dedupeKey)) continue;
}
const participantJid = msg.key?.participant ?? undefined;
const from = group ? remoteJid : await resolveInboundJid(remoteJid);
if (!from) continue;
const senderE164 = group
? participantJid
? await resolveInboundJid(participantJid)
: null
: from;
let groupSubject: string | undefined;
let groupParticipants: string[] | undefined;
if (group) {
const meta = await getGroupMeta(remoteJid);
groupSubject = meta.subject;
groupParticipants = meta.participants;
}
const messageTimestampMs = msg.messageTimestamp
? Number(msg.messageTimestamp) * 1000
: undefined;
const access = await checkInboundAccessControl({
accountId: options.accountId,
from,
selfE164,
senderE164,
group,
pushName: msg.pushName ?? undefined,
isFromMe: Boolean(msg.key?.fromMe),
messageTimestampMs,
connectedAtMs,
sock: { sendMessage: (jid, content) => sock.sendMessage(jid, content) },
remoteJid,
});
if (!access.allowed) continue;
if (id && !access.isSelfChat && options.sendReadReceipts !== false) {
const participant = msg.key?.participant;
try {
await sock.readMessages([{ remoteJid, id, participant, fromMe: false }]);
if (shouldLogVerbose()) {
const suffix = participant ? ` (participant ${participant})` : "";
logVerbose(`Marked message ${id} as read for ${remoteJid}${suffix}`);
}
} catch (err) {
logVerbose(`Failed to mark message ${id} read: ${String(err)}`);
}
} else if (id && access.isSelfChat && shouldLogVerbose()) {
// Self-chat mode: never auto-send read receipts (blue ticks) on behalf of the owner.
logVerbose(`Self-chat mode: skipping read receipt for ${id}`);
}
// If this is history/offline catch-up, mark read above but skip auto-reply.
if (upsert.type === "append") continue;
const location = extractLocationData(msg.message ?? undefined);
const locationText = location ? formatLocationText(location) : undefined;
let body = extractText(msg.message ?? undefined);
if (locationText) {
body = [body, locationText].filter(Boolean).join("\n").trim();
}
if (!body) {
body = extractMediaPlaceholder(msg.message ?? undefined);
if (!body) continue;
}
const replyContext = describeReplyContext(msg.message as proto.IMessage | undefined);
let mediaPath: string | undefined;
let mediaType: string | undefined;
try {
const inboundMedia = await downloadInboundMedia(msg as proto.IWebMessageInfo, sock);
if (inboundMedia) {
const maxMb =
typeof options.mediaMaxMb === "number" && options.mediaMaxMb > 0
? options.mediaMaxMb
: 50;
const maxBytes = maxMb * 1024 * 1024;
const saved = await saveMediaBuffer(
inboundMedia.buffer,
inboundMedia.mimetype,
"inbound",
maxBytes,
);
mediaPath = saved.path;
mediaType = inboundMedia.mimetype;
}
} catch (err) {
logVerbose(`Inbound media download failed: ${String(err)}`);
}
const chatJid = remoteJid;
const sendComposing = async () => {
try {
await sock.sendPresenceUpdate("composing", chatJid);
} catch (err) {
logVerbose(`Presence update failed: ${String(err)}`);
}
};
const reply = async (text: string) => {
await sock.sendMessage(chatJid, { text });
};
const sendMedia = async (payload: AnyMessageContent) => {
await sock.sendMessage(chatJid, payload);
};
const timestamp = messageTimestampMs;
const mentionedJids = extractMentionedJids(msg.message as proto.IMessage | undefined);
const senderName = msg.pushName ?? undefined;
inboundLogger.info(
{ from, to: selfE164 ?? "me", body, mediaPath, mediaType, timestamp },
"inbound message",
);
const inboundMessage: WebInboundMessage = {
id,
from,
conversationId: from,
to: selfE164 ?? "me",
accountId: access.resolvedAccountId,
body,
pushName: senderName,
timestamp,
chatType: group ? "group" : "direct",
chatId: remoteJid,
senderJid: participantJid,
senderE164: senderE164 ?? undefined,
senderName,
replyToId: replyContext?.id,
replyToBody: replyContext?.body,
replyToSender: replyContext?.sender,
replyToSenderJid: replyContext?.senderJid,
replyToSenderE164: replyContext?.senderE164,
groupSubject,
groupParticipants,
mentionedJids: mentionedJids ?? undefined,
selfJid,
selfE164,
location: location ?? undefined,
sendComposing,
reply,
sendMedia,
mediaPath,
mediaType,
};
try {
const task = Promise.resolve(debouncer.enqueue(inboundMessage));
void task.catch((err) => {
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
});
} catch (err) {
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
inboundConsoleLog.error(`Failed handling inbound web message: ${String(err)}`);
}
}
};
sock.ev.on("messages.upsert", handleMessagesUpsert);
// Baileys emits messages.reaction when someone reacts to a message.
const handleMessagesReaction = async (
reactions: Array<{
key: proto.IMessageKey;
reaction: { text?: string; key?: proto.IMessageKey };
}>,
) => {
if (!options.onReaction) return;
try {
for (const entry of reactions) {
try {
const targetKey = entry.key;
const reactionKey = entry.reaction?.key;
const messageId = targetKey?.id;
if (!messageId) continue;
const chatJid = targetKey.remoteJid;
if (!chatJid) continue;
if (chatJid.endsWith("@status") || chatJid.endsWith("@broadcast")) continue;
const emoji = entry.reaction?.text ?? "";
const isRemoval = !emoji;
const group = isJidGroup(chatJid) === true;
// Determine who sent the reaction:
// - fromMe=true → we reacted (use selfJid)
// - Otherwise: use reaction.key metadata, fallback to chatJid for DMs
let senderJid: string | undefined;
if (reactionKey?.fromMe) {
senderJid = selfJid ?? undefined;
} else {
// Prefer explicit sender info from reaction.key, fall back to chatJid for DMs
senderJid =
reactionKey?.participant ?? reactionKey?.remoteJid ?? (group ? undefined : chatJid);
}
const senderE164 = senderJid ? await resolveInboundJid(senderJid) : null;
// Gate reactions by the same access controls as messages (skip for our own reactions)
const isOwnReaction = Boolean(reactionKey?.fromMe);
if (!isOwnReaction) {
const from = group ? chatJid : await resolveInboundJid(chatJid);
if (!from) continue;
// No-op sendMessage: reactions shouldn't trigger pairing replies
const access = await checkInboundAccessControl({
accountId: options.accountId,
from,
selfE164,
senderE164,
group,
isFromMe: false,
connectedAtMs,
sock: { sendMessage: async () => ({}) },
remoteJid: chatJid,
});
if (!access.allowed) {
inboundLogger.debug(
{ chatJid, senderJid, group },
"reaction blocked by access control",
);
continue;
}
}
const chatType = group ? "group" : "direct";
inboundLogger.info(
{
emoji: emoji || "(removed)",
messageId,
chatJid,
chatType,
senderJid,
isRemoval,
accountId: options.accountId,
},
isRemoval ? "inbound reaction removed" : "inbound reaction added",
);
options.onReaction({
messageId,
emoji,
isRemoval,
chatJid,
chatType,
accountId: options.accountId,
senderJid: senderJid ?? undefined,
senderE164: senderE164 ?? undefined,
reactedToFromMe: targetKey.fromMe ?? undefined,
timestamp: Date.now(),
});
} catch (err) {
inboundLogger.error(
{
error: String(err),
messageId: entry.key?.id,
chatJid: entry.key?.remoteJid,
},
"failed handling inbound reaction",
);
}
}
} catch (outerErr) {
inboundLogger.error({ error: String(outerErr) }, "reaction handler crashed");
}
};
sock.ev.on("messages.reaction", handleMessagesReaction as (...args: unknown[]) => void);
const handleConnectionUpdate = (
update: Partial<import("@whiskeysockets/baileys").ConnectionState>,
) => {
try {
if (update.connection === "close") {
const status = getStatusCode(update.lastDisconnect?.error);
resolveClose({
status,
isLoggedOut: status === DisconnectReason.loggedOut,
error: update.lastDisconnect?.error,
});
}
} catch (err) {
inboundLogger.error({ error: String(err) }, "connection.update handler error");
resolveClose({ status: undefined, isLoggedOut: false, error: err });
}
};
sock.ev.on("connection.update", handleConnectionUpdate);
const sendApi = createWebSendApi({
sock: {
sendMessage: (jid: string, content: AnyMessageContent) => sock.sendMessage(jid, content),
sendPresenceUpdate: (presence, jid?: string) => sock.sendPresenceUpdate(presence, jid),
},
defaultAccountId: options.accountId,
});
return {
close: async () => {
try {
const ev = sock.ev as unknown as {
off?: (event: string, listener: (...args: unknown[]) => void) => void;
removeListener?: (event: string, listener: (...args: unknown[]) => void) => void;
};
const messagesUpsertHandler = handleMessagesUpsert as unknown as (
...args: unknown[]
) => void;
const messagesReactionHandler = handleMessagesReaction as unknown as (
...args: unknown[]
) => void;
const connectionUpdateHandler = handleConnectionUpdate as unknown as (
...args: unknown[]
) => void;
if (typeof ev.off === "function") {
ev.off("messages.upsert", messagesUpsertHandler);
ev.off("messages.reaction", messagesReactionHandler);
ev.off("connection.update", connectionUpdateHandler);
} else if (typeof ev.removeListener === "function") {
ev.removeListener("messages.upsert", messagesUpsertHandler);
ev.removeListener("messages.reaction", messagesReactionHandler);
ev.removeListener("connection.update", connectionUpdateHandler);
}
sock.ws?.close();
} catch (err) {
logVerbose(`Socket close failed: ${String(err)}`);
}
},
onClose,
signalClose: (reason?: WebListenerCloseReason) => {
resolveClose(reason ?? { status: undefined, isLoggedOut: false, error: "closed" });
},
// IPC surface (sendMessage/sendPoll/sendReaction/sendComposingTo)
...sendApi,
} as const;
}