openclaw/src/discord/monitor/provider.ts
2026-01-16 12:10:29 +00:00

302 lines
9.8 KiB
TypeScript

import { Client } from "@buape/carbon";
import { GatewayIntents, GatewayPlugin } from "@buape/carbon/gateway";
import { Routes } from "discord-api-types/v10";
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
import { listNativeCommandSpecsForConfig } from "../../auto-reply/commands-registry.js";
import { listSkillCommandsForWorkspace } from "../../auto-reply/skill-commands.js";
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js";
import type { HistoryEntry } from "../../auto-reply/reply/history.js";
import {
isNativeCommandsExplicitlyDisabled,
resolveNativeCommandsEnabled,
} from "../../config/commands.js";
import type { ClawdbotConfig, ReplyToMode } from "../../config/config.js";
import { loadConfig } from "../../config/config.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { getChildLogger } from "../../logging.js";
import type { RuntimeEnv } from "../../runtime.js";
import { resolveDiscordAccount } from "../accounts.js";
import { attachDiscordGatewayLogging } from "../gateway-logging.js";
import { getDiscordGatewayEmitter, waitForDiscordGatewayStop } from "../monitor.gateway.js";
import { fetchDiscordApplicationId } from "../probe.js";
import { normalizeDiscordToken } from "../token.js";
import {
DiscordMessageListener,
DiscordReactionListener,
DiscordReactionRemoveListener,
registerDiscordListener,
} from "./listeners.js";
import { createDiscordMessageHandler } from "./message-handler.js";
import { createDiscordNativeCommand } from "./native-command.js";
export type MonitorDiscordOpts = {
token?: string;
accountId?: string;
config?: ClawdbotConfig;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
mediaMaxMb?: number;
historyLimit?: number;
replyToMode?: ReplyToMode;
};
function summarizeAllowList(list?: Array<string | number>) {
if (!list || list.length === 0) return "any";
const sample = list.slice(0, 4).map((entry) => String(entry));
const suffix = list.length > sample.length ? ` (+${list.length - sample.length})` : "";
return `${sample.join(", ")}${suffix}`;
}
function summarizeGuilds(entries?: Record<string, unknown>) {
if (!entries || Object.keys(entries).length === 0) return "any";
const keys = Object.keys(entries);
const sample = keys.slice(0, 4);
const suffix = keys.length > sample.length ? ` (+${keys.length - sample.length})` : "";
return `${sample.join(", ")}${suffix}`;
}
export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
const cfg = opts.config ?? loadConfig();
const account = resolveDiscordAccount({
cfg,
accountId: opts.accountId,
});
const token = normalizeDiscordToken(opts.token ?? undefined) ?? account.token;
if (!token) {
throw new Error(
`Discord bot token missing for account "${account.accountId}" (set discord.accounts.${account.accountId}.token or DISCORD_BOT_TOKEN for default).`,
);
}
const runtime: RuntimeEnv = opts.runtime ?? {
log: console.log,
error: console.error,
exit: (code: number): never => {
throw new Error(`exit ${code}`);
},
};
const discordCfg = account.config;
const dmConfig = discordCfg.dm;
const guildEntries = discordCfg.guilds;
const groupPolicy = discordCfg.groupPolicy ?? "open";
const allowFrom = dmConfig?.allowFrom;
const mediaMaxBytes = (opts.mediaMaxMb ?? discordCfg.mediaMaxMb ?? 8) * 1024 * 1024;
const textLimit = resolveTextChunkLimit(cfg, "discord", account.accountId, {
fallbackLimit: 2000,
});
const historyLimit = Math.max(
0,
opts.historyLimit ?? discordCfg.historyLimit ?? cfg.messages?.groupChat?.historyLimit ?? 20,
);
const replyToMode = opts.replyToMode ?? discordCfg.replyToMode ?? "off";
const dmEnabled = dmConfig?.enabled ?? true;
const dmPolicy = dmConfig?.policy ?? "pairing";
const groupDmEnabled = dmConfig?.groupEnabled ?? false;
const groupDmChannels = dmConfig?.groupChannels;
const nativeEnabled = resolveNativeCommandsEnabled({
providerId: "discord",
providerSetting: discordCfg.commands?.native,
globalSetting: cfg.commands?.native,
});
const nativeDisabledExplicit = isNativeCommandsExplicitlyDisabled({
providerSetting: discordCfg.commands?.native,
globalSetting: cfg.commands?.native,
});
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
const sessionPrefix = "discord:slash";
const ephemeralDefault = true;
if (shouldLogVerbose()) {
logVerbose(
`discord: config dm=${dmEnabled ? "on" : "off"} dmPolicy=${dmPolicy} allowFrom=${summarizeAllowList(allowFrom)} groupDm=${groupDmEnabled ? "on" : "off"} groupDmChannels=${summarizeAllowList(groupDmChannels)} groupPolicy=${groupPolicy} guilds=${summarizeGuilds(guildEntries)} historyLimit=${historyLimit} mediaMaxMb=${Math.round(mediaMaxBytes / (1024 * 1024))} native=${nativeEnabled ? "on" : "off"} accessGroups=${useAccessGroups ? "on" : "off"}`,
);
}
const applicationId = await fetchDiscordApplicationId(token, 4000);
if (!applicationId) {
throw new Error("Failed to resolve Discord application id");
}
const skillCommands = nativeEnabled
? listSkillCommandsForWorkspace({
workspaceDir: resolveAgentWorkspaceDir(cfg, resolveDefaultAgentId(cfg)),
cfg,
})
: [];
const commandSpecs = nativeEnabled
? listNativeCommandSpecsForConfig(cfg, { skillCommands })
: [];
const commands = commandSpecs.map((spec) =>
createDiscordNativeCommand({
command: spec,
cfg,
discordConfig: discordCfg,
accountId: account.accountId,
sessionPrefix,
ephemeralDefault,
}),
);
const client = new Client(
{
baseUrl: "http://localhost",
deploySecret: "a",
clientId: applicationId,
publicKey: "a",
token,
autoDeploy: nativeEnabled,
},
{
commands,
listeners: [],
},
[
new GatewayPlugin({
reconnect: {
maxAttempts: Number.POSITIVE_INFINITY,
},
intents:
GatewayIntents.Guilds |
GatewayIntents.GuildMessages |
GatewayIntents.MessageContent |
GatewayIntents.DirectMessages |
GatewayIntents.GuildMessageReactions |
GatewayIntents.DirectMessageReactions,
autoInteractions: true,
}),
],
);
const logger = getChildLogger({ module: "discord-auto-reply" });
const guildHistories = new Map<string, HistoryEntry[]>();
let botUserId: string | undefined;
if (nativeDisabledExplicit) {
await clearDiscordNativeCommands({
client,
applicationId,
runtime,
});
}
try {
const botUser = await client.fetchUser("@me");
botUserId = botUser?.id;
} catch (err) {
runtime.error?.(danger(`discord: failed to fetch bot identity: ${String(err)}`));
}
const messageHandler = createDiscordMessageHandler({
cfg,
discordConfig: discordCfg,
accountId: account.accountId,
token,
runtime,
botUserId,
guildHistories,
historyLimit,
mediaMaxBytes,
textLimit,
replyToMode,
dmEnabled,
groupDmEnabled,
groupDmChannels,
allowFrom,
guildEntries,
});
registerDiscordListener(client.listeners, new DiscordMessageListener(messageHandler, logger));
registerDiscordListener(
client.listeners,
new DiscordReactionListener({
cfg,
accountId: account.accountId,
runtime,
botUserId,
guildEntries,
logger,
}),
);
registerDiscordListener(
client.listeners,
new DiscordReactionRemoveListener({
cfg,
accountId: account.accountId,
runtime,
botUserId,
guildEntries,
logger,
}),
);
runtime.log?.(`logged in to discord${botUserId ? ` as ${botUserId}` : ""}`);
const gateway = client.getPlugin<GatewayPlugin>("gateway");
const gatewayEmitter = getDiscordGatewayEmitter(gateway);
const stopGatewayLogging = attachDiscordGatewayLogging({
emitter: gatewayEmitter,
runtime,
});
// Timeout to detect zombie connections where HELLO is never received.
const HELLO_TIMEOUT_MS = 30000;
let helloTimeoutId: ReturnType<typeof setTimeout> | undefined;
const onGatewayDebug = (msg: unknown) => {
const message = String(msg);
if (!message.includes("WebSocket connection opened")) return;
if (helloTimeoutId) clearTimeout(helloTimeoutId);
helloTimeoutId = setTimeout(() => {
if (!gateway?.isConnected) {
runtime.log?.(
danger(
`connection stalled: no HELLO received within ${HELLO_TIMEOUT_MS}ms, forcing reconnect`,
),
);
gateway?.disconnect();
gateway?.connect(false);
}
helloTimeoutId = undefined;
}, HELLO_TIMEOUT_MS);
};
gatewayEmitter?.on("debug", onGatewayDebug);
try {
await waitForDiscordGatewayStop({
gateway: gateway
? {
emitter: gatewayEmitter,
disconnect: () => gateway.disconnect(),
}
: undefined,
abortSignal: opts.abortSignal,
onGatewayError: (err) => {
runtime.error?.(danger(`discord gateway error: ${String(err)}`));
},
shouldStopOnError: (err) => {
const message = String(err);
return (
message.includes("Max reconnect attempts") || message.includes("Fatal Gateway error")
);
},
});
} finally {
stopGatewayLogging();
if (helloTimeoutId) clearTimeout(helloTimeoutId);
gatewayEmitter?.removeListener("debug", onGatewayDebug);
}
}
async function clearDiscordNativeCommands(params: {
client: Client;
applicationId: string;
runtime: RuntimeEnv;
}) {
try {
await params.client.rest.put(Routes.applicationCommands(params.applicationId), {
body: [],
});
logVerbose("discord: cleared native commands (commands.native=false)");
} catch (err) {
params.runtime.error?.(danger(`discord: failed to clear native commands: ${String(err)}`));
}
}