From 90226c05e00d4b06c2c5cea26962c1634ec0a7ca Mon Sep 17 00:00:00 2001 From: ramsbaby Date: Thu, 29 Jan 2026 20:55:23 +0900 Subject: [PATCH] feat(hooks): connect message_sending hook to outbound message pipeline - Add getGlobalHookRunner import to message handlers - Call runMessageSending before onBlockReply in handleMessageEnd - Support content modification and cancellation via hook result - Fallback to original message on hook error Closes #3945 --- ...pi-embedded-subscribe.handlers.messages.ts | 77 ++++++++++++++++--- 1 file changed, 67 insertions(+), 10 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 1f515e113..0307a3b41 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -18,6 +18,7 @@ import { promoteThinkingTagsToBlocks, } from "./pi-embedded-utils.js"; import { createInlineCodeState } from "../markdown/code-spans.js"; +import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; export function handleMessageStart( ctx: EmbeddedPiSubscribeContext, @@ -236,16 +237,72 @@ export function handleMessageEnd( replyToTag, replyToCurrent, } = splitResult; - // Emit if there's content OR audioAsVoice flag (to propagate the flag). - if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { - void onBlockReply({ - text: cleanedText, - mediaUrls: mediaUrls?.length ? mediaUrls : undefined, - audioAsVoice, - replyToId, - replyToTag, - replyToCurrent, - }); + + // Run message_sending hook before emitting + const hookRunner = getGlobalHookRunner(); + const sessionKey = (ctx.params as { sessionKey?: string }).sessionKey; + const targetTo = sessionKey ?? "unknown"; + + if (hookRunner?.hasHooks("message_sending") && cleanedText) { + // Async hook execution with modification/cancellation support + void hookRunner + .runMessageSending( + { + to: targetTo, + content: cleanedText, + metadata: { mediaUrls, sessionKey }, + }, + { + channelId: (ctx.params as { channel?: string }).channel ?? "unknown", + accountId: (ctx.params as { accountId?: string }).accountId, + }, + ) + .then((hookResult) => { + const finalText = hookResult?.content ?? cleanedText; + const shouldCancel = hookResult?.cancel ?? false; + + if ( + !shouldCancel && + (finalText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) + ) { + void onBlockReply({ + text: finalText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } else if (shouldCancel) { + ctx.log.debug("Message cancelled by message_sending hook"); + } + }) + .catch((err) => { + ctx.log.warn(`message_sending hook failed: ${err}`); + // Fallback: send original message on hook error + if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { + void onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } + }); + } else { + // No hooks registered, emit directly + if (cleanedText || (mediaUrls && mediaUrls.length > 0) || audioAsVoice) { + void onBlockReply({ + text: cleanedText, + mediaUrls: mediaUrls?.length ? mediaUrls : undefined, + audioAsVoice, + replyToId, + replyToTag, + replyToCurrent, + }); + } } } }