diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 2b923b879..d28caedaf 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -29,21 +29,67 @@ export function resolveInboundDebounceMs(params: { return override ?? byChannel ?? base ?? 0; } +export function resolvePeerBots(params: { cfg: MoltbotConfig }): string[] { + const peerBots = params.cfg.messages?.inbound?.peerBots; + if (!Array.isArray(peerBots)) return []; + return peerBots.filter((id): id is string => typeof id === "string" && id.trim().length > 0); +} + +export function resolvePeerTypingDelayMs(params: { cfg: MoltbotConfig }): number { + return resolveMs(params.cfg.messages?.inbound?.peerTypingDelayMs) ?? 3000; +} + +export function resolvePeerTypingMaxRetries(params: { cfg: MoltbotConfig }): number { + return resolveMs(params.cfg.messages?.inbound?.peerTypingMaxRetries) ?? 3; +} + type DebounceBuffer = { items: T[]; timeout: ReturnType | null; }; +export type DebounceFlushContext = { + /** Re-enqueue an item with a custom delay (e.g., for peer typing backoff) */ + requeue: (item: T, delayMs: number) => void; +}; + export function createInboundDebouncer(params: { debounceMs: number; buildKey: (item: T) => string | null | undefined; shouldDebounce?: (item: T) => boolean; - onFlush: (items: T[]) => Promise; + onFlush: (items: T[], ctx: DebounceFlushContext) => Promise; onError?: (err: unknown, items: T[]) => void; }) { const buffers = new Map>(); const debounceMs = Math.max(0, Math.trunc(params.debounceMs)); + const scheduleFlushWithDelay = (key: string, buffer: DebounceBuffer, delayMs: number) => { + if (buffer.timeout) clearTimeout(buffer.timeout); + buffer.timeout = setTimeout(() => { + void flushBuffer(key, buffer); + }, delayMs); + buffer.timeout.unref?.(); + }; + + const requeueItem = (item: T, delayMs: number) => { + const key = params.buildKey(item); + if (!key) return; + + const existing = buffers.get(key); + if (existing) { + existing.items.push(item); + scheduleFlushWithDelay(key, existing, delayMs); + } else { + const buffer: DebounceBuffer = { items: [item], timeout: null }; + buffers.set(key, buffer); + scheduleFlushWithDelay(key, buffer, delayMs); + } + }; + + const flushContext: DebounceFlushContext = { + requeue: requeueItem, + }; + const flushBuffer = async (key: string, buffer: DebounceBuffer) => { buffers.delete(key); if (buffer.timeout) { @@ -52,7 +98,7 @@ export function createInboundDebouncer(params: { } if (buffer.items.length === 0) return; try { - await params.onFlush(buffer.items); + await params.onFlush(buffer.items, flushContext); } catch (err) { params.onError?.(err, buffer.items); } @@ -80,7 +126,7 @@ export function createInboundDebouncer(params: { if (key && buffers.has(key)) { await flushKey(key); } - await params.onFlush([item]); + await params.onFlush([item], flushContext); return; } diff --git a/src/config/types.messages.ts b/src/config/types.messages.ts index 37ef4e942..f33889ccb 100644 --- a/src/config/types.messages.ts +++ b/src/config/types.messages.ts @@ -25,6 +25,12 @@ export type InboundDebounceByProvider = Record; export type InboundDebounceConfig = { debounceMs?: number; byChannel?: InboundDebounceByProvider; + /** Bot user IDs to watch for typing indicators (multi-bot coordination). */ + peerBots?: string[]; + /** Delay (ms) when a peer bot is typing. Default: 3000. */ + peerTypingDelayMs?: number; + /** Max retries when peer keeps typing. Default: 3. */ + peerTypingMaxRetries?: number; }; export type BroadcastStrategy = "parallel" | "sequential"; diff --git a/src/discord/monitor/message-handler.ts b/src/discord/monitor/message-handler.ts index a7434aed0..29f5b13ae 100644 --- a/src/discord/monitor/message-handler.ts +++ b/src/discord/monitor/message-handler.ts @@ -4,7 +4,11 @@ import { hasControlCommand } from "../../auto-reply/command-detection.js"; import { createInboundDebouncer, resolveInboundDebounceMs, + resolvePeerBots, + resolvePeerTypingDelayMs, + resolvePeerTypingMaxRetries, } from "../../auto-reply/inbound-debounce.js"; +import { isPeerTyping } from "./peer-typing.js"; import type { HistoryEntry } from "../../auto-reply/reply/history.js"; import type { ReplyToMode } from "../../config/config.js"; import { danger } from "../../globals.js"; @@ -41,8 +45,14 @@ export function createDiscordMessageHandler(params: { const groupPolicy = params.discordConfig?.groupPolicy ?? "open"; const ackReactionScope = params.cfg.messages?.ackReactionScope ?? "group-mentions"; const debounceMs = resolveInboundDebounceMs({ cfg: params.cfg, channel: "discord" }); + const peerBotIds = resolvePeerBots({ cfg: params.cfg }); + const peerTypingDelayMs = resolvePeerTypingDelayMs({ cfg: params.cfg }); + const peerTypingMaxRetries = resolvePeerTypingMaxRetries({ cfg: params.cfg }); - const debouncer = createInboundDebouncer<{ data: DiscordMessageEvent; client: Client }>({ + // Type for entries with optional retry tracking + type EntryWithRetry = { data: DiscordMessageEvent; client: Client; __peerTypingRetries?: number }; + + const debouncer = createInboundDebouncer({ debounceMs, buildKey: (entry) => { const message = entry.data.message; @@ -60,9 +70,21 @@ export function createDiscordMessageHandler(params: { if (!baseText.trim()) return false; return !hasControlCommand(baseText, params.cfg); }, - onFlush: async (entries) => { + onFlush: async (entries, { requeue }) => { const last = entries.at(-1); if (!last) return; + + // Peer typing check: if a configured peer bot is typing, back off and retry + const channelId = last.data.message?.channelId; + if (channelId && peerBotIds.length > 0) { + const retryCount = last.__peerTypingRetries ?? 0; + if (retryCount < peerTypingMaxRetries && isPeerTyping(channelId, peerBotIds)) { + // Re-enqueue with backoff delay + requeue({ ...last, __peerTypingRetries: retryCount + 1 }, peerTypingDelayMs); + return; + } + } + if (entries.length === 1) { const ctx = await preflightDiscordMessage({ ...params, diff --git a/src/discord/monitor/peer-typing.ts b/src/discord/monitor/peer-typing.ts new file mode 100644 index 000000000..62b3cd642 --- /dev/null +++ b/src/discord/monitor/peer-typing.ts @@ -0,0 +1,65 @@ +import { TypingStartListener, type Client } from "@buape/carbon"; + +const TYPING_TTL_MS = 10_000; + +// channelId → Map +const peerTypingState = new Map>(); + +/** + * Listener that tracks typing indicators from configured peer bots. + * Used to implement typing-aware debounce for multi-bot coordination. + */ +export class PeerTypingListener extends TypingStartListener { + constructor(private peerBotIds: Set) { + super(); + } + + async handle(data: { channel_id: string; user_id: string }, _client: Client): Promise { + // Only track typing from configured peer bots + if (!this.peerBotIds.has(data.user_id)) return; + + let channelMap = peerTypingState.get(data.channel_id); + if (!channelMap) { + channelMap = new Map(); + peerTypingState.set(data.channel_id, channelMap); + } + channelMap.set(data.user_id, Date.now() + TYPING_TTL_MS); + } +} + +/** + * Check if any of the specified peer bots are currently typing in the channel. + * Returns true if at least one peer has a non-expired typing indicator. + */ +export function isPeerTyping(channelId: string, peerBotIds: string[]): boolean { + const channelMap = peerTypingState.get(channelId); + if (!channelMap) return false; + + const now = Date.now(); + for (const id of peerBotIds) { + const expiresAt = channelMap.get(id); + if (expiresAt && expiresAt > now) return true; + } + return false; +} + +/** + * Clean up expired typing indicators from the state map. + * Called periodically or on-demand to prevent memory leaks. + */ +export function clearExpiredTyping(): void { + const now = Date.now(); + for (const [channelId, channelMap] of peerTypingState) { + for (const [userId, expiresAt] of channelMap) { + if (expiresAt <= now) channelMap.delete(userId); + } + if (channelMap.size === 0) peerTypingState.delete(channelId); + } +} + +/** + * Get the current typing state for debugging/diagnostics. + */ +export function getPeerTypingState(): Map> { + return peerTypingState; +} diff --git a/src/discord/monitor/provider.ts b/src/discord/monitor/provider.ts index 3ab56a478..9fe7a623c 100644 --- a/src/discord/monitor/provider.ts +++ b/src/discord/monitor/provider.ts @@ -34,6 +34,8 @@ import { registerDiscordListener, } from "./listeners.js"; import { createDiscordMessageHandler } from "./message-handler.js"; +import { PeerTypingListener } from "./peer-typing.js"; +import { resolvePeerBots } from "../../auto-reply/inbound-debounce.js"; import { createDiscordCommandArgFallbackButton, createDiscordNativeCommand, @@ -549,6 +551,13 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime.log?.("discord: GuildPresences intent enabled — presence listener registered"); } + // Register peer typing listener for multi-bot coordination + const peerBotIds = resolvePeerBots({ cfg }); + if (peerBotIds.length > 0) { + registerDiscordListener(client.listeners, new PeerTypingListener(new Set(peerBotIds))); + runtime.log?.(`discord: peer typing listener registered for ${peerBotIds.length} bot(s)`); + } + runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`); // Start exec approvals handler after client is ready