From 1561b1c491431ae0bd507889eba64c10417468a9 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 15 Jan 2026 23:06:22 +0000 Subject: [PATCH] fix: debounce inbound messages across channels (#971) (thanks @juanpablodlc) --- CHANGELOG.md | 1 + src/agents/pi-embedded-runner/compact.ts | 10 ++---- src/agents/tools/discord-actions.test.ts | 16 ++++----- src/agents/tools/slack-actions.test.ts | 4 ++- ...uick-model-picker-grouped-by-model.test.ts | 4 +-- src/discord/monitor/message-handler.ts | 34 +++++++++++-------- src/imessage/monitor/monitor-provider.ts | 3 +- src/signal/monitor/event-handler.ts | 6 ++-- src/slack/monitor/message-handler.ts | 2 +- src/web/inbound/monitor.ts | 2 +- 10 files changed, 41 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12f11f298..47d767ec2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - Agents: add Current Date & Time system prompt section with configurable time format (auto/12/24). - Tools: normalize Slack/Discord message timestamps with `timestampMs`/`timestampUtc` while keeping raw provider fields. - Docs: add Date & Time guide and update prompt/timezone configuration docs. +- Messages: debounce rapid inbound messages across channels with per-connector overrides. (#971) — thanks @juanpablodlc. - Fix: guard model fallback against undefined provider/model values. (#954) — thanks @roshanasingh4. - Memory: make `node-llama-cpp` an optional dependency (avoid Node 25 install failures) and improve local-embeddings fallback/errors. - Browser: add `snapshot refs=aria` (Playwright aria-ref ids) for self-resolving refs across `snapshot` → `act`. diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 3cffa1c08..56dc84ba8 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -54,11 +54,7 @@ import { buildEmbeddedSystemPrompt, createSystemPromptOverride } from "./system- import { splitSdkTools } from "./tool-split.js"; import type { EmbeddedPiCompactResult } from "./types.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; -import { - describeUnknownError, - mapThinkingLevel, - resolveExecToolDefaults, -} from "./utils.js"; +import { describeUnknownError, mapThinkingLevel, resolveExecToolDefaults } from "./utils.js"; export async function compactEmbeddedPiSession(params: { sessionId: string; @@ -227,9 +223,7 @@ export async function compactEmbeddedPiSession(params: { const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated); const reasoningTagHint = isReasoningTagProvider(provider); const userTimezone = resolveUserTimezone(params.config?.agents?.defaults?.userTimezone); - const userTimeFormat = resolveUserTimeFormat( - params.config?.agents?.defaults?.timeFormat, - ); + const userTimeFormat = resolveUserTimeFormat(params.config?.agents?.defaults?.timeFormat); const userTime = formatUserTime(new Date(), userTimezone, userTimeFormat); const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({ sessionKey: params.sessionKey, diff --git a/src/agents/tools/discord-actions.test.ts b/src/agents/tools/discord-actions.test.ts index 74059ba57..3eead3f40 100644 --- a/src/agents/tools/discord-actions.test.ts +++ b/src/agents/tools/discord-actions.test.ts @@ -138,16 +138,16 @@ describe("handleDiscordMessagingAction", () => { }); it("adds normalized timestamps to readMessages payloads", async () => { - readMessagesDiscord.mockResolvedValueOnce([ - { id: "1", timestamp: "2026-01-15T10:00:00.000Z" }, - ]); + readMessagesDiscord.mockResolvedValueOnce([{ id: "1", timestamp: "2026-01-15T10:00:00.000Z" }]); const result = await handleDiscordMessagingAction( "readMessages", { channelId: "C1" }, enableAllActions, ); - const payload = result.details as { messages: Array<{ timestampMs?: number; timestampUtc?: string }> }; + const payload = result.details as { + messages: Array<{ timestampMs?: number; timestampUtc?: string }>; + }; const expectedMs = Date.parse("2026-01-15T10:00:00.000Z"); expect(payload.messages[0].timestampMs).toBe(expectedMs); @@ -173,16 +173,16 @@ describe("handleDiscordMessagingAction", () => { }); it("adds normalized timestamps to listPins payloads", async () => { - listPinsDiscord.mockResolvedValueOnce([ - { id: "1", timestamp: "2026-01-15T12:00:00.000Z" }, - ]); + listPinsDiscord.mockResolvedValueOnce([{ id: "1", timestamp: "2026-01-15T12:00:00.000Z" }]); const result = await handleDiscordMessagingAction( "listPins", { channelId: "C1" }, enableAllActions, ); - const payload = result.details as { pins: Array<{ timestampMs?: number; timestampUtc?: string }> }; + const payload = result.details as { + pins: Array<{ timestampMs?: number; timestampUtc?: string }>; + }; const expectedMs = Date.parse("2026-01-15T12:00:00.000Z"); expect(payload.pins[0].timestampMs).toBe(expectedMs); diff --git a/src/agents/tools/slack-actions.test.ts b/src/agents/tools/slack-actions.test.ts index f9ffe72f0..611721e02 100644 --- a/src/agents/tools/slack-actions.test.ts +++ b/src/agents/tools/slack-actions.test.ts @@ -334,7 +334,9 @@ describe("handleSlackAction", () => { }); const result = await handleSlackAction({ action: "readMessages", channelId: "C1" }, cfg); - const payload = result.details as { messages: Array<{ timestampMs?: number; timestampUtc?: string }> }; + const payload = result.details as { + messages: Array<{ timestampMs?: number; timestampUtc?: string }>; + }; const expectedMs = Math.round(1735689600.456 * 1000); expect(payload.messages[0].timestampMs).toBe(expectedMs); diff --git a/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts b/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts index 9fc8eca80..1fd21f2f2 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.shows-quick-model-picker-grouped-by-model.test.ts @@ -263,9 +263,7 @@ describe("trigger handling", () => { const text = Array.isArray(res) ? res[0]?.text : res?.text; // Selecting the default model shows "reset to default" instead of "set to" - expect(normalizeTestText(text ?? "")).toContain( - "anthropic/claude-opus-4-5", - ); + expect(normalizeTestText(text ?? "")).toContain("anthropic/claude-opus-4-5"); const store = loadSessionStore(cfg.session.store); // When selecting the default, overrides are cleared diff --git a/src/discord/monitor/message-handler.ts b/src/discord/monitor/message-handler.ts index 12dffd3a5..c4335d724 100644 --- a/src/discord/monitor/message-handler.ts +++ b/src/discord/monitor/message-handler.ts @@ -63,24 +63,30 @@ export function createDiscordMessageHandler(params: { onFlush: async (entries) => { const last = entries.at(-1); if (!last) return; - const combinedBaseText = - entries.length === 1 - ? resolveDiscordMessageText(last.data.message, { includeForwarded: false }) - : entries - .map((entry) => - resolveDiscordMessageText(entry.data.message, { includeForwarded: false }), - ) - .filter(Boolean) - .join("\n"); + if (entries.length === 1) { + const ctx = await preflightDiscordMessage({ + ...params, + ackReactionScope, + groupPolicy, + data: last.data, + client: last.client, + }); + if (!ctx) return; + await processDiscordMessage(ctx); + return; + } + const combinedBaseText = entries + .map((entry) => resolveDiscordMessageText(entry.data.message, { includeForwarded: false })) + .filter(Boolean) + .join("\n"); const syntheticMessage = { ...last.data.message, content: combinedBaseText, attachments: [], - message_snapshots: [], - messageSnapshots: [], + message_snapshots: (last.data.message as { message_snapshots?: unknown }).message_snapshots, + messageSnapshots: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots, rawData: { ...(last.data.message as { rawData?: Record }).rawData, - message_snapshots: [], }, }; const syntheticData: DiscordMessageEvent = { @@ -96,9 +102,7 @@ export function createDiscordMessageHandler(params: { }); if (!ctx) return; if (entries.length > 1) { - const ids = entries - .map((entry) => entry.data.message?.id) - .filter(Boolean) as string[]; + const ids = entries.map((entry) => entry.data.message?.id).filter(Boolean) as string[]; if (ids.length > 0) { const ctxBatch = ctx as typeof ctx & { MessageSids?: string[]; diff --git a/src/imessage/monitor/monitor-provider.ts b/src/imessage/monitor/monitor-provider.ts index 06035021e..2918b5fc8 100644 --- a/src/imessage/monitor/monitor-provider.ts +++ b/src/imessage/monitor/monitor-provider.ts @@ -86,7 +86,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P const conversationId = entry.message.chat_id != null ? `chat:${entry.message.chat_id}` - : entry.message.chat_guid ?? entry.message.chat_identifier ?? "unknown"; + : (entry.message.chat_guid ?? entry.message.chat_identifier ?? "unknown"); return `imessage:${accountInfo.accountId}:${conversationId}:${sender}`; }, shouldDebounce: (entry) => { @@ -119,7 +119,6 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P }); async function handleMessageNow(message: IMessagePayload) { - const senderRaw = message.sender ?? ""; const sender = senderRaw.trim(); if (!sender) return; diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index 16d0526a8..71dd0b00f 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -109,7 +109,9 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { Body: combinedBody, RawBody: entry.bodyText, CommandBody: entry.bodyText, - From: entry.isGroup ? `group:${entry.groupId ?? "unknown"}` : `signal:${entry.senderRecipient}`, + From: entry.isGroup + ? `group:${entry.groupId ?? "unknown"}` + : `signal:${entry.senderRecipient}`, To: signalTo, SessionKey: route.sessionKey, AccountId: route.accountId, @@ -207,7 +209,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { const inboundDebouncer = createInboundDebouncer({ debounceMs: inboundDebounceMs, buildKey: (entry) => { - const conversationId = entry.isGroup ? entry.groupId ?? "unknown" : entry.senderPeerId; + const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId; if (!conversationId || !entry.senderPeerId) return null; return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`; }, diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index 4d27b39a2..2926d6ce2 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -45,7 +45,7 @@ export function createSlackMessageHandler(params: { if (!last) return; const combinedText = entries.length === 1 - ? last.message.text ?? "" + ? (last.message.text ?? "") : entries .map((entry) => entry.message.text ?? "") .filter(Boolean) diff --git a/src/web/inbound/monitor.ts b/src/web/inbound/monitor.ts index 59a70b6bd..b6e8a575b 100644 --- a/src/web/inbound/monitor.ts +++ b/src/web/inbound/monitor.ts @@ -66,7 +66,7 @@ export async function monitorWebInbox(options: { buildKey: (msg) => { const senderKey = msg.chatType === "group" - ? msg.senderJid ?? msg.senderE164 ?? msg.senderName ?? msg.from + ? (msg.senderJid ?? msg.senderE164 ?? msg.senderName ?? msg.from) : msg.from; if (!senderKey) return null; const conversationKey = msg.chatType === "group" ? msg.chatId : msg.from;