diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 1f515e113..0307a3b41 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -18,6 +18,7 @@ import { promoteThinkingTagsToBlocks, } from "./pi-embedded-utils.js"; import { createInlineCodeState } from "../markdown/code-spans.js"; +import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; export function handleMessageStart( ctx: EmbeddedPiSubscribeContext, @@ -236,16 +237,72 @@ export function handleMessageEnd( replyToTag, replyToCurrent, } = splitResult; - // Emit if there's content OR audioAsVoice flag (to propagate the flag). - if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { - void onBlockReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - audioAsVoice, - replyToId, - replyToTag, - replyToCurrent, - }); + + // Run message_sending hook before emitting + const hookRunner = getGlobalHookRunner(); + const sessionKey = (ctx.params as { sessionKey?: string }).sessionKey; + const targetTo = sessionKey ?? "unknown"; + + if (hookRunner?.hasHooks("message_sending") && cleanedText) { + // Async hook execution with modification/cancellation support + void hookRunner + .runMessageSending( + { + to: targetTo, + content: cleanedText, + metadata: { mediaUrls, sessionKey }, + }, + { + channelId: (ctx.params as { channel?: string }).channel ?? "unknown", + accountId: (ctx.params as { accountId?: string }).accountId, + }, + ) + .then((hookResult) => { + const finalText = hookResult?.content ?? cleanedText; + const shouldCancel = hookResult?.cancel ?? false; + + if ( + !shouldCancel && + (finalText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) + ) { + void onBlockReply({ + text: finalText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } else if (shouldCancel) { + ctx.log.debug("Message cancelled by message_sending hook"); + } + }) + .catch((err) => { + ctx.log.warn(`message_sending hook failed: ${err}`); + // Fallback: send original message on hook error + if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { + void onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } + }); + } else { + // No hooks registered, emit directly + if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { + void onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } } } }