feat(discord): add peer typing-aware debounce for multi-bot coordination
When multiple Moltbot instances share a channel, they can now coordinate via Discord typing indicators to avoid response storms. New config options (messages.inbound): - peerBots: array of bot user IDs to watch - peerTypingDelayMs: backoff delay when peer is typing (default: 3000) - peerTypingMaxRetries: max retries before proceeding anyway (default: 3) Implementation: - PeerTypingListener tracks TYPING_START events from configured peers - onFlush checks peer typing state before processing - If peer is typing, message is re-queued with backoff delay - Debouncer now exposes requeue() for custom delay re-enqueueing Fixes race condition where multiple bots respond simultaneously.
This commit is contained in:
parent
4583f88626
commit
a6bbbaab8f
@ -29,21 +29,67 @@ export function resolveInboundDebounceMs(params: {
|
|||||||
return override ?? byChannel ?? base ?? 0;
|
return override ?? byChannel ?? base ?? 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function resolvePeerBots(params: { cfg: MoltbotConfig }): string[] {
|
||||||
|
const peerBots = params.cfg.messages?.inbound?.peerBots;
|
||||||
|
if (!Array.isArray(peerBots)) return [];
|
||||||
|
return peerBots.filter((id): id is string => typeof id === "string" && id.trim().length > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resolvePeerTypingDelayMs(params: { cfg: MoltbotConfig }): number {
|
||||||
|
return resolveMs(params.cfg.messages?.inbound?.peerTypingDelayMs) ?? 3000;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resolvePeerTypingMaxRetries(params: { cfg: MoltbotConfig }): number {
|
||||||
|
return resolveMs(params.cfg.messages?.inbound?.peerTypingMaxRetries) ?? 3;
|
||||||
|
}
|
||||||
|
|
||||||
type DebounceBuffer<T> = {
|
type DebounceBuffer<T> = {
|
||||||
items: T[];
|
items: T[];
|
||||||
timeout: ReturnType<typeof setTimeout> | null;
|
timeout: ReturnType<typeof setTimeout> | null;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export type DebounceFlushContext<T> = {
|
||||||
|
/** Re-enqueue an item with a custom delay (e.g., for peer typing backoff) */
|
||||||
|
requeue: (item: T, delayMs: number) => void;
|
||||||
|
};
|
||||||
|
|
||||||
export function createInboundDebouncer<T>(params: {
|
export function createInboundDebouncer<T>(params: {
|
||||||
debounceMs: number;
|
debounceMs: number;
|
||||||
buildKey: (item: T) => string | null | undefined;
|
buildKey: (item: T) => string | null | undefined;
|
||||||
shouldDebounce?: (item: T) => boolean;
|
shouldDebounce?: (item: T) => boolean;
|
||||||
onFlush: (items: T[]) => Promise<void>;
|
onFlush: (items: T[], ctx: DebounceFlushContext<T>) => Promise<void>;
|
||||||
onError?: (err: unknown, items: T[]) => void;
|
onError?: (err: unknown, items: T[]) => void;
|
||||||
}) {
|
}) {
|
||||||
const buffers = new Map<string, DebounceBuffer<T>>();
|
const buffers = new Map<string, DebounceBuffer<T>>();
|
||||||
const debounceMs = Math.max(0, Math.trunc(params.debounceMs));
|
const debounceMs = Math.max(0, Math.trunc(params.debounceMs));
|
||||||
|
|
||||||
|
const scheduleFlushWithDelay = (key: string, buffer: DebounceBuffer<T>, delayMs: number) => {
|
||||||
|
if (buffer.timeout) clearTimeout(buffer.timeout);
|
||||||
|
buffer.timeout = setTimeout(() => {
|
||||||
|
void flushBuffer(key, buffer);
|
||||||
|
}, delayMs);
|
||||||
|
buffer.timeout.unref?.();
|
||||||
|
};
|
||||||
|
|
||||||
|
const requeueItem = (item: T, delayMs: number) => {
|
||||||
|
const key = params.buildKey(item);
|
||||||
|
if (!key) return;
|
||||||
|
|
||||||
|
const existing = buffers.get(key);
|
||||||
|
if (existing) {
|
||||||
|
existing.items.push(item);
|
||||||
|
scheduleFlushWithDelay(key, existing, delayMs);
|
||||||
|
} else {
|
||||||
|
const buffer: DebounceBuffer<T> = { items: [item], timeout: null };
|
||||||
|
buffers.set(key, buffer);
|
||||||
|
scheduleFlushWithDelay(key, buffer, delayMs);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const flushContext: DebounceFlushContext<T> = {
|
||||||
|
requeue: requeueItem,
|
||||||
|
};
|
||||||
|
|
||||||
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
|
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
|
||||||
buffers.delete(key);
|
buffers.delete(key);
|
||||||
if (buffer.timeout) {
|
if (buffer.timeout) {
|
||||||
@ -52,7 +98,7 @@ export function createInboundDebouncer<T>(params: {
|
|||||||
}
|
}
|
||||||
if (buffer.items.length === 0) return;
|
if (buffer.items.length === 0) return;
|
||||||
try {
|
try {
|
||||||
await params.onFlush(buffer.items);
|
await params.onFlush(buffer.items, flushContext);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
params.onError?.(err, buffer.items);
|
params.onError?.(err, buffer.items);
|
||||||
}
|
}
|
||||||
@ -80,7 +126,7 @@ export function createInboundDebouncer<T>(params: {
|
|||||||
if (key && buffers.has(key)) {
|
if (key && buffers.has(key)) {
|
||||||
await flushKey(key);
|
await flushKey(key);
|
||||||
}
|
}
|
||||||
await params.onFlush([item]);
|
await params.onFlush([item], flushContext);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -25,6 +25,12 @@ export type InboundDebounceByProvider = Record<string, number>;
|
|||||||
export type InboundDebounceConfig = {
|
export type InboundDebounceConfig = {
|
||||||
debounceMs?: number;
|
debounceMs?: number;
|
||||||
byChannel?: InboundDebounceByProvider;
|
byChannel?: InboundDebounceByProvider;
|
||||||
|
/** Bot user IDs to watch for typing indicators (multi-bot coordination). */
|
||||||
|
peerBots?: string[];
|
||||||
|
/** Delay (ms) when a peer bot is typing. Default: 3000. */
|
||||||
|
peerTypingDelayMs?: number;
|
||||||
|
/** Max retries when peer keeps typing. Default: 3. */
|
||||||
|
peerTypingMaxRetries?: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type BroadcastStrategy = "parallel" | "sequential";
|
export type BroadcastStrategy = "parallel" | "sequential";
|
||||||
|
|||||||
@ -4,7 +4,11 @@ import { hasControlCommand } from "../../auto-reply/command-detection.js";
|
|||||||
import {
|
import {
|
||||||
createInboundDebouncer,
|
createInboundDebouncer,
|
||||||
resolveInboundDebounceMs,
|
resolveInboundDebounceMs,
|
||||||
|
resolvePeerBots,
|
||||||
|
resolvePeerTypingDelayMs,
|
||||||
|
resolvePeerTypingMaxRetries,
|
||||||
} from "../../auto-reply/inbound-debounce.js";
|
} from "../../auto-reply/inbound-debounce.js";
|
||||||
|
import { isPeerTyping } from "./peer-typing.js";
|
||||||
import type { HistoryEntry } from "../../auto-reply/reply/history.js";
|
import type { HistoryEntry } from "../../auto-reply/reply/history.js";
|
||||||
import type { ReplyToMode } from "../../config/config.js";
|
import type { ReplyToMode } from "../../config/config.js";
|
||||||
import { danger } from "../../globals.js";
|
import { danger } from "../../globals.js";
|
||||||
@ -41,8 +45,14 @@ export function createDiscordMessageHandler(params: {
|
|||||||
const groupPolicy = params.discordConfig?.groupPolicy ?? "open";
|
const groupPolicy = params.discordConfig?.groupPolicy ?? "open";
|
||||||
const ackReactionScope = params.cfg.messages?.ackReactionScope ?? "group-mentions";
|
const ackReactionScope = params.cfg.messages?.ackReactionScope ?? "group-mentions";
|
||||||
const debounceMs = resolveInboundDebounceMs({ cfg: params.cfg, channel: "discord" });
|
const debounceMs = resolveInboundDebounceMs({ cfg: params.cfg, channel: "discord" });
|
||||||
|
const peerBotIds = resolvePeerBots({ cfg: params.cfg });
|
||||||
|
const peerTypingDelayMs = resolvePeerTypingDelayMs({ cfg: params.cfg });
|
||||||
|
const peerTypingMaxRetries = resolvePeerTypingMaxRetries({ cfg: params.cfg });
|
||||||
|
|
||||||
const debouncer = createInboundDebouncer<{ data: DiscordMessageEvent; client: Client }>({
|
// Type for entries with optional retry tracking
|
||||||
|
type EntryWithRetry = { data: DiscordMessageEvent; client: Client; __peerTypingRetries?: number };
|
||||||
|
|
||||||
|
const debouncer = createInboundDebouncer<EntryWithRetry>({
|
||||||
debounceMs,
|
debounceMs,
|
||||||
buildKey: (entry) => {
|
buildKey: (entry) => {
|
||||||
const message = entry.data.message;
|
const message = entry.data.message;
|
||||||
@ -60,9 +70,21 @@ export function createDiscordMessageHandler(params: {
|
|||||||
if (!baseText.trim()) return false;
|
if (!baseText.trim()) return false;
|
||||||
return !hasControlCommand(baseText, params.cfg);
|
return !hasControlCommand(baseText, params.cfg);
|
||||||
},
|
},
|
||||||
onFlush: async (entries) => {
|
onFlush: async (entries, { requeue }) => {
|
||||||
const last = entries.at(-1);
|
const last = entries.at(-1);
|
||||||
if (!last) return;
|
if (!last) return;
|
||||||
|
|
||||||
|
// Peer typing check: if a configured peer bot is typing, back off and retry
|
||||||
|
const channelId = last.data.message?.channelId;
|
||||||
|
if (channelId && peerBotIds.length > 0) {
|
||||||
|
const retryCount = last.__peerTypingRetries ?? 0;
|
||||||
|
if (retryCount < peerTypingMaxRetries && isPeerTyping(channelId, peerBotIds)) {
|
||||||
|
// Re-enqueue with backoff delay
|
||||||
|
requeue({ ...last, __peerTypingRetries: retryCount + 1 }, peerTypingDelayMs);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (entries.length === 1) {
|
if (entries.length === 1) {
|
||||||
const ctx = await preflightDiscordMessage({
|
const ctx = await preflightDiscordMessage({
|
||||||
...params,
|
...params,
|
||||||
|
|||||||
65
src/discord/monitor/peer-typing.ts
Normal file
65
src/discord/monitor/peer-typing.ts
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
import { TypingStartListener, type Client } from "@buape/carbon";
|
||||||
|
|
||||||
|
const TYPING_TTL_MS = 10_000;
|
||||||
|
|
||||||
|
// channelId → Map<userId, expiresAtMs>
|
||||||
|
const peerTypingState = new Map<string, Map<string, number>>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listener that tracks typing indicators from configured peer bots.
|
||||||
|
* Used to implement typing-aware debounce for multi-bot coordination.
|
||||||
|
*/
|
||||||
|
export class PeerTypingListener extends TypingStartListener {
|
||||||
|
constructor(private peerBotIds: Set<string>) {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
async handle(data: { channel_id: string; user_id: string }, _client: Client): Promise<void> {
|
||||||
|
// Only track typing from configured peer bots
|
||||||
|
if (!this.peerBotIds.has(data.user_id)) return;
|
||||||
|
|
||||||
|
let channelMap = peerTypingState.get(data.channel_id);
|
||||||
|
if (!channelMap) {
|
||||||
|
channelMap = new Map();
|
||||||
|
peerTypingState.set(data.channel_id, channelMap);
|
||||||
|
}
|
||||||
|
channelMap.set(data.user_id, Date.now() + TYPING_TTL_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if any of the specified peer bots are currently typing in the channel.
|
||||||
|
* Returns true if at least one peer has a non-expired typing indicator.
|
||||||
|
*/
|
||||||
|
export function isPeerTyping(channelId: string, peerBotIds: string[]): boolean {
|
||||||
|
const channelMap = peerTypingState.get(channelId);
|
||||||
|
if (!channelMap) return false;
|
||||||
|
|
||||||
|
const now = Date.now();
|
||||||
|
for (const id of peerBotIds) {
|
||||||
|
const expiresAt = channelMap.get(id);
|
||||||
|
if (expiresAt && expiresAt > now) return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clean up expired typing indicators from the state map.
|
||||||
|
* Called periodically or on-demand to prevent memory leaks.
|
||||||
|
*/
|
||||||
|
export function clearExpiredTyping(): void {
|
||||||
|
const now = Date.now();
|
||||||
|
for (const [channelId, channelMap] of peerTypingState) {
|
||||||
|
for (const [userId, expiresAt] of channelMap) {
|
||||||
|
if (expiresAt <= now) channelMap.delete(userId);
|
||||||
|
}
|
||||||
|
if (channelMap.size === 0) peerTypingState.delete(channelId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current typing state for debugging/diagnostics.
|
||||||
|
*/
|
||||||
|
export function getPeerTypingState(): Map<string, Map<string, number>> {
|
||||||
|
return peerTypingState;
|
||||||
|
}
|
||||||
@ -34,6 +34,8 @@ import {
|
|||||||
registerDiscordListener,
|
registerDiscordListener,
|
||||||
} from "./listeners.js";
|
} from "./listeners.js";
|
||||||
import { createDiscordMessageHandler } from "./message-handler.js";
|
import { createDiscordMessageHandler } from "./message-handler.js";
|
||||||
|
import { PeerTypingListener } from "./peer-typing.js";
|
||||||
|
import { resolvePeerBots } from "../../auto-reply/inbound-debounce.js";
|
||||||
import {
|
import {
|
||||||
createDiscordCommandArgFallbackButton,
|
createDiscordCommandArgFallbackButton,
|
||||||
createDiscordNativeCommand,
|
createDiscordNativeCommand,
|
||||||
@ -549,6 +551,13 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
|||||||
runtime.log?.("discord: GuildPresences intent enabled — presence listener registered");
|
runtime.log?.("discord: GuildPresences intent enabled — presence listener registered");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register peer typing listener for multi-bot coordination
|
||||||
|
const peerBotIds = resolvePeerBots({ cfg });
|
||||||
|
if (peerBotIds.length > 0) {
|
||||||
|
registerDiscordListener(client.listeners, new PeerTypingListener(new Set(peerBotIds)));
|
||||||
|
runtime.log?.(`discord: peer typing listener registered for ${peerBotIds.length} bot(s)`);
|
||||||
|
}
|
||||||
|
|
||||||
runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`);
|
runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`);
|
||||||
|
|
||||||
// Start exec approvals handler after client is ready
|
// Start exec approvals handler after client is ready
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user