Merge 65446b0c48 into da71eaebd2
This commit is contained in:
commit
d3fba9ed8c
@ -29,6 +29,25 @@ export function resolveInboundDebounceMs(params: {
|
||||
return override ?? byChannel ?? base ?? 0;
|
||||
}
|
||||
|
||||
export function resolveChannelDelayMs(params: {
|
||||
cfg: MoltbotConfig;
|
||||
channel: string;
|
||||
overrideMs?: number;
|
||||
}): number {
|
||||
const inbound = params.cfg.messages?.inbound;
|
||||
const override = resolveMs(params.overrideMs);
|
||||
const byChannel = resolveChannelOverride({
|
||||
byChannel: inbound?.channelDelayMsByChannel,
|
||||
channel: params.channel,
|
||||
});
|
||||
const base = resolveMs(inbound?.channelDelayMs);
|
||||
return override ?? byChannel ?? base ?? 0;
|
||||
}
|
||||
|
||||
export function resolveSkipIfPeerRepliedMs(params: { cfg: MoltbotConfig }): number {
|
||||
return resolveMs(params.cfg.messages?.inbound?.skipIfPeerRepliedMs) ?? 0;
|
||||
}
|
||||
|
||||
type DebounceBuffer<T> = {
|
||||
items: T[];
|
||||
timeout: ReturnType<typeof setTimeout> | null;
|
||||
|
||||
@ -25,6 +25,19 @@ export type InboundDebounceByProvider = Record<string, number>;
|
||||
export type InboundDebounceConfig = {
|
||||
debounceMs?: number;
|
||||
byChannel?: InboundDebounceByProvider;
|
||||
/**
|
||||
* Delay (ms) before processing any inbound message.
|
||||
* Useful for multi-bot coordination: gives other bots time to respond first.
|
||||
* The bot can then check if a peer already replied before responding.
|
||||
*/
|
||||
channelDelayMs?: number;
|
||||
/** Per-channel delay overrides (ms). */
|
||||
channelDelayMsByChannel?: InboundDebounceByProvider;
|
||||
/**
|
||||
* Skip responding if another bot replied within this window (ms) after the triggering message.
|
||||
* Only effective when channelDelayMs is set. Default: 0 (disabled).
|
||||
*/
|
||||
skipIfPeerRepliedMs?: number;
|
||||
};
|
||||
|
||||
export type BroadcastStrategy = "parallel" | "sequential";
|
||||
|
||||
@ -336,6 +336,9 @@ export const InboundDebounceSchema = z
|
||||
.object({
|
||||
debounceMs: z.number().int().nonnegative().optional(),
|
||||
byChannel: DebounceMsBySurfaceSchema,
|
||||
channelDelayMs: z.number().int().nonnegative().optional(),
|
||||
channelDelayMsByChannel: DebounceMsBySurfaceSchema,
|
||||
skipIfPeerRepliedMs: z.number().int().nonnegative().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional();
|
||||
|
||||
@ -4,6 +4,8 @@ import { hasControlCommand } from "../../auto-reply/command-detection.js";
|
||||
import {
|
||||
createInboundDebouncer,
|
||||
resolveInboundDebounceMs,
|
||||
resolveChannelDelayMs,
|
||||
resolveSkipIfPeerRepliedMs,
|
||||
} from "../../auto-reply/inbound-debounce.js";
|
||||
import type { HistoryEntry } from "../../auto-reply/reply/history.js";
|
||||
import type { ReplyToMode } from "../../config/config.js";
|
||||
@ -41,6 +43,8 @@ export function createDiscordMessageHandler(params: {
|
||||
const groupPolicy = params.discordConfig?.groupPolicy ?? "open";
|
||||
const ackReactionScope = params.cfg.messages?.ackReactionScope ?? "group-mentions";
|
||||
const debounceMs = resolveInboundDebounceMs({ cfg: params.cfg, channel: "discord" });
|
||||
// Note: channelDelayMs is resolved per-message below to support per-channel overrides
|
||||
const skipIfPeerRepliedMs = resolveSkipIfPeerRepliedMs({ cfg: params.cfg });
|
||||
|
||||
const debouncer = createInboundDebouncer<{ data: DiscordMessageEvent; client: Client }>({
|
||||
debounceMs,
|
||||
@ -121,8 +125,61 @@ export function createDiscordMessageHandler(params: {
|
||||
},
|
||||
});
|
||||
|
||||
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
|
||||
|
||||
const checkPeerReplied = async (data: DiscordMessageEvent, client: Client): Promise<boolean> => {
|
||||
if (skipIfPeerRepliedMs <= 0) return false;
|
||||
const channelId = data.message?.channelId;
|
||||
const triggerMessageId = data.message?.id;
|
||||
const triggerTimestamp = data.message?.timestamp;
|
||||
if (!channelId || !triggerMessageId || !triggerTimestamp) return false;
|
||||
|
||||
try {
|
||||
// Fetch recent messages after the trigger
|
||||
const messages = (await client.rest.get(
|
||||
`/channels/${channelId}/messages?after=${triggerMessageId}&limit=10`,
|
||||
)) as
|
||||
| Array<{ id: string; author?: { bot?: boolean; id?: string }; timestamp: string }>
|
||||
| undefined;
|
||||
if (!Array.isArray(messages)) return false;
|
||||
|
||||
const triggerTime = new Date(triggerTimestamp).getTime();
|
||||
const windowEnd = triggerTime + skipIfPeerRepliedMs;
|
||||
|
||||
// Check if any bot (not us) replied within the window
|
||||
for (const msg of messages) {
|
||||
if (!msg.author?.bot) continue;
|
||||
// Skip our own bot's messages
|
||||
if (params.botUserId && msg.author?.id === params.botUserId) continue;
|
||||
const msgTime = new Date(msg.timestamp).getTime();
|
||||
if (msgTime <= windowEnd) {
|
||||
params.runtime.log?.(
|
||||
`Skipping response: peer bot replied within ${skipIfPeerRepliedMs}ms window`,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} catch {
|
||||
// If we can't check, proceed anyway
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
return async (data, client) => {
|
||||
try {
|
||||
// Resolve channel delay per-message to support per-channel overrides via Discord channel ID
|
||||
const discordChannelId = data.message?.channelId ?? "discord";
|
||||
const channelDelayMs = resolveChannelDelayMs({ cfg: params.cfg, channel: discordChannelId });
|
||||
|
||||
// Apply channel delay for multi-bot coordination
|
||||
if (channelDelayMs > 0) {
|
||||
await sleep(channelDelayMs);
|
||||
// Check if a peer bot already replied
|
||||
if (await checkPeerReplied(data, client)) {
|
||||
return; // Skip responding
|
||||
}
|
||||
}
|
||||
await debouncer.enqueue({ data, client });
|
||||
} catch (err) {
|
||||
params.runtime.error?.(danger(`handler failed: ${String(err)}`));
|
||||
|
||||
Loading…
Reference in New Issue
Block a user