diff --git a/docs/hooks.md b/docs/hooks.md index 8576146ba..532607ce3 100644 --- a/docs/hooks.md +++ b/docs/hooks.md @@ -196,7 +196,7 @@ Each event includes: ```typescript { - type: 'command' | 'session' | 'agent' | 'gateway', + type: 'command' | 'session' | 'agent' | 'gateway' | 'message', action: string, // e.g., 'new', 'reset', 'stop' sessionKey: string, // Session identifier timestamp: Date, // When the event occurred @@ -241,6 +241,35 @@ These hooks are not event-stream listeners; they let plugins synchronously adjus - **`tool_result_persist`**: transform tool results before they are written to the session transcript. Must be synchronous; return the updated tool result payload or `undefined` to keep it as-is. See [Agent Loop](/concepts/agent-loop). +### Message Events + +Triggered during the message lifecycle: + +- **`message:received`**: When an inbound message is received (fired alongside the plugin `message_received` hook) +- **`message:sent`**: When an outbound reply is successfully delivered (fired alongside the plugin `message_sent` hook) + +#### `message:received` Context + +| Field | Type | Description | +|------------------|---------------------|------------------------------------| +| `from` | `string` | Sender identifier | +| `content` | `string` | Message body text | +| `channel` | `string` | Channel / surface (lowercase) | +| `conversationId` | `string \| undefined` | Conversation or chat identifier | +| `timestamp` | `number` | Message timestamp (epoch ms) | +| `messageId` | `string \| undefined` | Platform message ID | +| `senderId` | `string \| undefined` | Sender user ID | +| `senderName` | `string \| undefined` | Sender display name | + +#### `message:sent` Context + +| Field | Type | Description | +|-----------|----------|-------------------------------------------| +| `content` | `string` | Reply body text | +| `channel` | `string` | Channel / surface (lowercase) | +| `to` | `string` | Recipient identifier | +| `kind` | `string` | Dispatch kind (`"tool"`, `"block"`, or `"final"`) | + ### Future Events Planned event types: @@ -248,8 +277,6 @@ Planned event types: - **`session:start`**: When a new session begins - **`session:end`**: When a session ends - **`agent:error`**: When an agent encounters an error -- **`message:sent`**: When a message is sent -- **`message:received`**: When a message is received ## Creating Custom Hooks diff --git a/src/auto-reply/dispatch.ts b/src/auto-reply/dispatch.ts index 13fc37a1c..53f4608ce 100644 --- a/src/auto-reply/dispatch.ts +++ b/src/auto-reply/dispatch.ts @@ -1,6 +1,8 @@ import type { MoltbotConfig } from "../config/config.js"; +import { createInternalHookEvent, triggerInternalHook } from "../hooks/internal-hooks.js"; +import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import type { FinalizedMsgContext, MsgContext } from "./templating.js"; -import type { GetReplyOptions } from "./types.js"; +import type { GetReplyOptions, ReplyPayload } from "./types.js"; import { finalizeInboundContext } from "./reply/inbound-context.js"; import type { DispatchFromConfigResult } from "./reply/dispatch-from-config.js"; import { dispatchReplyFromConfig } from "./reply/dispatch-from-config.js"; @@ -10,10 +12,51 @@ import { type ReplyDispatcher, type ReplyDispatcherOptions, type ReplyDispatcherWithTypingOptions, + type ReplyDispatchKind, } from "./reply/reply-dispatcher.js"; export type DispatchInboundResult = DispatchFromConfigResult; +/** + * Create an `onDelivered` callback that fires message:sent hooks + * (both internal and plugin) after each reply is successfully delivered. + */ +function createMessageSentHook( + ctx: FinalizedMsgContext, + origOnDelivered?: (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => void, +): (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => void { + const sessionKey = ctx.SessionKey ?? ""; + const channel = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); + const to = ctx.To ?? ctx.From ?? ""; + const accountId = ctx.AccountId; + const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From; + + return (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => { + origOnDelivered?.(payload, info); + + // Internal hook: message:sent + void triggerInternalHook( + createInternalHookEvent("message", "sent", sessionKey, { + content: payload.text ?? "", + channel, + to, + kind: info.kind, + }), + ); + + // Plugin hook: message_sent + const hookRunner = getGlobalHookRunner(); + if (hookRunner?.hasHooks("message_sent")) { + void hookRunner + .runMessageSent( + { to, content: payload.text ?? "", success: true }, + { channelId: channel, accountId, conversationId }, + ) + .catch(() => {}); + } + }; +} + export async function dispatchInboundMessage(params: { ctx: MsgContext | FinalizedMsgContext; cfg: MoltbotConfig; @@ -38,12 +81,16 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: { replyOptions?: Omit; replyResolver?: typeof import("./reply.js").getReplyFromConfig; }): Promise { - const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping( - params.dispatcherOptions, - ); + const finalized = finalizeInboundContext(params.ctx); + const onDelivered = createMessageSentHook(finalized, params.dispatcherOptions.onDelivered); + + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ + ...params.dispatcherOptions, + onDelivered, + }); const result = await dispatchInboundMessage({ - ctx: params.ctx, + ctx: finalized, cfg: params.cfg, dispatcher, replyResolver: params.replyResolver, @@ -64,9 +111,15 @@ export async function dispatchInboundMessageWithDispatcher(params: { replyOptions?: Omit; replyResolver?: typeof import("./reply.js").getReplyFromConfig; }): Promise { - const dispatcher = createReplyDispatcher(params.dispatcherOptions); + const finalized = finalizeInboundContext(params.ctx); + const onDelivered = createMessageSentHook(finalized, params.dispatcherOptions.onDelivered); + + const dispatcher = createReplyDispatcher({ + ...params.dispatcherOptions, + onDelivered, + }); const result = await dispatchInboundMessage({ - ctx: params.ctx, + ctx: finalized, cfg: params.cfg, dispatcher, replyResolver: params.replyResolver, diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index c85e654de..7bc3ec58c 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -8,6 +8,7 @@ import { logMessageQueued, logSessionStateChange, } from "../../logging/diagnostic.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import { getReplyFromConfig } from "../reply.js"; import type { FinalizedMsgContext } from "../templating.js"; @@ -133,31 +134,31 @@ export async function dispatchReplyFromConfig(params: { const inboundAudio = isInboundAudioContext(ctx); const sessionTtsAuto = resolveSessionTtsAuto(ctx, cfg); + // Extract message context for hooks (shared between plugin and internal hooks) + const hookTimestamp = + typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined; + const hookMessageId = + ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; + const hookContent = + typeof ctx.BodyForCommands === "string" + ? ctx.BodyForCommands + : typeof ctx.RawBody === "string" + ? ctx.RawBody + : typeof ctx.Body === "string" + ? ctx.Body + : ""; + const hookChannelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); + const hookConversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined; + + // Plugin hook: message_received const hookRunner = getGlobalHookRunner(); if (hookRunner?.hasHooks("message_received")) { - const timestamp = - typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) - ? ctx.Timestamp - : undefined; - const messageIdForHook = - ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast; - const content = - typeof ctx.BodyForCommands === "string" - ? ctx.BodyForCommands - : typeof ctx.RawBody === "string" - ? ctx.RawBody - : typeof ctx.Body === "string" - ? ctx.Body - : ""; - const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase(); - const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined; - void hookRunner .runMessageReceived( { from: ctx.From ?? "", - content, - timestamp, + content: hookContent, + timestamp: hookTimestamp, metadata: { to: ctx.To, provider: ctx.Provider, @@ -165,7 +166,7 @@ export async function dispatchReplyFromConfig(params: { threadId: ctx.MessageThreadId, originatingChannel: ctx.OriginatingChannel, originatingTo: ctx.OriginatingTo, - messageId: messageIdForHook, + messageId: hookMessageId, senderId: ctx.SenderId, senderName: ctx.SenderName, senderUsername: ctx.SenderUsername, @@ -173,9 +174,9 @@ export async function dispatchReplyFromConfig(params: { }, }, { - channelId, + channelId: hookChannelId, accountId: ctx.AccountId, - conversationId, + conversationId: hookConversationId, }, ) .catch((err) => { @@ -183,6 +184,20 @@ export async function dispatchReplyFromConfig(params: { }); } + // Internal hook: message:received + void triggerInternalHook( + createInternalHookEvent("message", "received", ctx.SessionKey ?? "", { + from: ctx.From ?? "", + content: hookContent, + channel: hookChannelId, + conversationId: hookConversationId, + timestamp: hookTimestamp ?? Date.now(), + messageId: hookMessageId, + senderId: ctx.SenderId, + senderName: ctx.SenderName, + }), + ); + // Check if we should route replies to originating channel instead of dispatcher. // Only route when the originating channel is DIFFERENT from the current surface. // This handles cross-provider routing (e.g., message from Telegram being processed diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index fd7fb5493..2aac9851a 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -49,6 +49,8 @@ export type ReplyDispatcherOptions = { onError?: ReplyDispatchErrorHandler; // AIDEV-NOTE: onSkip lets channels detect silent/empty drops (e.g. Telegram empty-response fallback). onSkip?: ReplyDispatchSkipHandler; + /** Called after a reply is successfully delivered. */ + onDelivered?: (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => void; /** Human-like delay between block replies for natural rhythm. */ humanDelay?: HumanDelayConfig; }; @@ -131,6 +133,7 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis if (delayMs > 0) await sleep(delayMs); } await options.deliver(normalized, { kind }); + options.onDelivered?.(normalized, { kind }); }) .catch((err) => { options.onError?.(err, { kind }); diff --git a/src/hooks/internal-hooks.test.ts b/src/hooks/internal-hooks.test.ts index e01e5bc3c..aa7b00ae9 100644 --- a/src/hooks/internal-hooks.test.ts +++ b/src/hooks/internal-hooks.test.ts @@ -212,6 +212,82 @@ describe("hooks", () => { }); }); + describe("message events", () => { + it("should trigger message:received handlers", async () => { + const handler = vi.fn(); + registerInternalHook("message:received", handler); + + const event = createInternalHookEvent("message", "received", "test-session", { + from: "+1234567890", + content: "Hello world", + channel: "telegram", + conversationId: "chat-123", + timestamp: Date.now(), + messageId: "msg-1", + senderId: "user-1", + senderName: "Alice", + }); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledWith(event); + expect(handler).toHaveBeenCalledTimes(1); + }); + + it("should trigger message:sent handlers", async () => { + const handler = vi.fn(); + registerInternalHook("message:sent", handler); + + const event = createInternalHookEvent("message", "sent", "test-session", { + content: "Reply text", + channel: "telegram", + to: "chat-123", + kind: "final", + }); + await triggerInternalHook(event); + + expect(handler).toHaveBeenCalledWith(event); + expect(handler).toHaveBeenCalledTimes(1); + }); + + it("should trigger general message handler for both received and sent", async () => { + const handler = vi.fn(); + registerInternalHook("message", handler); + + const receivedEvent = createInternalHookEvent("message", "received", "test-session", { + from: "user-1", + content: "Hello", + channel: "telegram", + }); + const sentEvent = createInternalHookEvent("message", "sent", "test-session", { + content: "Hi there", + channel: "telegram", + to: "user-1", + kind: "final", + }); + + await triggerInternalHook(receivedEvent); + await triggerInternalHook(sentEvent); + + expect(handler).toHaveBeenCalledTimes(2); + expect(handler).toHaveBeenCalledWith(receivedEvent); + expect(handler).toHaveBeenCalledWith(sentEvent); + }); + + it("should not trigger message:sent handler for message:received events", async () => { + const sentHandler = vi.fn(); + registerInternalHook("message:sent", sentHandler); + + const event = createInternalHookEvent("message", "received", "test-session", { + from: "user-1", + content: "Hello", + channel: "telegram", + }); + await triggerInternalHook(event); + + expect(sentHandler).not.toHaveBeenCalled(); + }); + }); + describe("integration", () => { it("should handle a complete hook lifecycle", async () => { const results: InternalHookEvent[] = []; diff --git a/src/hooks/internal-hooks.ts b/src/hooks/internal-hooks.ts index 1b866d444..b33eb63db 100644 --- a/src/hooks/internal-hooks.ts +++ b/src/hooks/internal-hooks.ts @@ -8,7 +8,7 @@ import type { WorkspaceBootstrapFile } from "../agents/workspace.js"; import type { MoltbotConfig } from "../config/config.js"; -export type InternalHookEventType = "command" | "session" | "agent" | "gateway"; +export type InternalHookEventType = "command" | "session" | "agent" | "gateway" | "message"; export type AgentBootstrapHookContext = { workspaceDir: string;