307 lines
9.6 KiB
TypeScript
307 lines
9.6 KiB
TypeScript
import { resolveTextChunkLimit } from "../../auto-reply/chunk.js";
|
|
import type { ReplyPayload } from "../../auto-reply/types.js";
|
|
import { resolveChannelMediaMaxBytes } from "../../channels/plugins/media-limits.js";
|
|
import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js";
|
|
import type { ChannelOutboundAdapter } from "../../channels/plugins/types.js";
|
|
import type { ClawdbotConfig } from "../../config/config.js";
|
|
import type { sendMessageDiscord } from "../../discord/send.js";
|
|
import type { sendMessageIMessage } from "../../imessage/send.js";
|
|
import { markdownToSignalTextChunks, type SignalTextStyleRange } from "../../signal/format.js";
|
|
import { sendMessageSignal } from "../../signal/send.js";
|
|
import type { sendMessageSlack } from "../../slack/send.js";
|
|
import type { sendMessageTelegram } from "../../telegram/send.js";
|
|
import type { sendMessageWhatsApp } from "../../web/outbound.js";
|
|
import {
|
|
appendAssistantMessageToSessionTranscript,
|
|
resolveMirroredTranscriptText,
|
|
} from "../../config/sessions.js";
|
|
import type { NormalizedOutboundPayload } from "./payloads.js";
|
|
import { normalizeOutboundPayloads } from "./payloads.js";
|
|
import type { OutboundChannel } from "./targets.js";
|
|
|
|
export type { NormalizedOutboundPayload } from "./payloads.js";
|
|
export { normalizeOutboundPayloads } from "./payloads.js";
|
|
|
|
/**
 * Signature of an injectable Matrix send function.
 *
 * Receives the destination, the message text and optional delivery options,
 * and resolves with the identifiers of the sent message and its room.
 */
type SendMatrixMessage = (
  to: string,
  text: string,
  opts?: { mediaUrl?: string; replyToId?: string; threadId?: string; timeoutMs?: number },
) => Promise<{ messageId: string; roomId: string }>;
|
|
|
|
/**
 * Optional per-channel send functions a caller may inject via `deps`.
 *
 * Every member is optional. In the code visible in this file only `sendSignal`
 * is read directly (with `sendMessageSignal` as the fallback); the object as a
 * whole is forwarded unchanged to the channel's outbound adapter.
 */
export type OutboundSendDeps = {
  sendWhatsApp?: typeof sendMessageWhatsApp;
  sendTelegram?: typeof sendMessageTelegram;
  sendDiscord?: typeof sendMessageDiscord;
  sendSlack?: typeof sendMessageSlack;
  sendSignal?: typeof sendMessageSignal;
  sendIMessage?: typeof sendMessageIMessage;
  sendMatrix?: SendMatrixMessage;
  sendMSTeams?: (
    to: string,
    text: string,
    opts?: { mediaUrl?: string },
  ) => Promise<{ messageId: string; conversationId: string }>;
};
|
|
|
|
/**
 * Result of sending a single outbound message.
 *
 * `channel` and `messageId` are always present; the remaining identifiers are
 * optional and are filled in only by the channels that produce them.
 */
export type OutboundDeliveryResult = {
  channel: Exclude<OutboundChannel, "none">;
  messageId: string;
  chatId?: string;
  channelId?: string;
  roomId?: string;
  conversationId?: string;
  timestamp?: number;
  toJid?: string;
  pollId?: string;
  // Channel docking: stash channel-specific fields here to avoid core type churn.
  meta?: Record<string, unknown>;
};
|
|
|
|
/** Splits `text` into an ordered list of pieces, given a numeric `limit`. */
type Chunker = (text: string, limit: number) => string[];
|
|
|
|
/**
 * Channel-bound send operations used by the delivery loop.
 *
 * - `chunker`: splits long text before sending; `null` means the text is sent
 *   as a single message.
 * - `textChunkLimit`: the adapter's own limit, used as the fallback when the
 *   effective chunk limit is resolved.
 * - `sendText` / `sendMedia`: send one message and resolve with its result.
 */
type ChannelHandler = {
  chunker: Chunker | null;
  textChunkLimit?: number;
  sendText: (text: string) => Promise<OutboundDeliveryResult>;
  sendMedia: (caption: string, mediaUrl: string) => Promise<OutboundDeliveryResult>;
};
|
|
|
|
/**
 * Throws when the given signal has already been aborted.
 *
 * @param abortSignal - Optional signal to inspect; a missing signal is treated
 *   as "not aborted".
 * @throws Error with the message "Outbound delivery aborted" when
 *   `abortSignal.aborted` is true.
 */
function throwIfAborted(abortSignal?: AbortSignal): void {
  if (abortSignal?.aborted) {
    throw new Error("Outbound delivery aborted");
  }
}
|
|
|
|
// Channel docking: outbound delivery delegates to plugin.outbound adapters.
/**
 * Loads the outbound adapter for `params.channel` and wraps it in a
 * `ChannelHandler` bound to the given target and delivery options.
 *
 * @param params - Config, channel, destination and the optional account,
 *   reply/thread, dependency and GIF-playback settings forwarded to the adapter.
 * @returns A handler whose `sendText` / `sendMedia` call the loaded adapter.
 * @throws Error "Outbound not configured for channel: <channel>" when the
 *   adapter is missing or lacks `sendText` or `sendMedia`.
 */
async function createChannelHandler(params: {
  cfg: ClawdbotConfig;
  channel: Exclude<OutboundChannel, "none">;
  to: string;
  accountId?: string;
  replyToId?: string | null;
  threadId?: string | number | null;
  deps?: OutboundSendDeps;
  gifPlayback?: boolean;
}): Promise<ChannelHandler> {
  const outbound = await loadChannelOutboundAdapter(params.channel);
  if (!outbound?.sendText || !outbound?.sendMedia) {
    throw new Error(`Outbound not configured for channel: ${params.channel}`);
  }
  const handler = createPluginHandler({
    outbound,
    cfg: params.cfg,
    channel: params.channel,
    to: params.to,
    accountId: params.accountId,
    replyToId: params.replyToId,
    threadId: params.threadId,
    deps: params.deps,
    gifPlayback: params.gifPlayback,
  });
  // Defensive: createPluginHandler returns null only for the same condition
  // that was already rejected above.
  if (!handler) {
    throw new Error(`Outbound not configured for channel: ${params.channel}`);
  }
  return handler;
}
|
|
|
|
/**
 * Builds a `ChannelHandler` on top of a plugin outbound adapter.
 *
 * The adapter's `sendText` and `sendMedia` are captured as plain function
 * references and invoked with a single argument object assembled from
 * `params` at call time.
 *
 * @param params - The adapter plus the config, destination and delivery
 *   options to pass along on every send.
 * @returns The handler, or `null` when the adapter is missing or does not
 *   provide both `sendText` and `sendMedia`.
 */
function createPluginHandler(params: {
  outbound?: ChannelOutboundAdapter;
  cfg: ClawdbotConfig;
  channel: Exclude<OutboundChannel, "none">;
  to: string;
  accountId?: string;
  replyToId?: string | null;
  threadId?: string | number | null;
  deps?: OutboundSendDeps;
  gifPlayback?: boolean;
}): ChannelHandler | null {
  const outbound = params.outbound;
  if (!outbound?.sendText || !outbound?.sendMedia) return null;
  const sendText = outbound.sendText;
  const sendMedia = outbound.sendMedia;
  // Normalize a missing chunker to null so callers can test it directly.
  const chunker = outbound.chunker ?? null;
  return {
    chunker,
    textChunkLimit: outbound.textChunkLimit,
    sendText: async (text) =>
      sendText({
        cfg: params.cfg,
        to: params.to,
        text,
        accountId: params.accountId,
        replyToId: params.replyToId,
        threadId: params.threadId,
        gifPlayback: params.gifPlayback,
        deps: params.deps,
      }),
    sendMedia: async (caption, mediaUrl) =>
      sendMedia({
        cfg: params.cfg,
        to: params.to,
        // The adapter receives the caption under the `text` key.
        text: caption,
        mediaUrl,
        accountId: params.accountId,
        replyToId: params.replyToId,
        threadId: params.threadId,
        gifPlayback: params.gifPlayback,
        deps: params.deps,
      }),
  };
}
|
|
|
|
/**
 * Sends reply payloads to one outbound channel target and returns one result
 * per message that was actually sent.
 *
 * Behavior, in order:
 * - A channel handler is created up front; failure to do so throws even when
 *   `bestEffort` is set, because it happens outside the per-payload `try`.
 * - Payloads are normalized, then processed sequentially. `onPayload` is
 *   invoked for each normalized payload before anything is sent.
 * - Payloads without media are sent as text, split with the handler's chunker
 *   when one exists. Payloads with media send one message per URL, with the
 *   payload text used as the caption of the first URL only.
 * - For the "signal" channel, sends bypass the handler and call `sendSignal`
 *   (from `deps`, else `sendMessageSignal`) with `textMode: "plain"` and the
 *   style ranges produced by `markdownToSignalTextChunks`.
 * - `abortSignal` is checked before each payload, chunk and media send.
 * - When `mirror` is given and at least one message was sent, the mirrored
 *   text (if any) is appended to the session transcript.
 *
 * @param params - Delivery target, payloads and options. With `bestEffort`
 *   set, an error while handling a payload (including an abort) is reported to
 *   `onError`, the rest of that payload is skipped and the loop continues.
 * @returns The accumulated delivery results, in send order.
 * @throws Rethrows any per-payload error when `bestEffort` is not set;
 *   "Outbound delivery aborted" when the signal is aborted; and
 *   "Outbound not configured for channel: <channel>" from handler creation.
 */
export async function deliverOutboundPayloads(params: {
  cfg: ClawdbotConfig;
  channel: Exclude<OutboundChannel, "none">;
  to: string;
  accountId?: string;
  payloads: ReplyPayload[];
  replyToId?: string | null;
  threadId?: string | number | null;
  deps?: OutboundSendDeps;
  gifPlayback?: boolean;
  abortSignal?: AbortSignal;
  bestEffort?: boolean;
  onError?: (err: unknown, payload: NormalizedOutboundPayload) => void;
  onPayload?: (payload: NormalizedOutboundPayload) => void;
  mirror?: {
    sessionKey: string;
    agentId?: string;
    text?: string;
    mediaUrls?: string[];
  };
}): Promise<OutboundDeliveryResult[]> {
  const { cfg, channel, to, payloads } = params;
  const accountId = params.accountId;
  const deps = params.deps;
  const abortSignal = params.abortSignal;
  const sendSignal = params.deps?.sendSignal ?? sendMessageSignal;
  const results: OutboundDeliveryResult[] = [];
  const handler = await createChannelHandler({
    cfg,
    channel,
    to,
    deps,
    accountId,
    replyToId: params.replyToId,
    threadId: params.threadId,
    gifPlayback: params.gifPlayback,
  });
  // A chunk limit is only resolved when the adapter can chunk at all.
  const textLimit = handler.chunker
    ? resolveTextChunkLimit(cfg, channel, accountId, {
        fallbackLimit: handler.textChunkLimit,
      })
    : undefined;
  const isSignalChannel = channel === "signal";
  // Per-account media limit takes precedence over the channel-wide one.
  const signalMaxBytes = isSignalChannel
    ? resolveChannelMediaMaxBytes({
        cfg,
        resolveChannelLimitMb: ({ cfg, accountId }) =>
          cfg.channels?.signal?.accounts?.[accountId]?.mediaMaxMb ??
          cfg.channels?.signal?.mediaMaxMb,
        accountId,
      })
    : undefined;

  // Generic text path: send whole, or chunk by chunk when a chunker exists.
  const sendTextChunks = async (text: string) => {
    throwIfAborted(abortSignal);
    if (!handler.chunker || textLimit === undefined) {
      results.push(await handler.sendText(text));
      return;
    }
    for (const chunk of handler.chunker(text, textLimit)) {
      throwIfAborted(abortSignal);
      results.push(await handler.sendText(chunk));
    }
  };

  // Signal text is sent as plain text plus explicit style ranges.
  const sendSignalText = async (text: string, styles: SignalTextStyleRange[]) => {
    throwIfAborted(abortSignal);
    return {
      channel: "signal" as const,
      ...(await sendSignal(to, text, {
        maxBytes: signalMaxBytes,
        accountId: accountId ?? undefined,
        textMode: "plain",
        textStyles: styles,
      })),
    };
  };

  const sendSignalTextChunks = async (text: string) => {
    throwIfAborted(abortSignal);
    let signalChunks =
      textLimit === undefined
        ? markdownToSignalTextChunks(text, Number.POSITIVE_INFINITY)
        : markdownToSignalTextChunks(text, textLimit);
    // Fall back to the raw text if formatting produced nothing for non-empty input.
    if (signalChunks.length === 0 && text) {
      signalChunks = [{ text, styles: [] }];
    }
    for (const chunk of signalChunks) {
      throwIfAborted(abortSignal);
      results.push(await sendSignalText(chunk.text, chunk.styles));
    }
  };

  // Captions are formatted as a single, unsplit chunk.
  const sendSignalMedia = async (caption: string, mediaUrl: string) => {
    throwIfAborted(abortSignal);
    const formatted = markdownToSignalTextChunks(caption, Number.POSITIVE_INFINITY)[0] ?? {
      text: caption,
      styles: [],
    };
    return {
      channel: "signal" as const,
      ...(await sendSignal(to, formatted.text, {
        mediaUrl,
        maxBytes: signalMaxBytes,
        accountId: accountId ?? undefined,
        textMode: "plain",
        textStyles: formatted.styles,
      })),
    };
  };
  const normalizedPayloads = normalizeOutboundPayloads(payloads);
  for (const payload of normalizedPayloads) {
    try {
      throwIfAborted(abortSignal);
      params.onPayload?.(payload);
      if (payload.mediaUrls.length === 0) {
        if (isSignalChannel) {
          await sendSignalTextChunks(payload.text);
        } else {
          await sendTextChunks(payload.text);
        }
        continue;
      }

      // Only the first media message carries the payload text as its caption.
      let first = true;
      for (const url of payload.mediaUrls) {
        throwIfAborted(abortSignal);
        const caption = first ? payload.text : "";
        first = false;
        if (isSignalChannel) {
          results.push(await sendSignalMedia(caption, url));
        } else {
          results.push(await handler.sendMedia(caption, url));
        }
      }
    } catch (err) {
      // Best-effort mode reports the failure and moves on to the next payload.
      if (!params.bestEffort) throw err;
      params.onError?.(err, payload);
    }
  }
  // Mirror into the session transcript only if something was actually sent.
  if (params.mirror && results.length > 0) {
    const mirrorText = resolveMirroredTranscriptText({
      text: params.mirror.text,
      mediaUrls: params.mirror.mediaUrls,
    });
    if (mirrorText) {
      await appendAssistantMessageToSessionTranscript({
        agentId: params.mirror.agentId,
        sessionKey: params.mirror.sessionKey,
        text: mirrorText,
      });
    }
  }
  return results;
}
|