Merge 819d61b877 into 4583f88626
This commit is contained in:
commit
ebedb4c260
@ -196,7 +196,7 @@ Each event includes:
|
||||
|
||||
```typescript
|
||||
{
|
||||
type: 'command' | 'session' | 'agent' | 'gateway',
|
||||
type: 'command' | 'session' | 'agent' | 'gateway' | 'message',
|
||||
action: string, // e.g., 'new', 'reset', 'stop'
|
||||
sessionKey: string, // Session identifier
|
||||
timestamp: Date, // When the event occurred
|
||||
@ -241,6 +241,35 @@ These hooks are not event-stream listeners; they let plugins synchronously adjus
|
||||
|
||||
- **`tool_result_persist`**: transform tool results before they are written to the session transcript. Must be synchronous; return the updated tool result payload or `undefined` to keep it as-is. See [Agent Loop](/concepts/agent-loop).
|
||||
|
||||
### Message Events
|
||||
|
||||
Triggered during the message lifecycle:
|
||||
|
||||
- **`message:received`**: When an inbound message is received (fired alongside the plugin `message_received` hook)
|
||||
- **`message:sent`**: When an outbound reply is successfully delivered (fired alongside the plugin `message_sent` hook)
|
||||
|
||||
#### `message:received` Context
|
||||
|
||||
| Field | Type | Description |
|
||||
|------------------|---------------------|------------------------------------|
|
||||
| `from` | `string` | Sender identifier |
|
||||
| `content` | `string` | Message body text |
|
||||
| `channel` | `string` | Channel / surface (lowercase) |
|
||||
| `conversationId` | `string \| undefined` | Conversation or chat identifier |
|
||||
| `timestamp` | `number` | Message timestamp (epoch ms) |
|
||||
| `messageId` | `string \| undefined` | Platform message ID |
|
||||
| `senderId` | `string \| undefined` | Sender user ID |
|
||||
| `senderName` | `string \| undefined` | Sender display name |
|
||||
|
||||
#### `message:sent` Context
|
||||
|
||||
| Field | Type | Description |
|
||||
|-----------|----------|-------------------------------------------|
|
||||
| `content` | `string` | Reply body text |
|
||||
| `channel` | `string` | Channel / surface (lowercase) |
|
||||
| `to` | `string` | Recipient identifier |
|
||||
| `kind` | `string` | Dispatch kind (`"tool"`, `"block"`, or `"final"`) |
|
||||
|
||||
### Future Events
|
||||
|
||||
Planned event types:
|
||||
@ -248,8 +277,6 @@ Planned event types:
|
||||
- **`session:start`**: When a new session begins
|
||||
- **`session:end`**: When a session ends
|
||||
- **`agent:error`**: When an agent encounters an error
|
||||
- **`message:sent`**: When a message is sent
|
||||
- **`message:received`**: When a message is received
|
||||
|
||||
## Creating Custom Hooks
|
||||
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
import type { MoltbotConfig } from "../config/config.js";
|
||||
import { createInternalHookEvent, triggerInternalHook } from "../hooks/internal-hooks.js";
|
||||
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
||||
import type { FinalizedMsgContext, MsgContext } from "./templating.js";
|
||||
import type { GetReplyOptions } from "./types.js";
|
||||
import type { GetReplyOptions, ReplyPayload } from "./types.js";
|
||||
import { finalizeInboundContext } from "./reply/inbound-context.js";
|
||||
import type { DispatchFromConfigResult } from "./reply/dispatch-from-config.js";
|
||||
import { dispatchReplyFromConfig } from "./reply/dispatch-from-config.js";
|
||||
@ -10,10 +12,51 @@ import {
|
||||
type ReplyDispatcher,
|
||||
type ReplyDispatcherOptions,
|
||||
type ReplyDispatcherWithTypingOptions,
|
||||
type ReplyDispatchKind,
|
||||
} from "./reply/reply-dispatcher.js";
|
||||
|
||||
export type DispatchInboundResult = DispatchFromConfigResult;
|
||||
|
||||
/**
|
||||
* Create an `onDelivered` callback that fires message:sent hooks
|
||||
* (both internal and plugin) after each reply is successfully delivered.
|
||||
*/
|
||||
function createMessageSentHook(
|
||||
ctx: FinalizedMsgContext,
|
||||
origOnDelivered?: (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => void,
|
||||
): (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => void {
|
||||
const sessionKey = ctx.SessionKey ?? "";
|
||||
const channel = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase();
|
||||
const to = ctx.To ?? ctx.From ?? "";
|
||||
const accountId = ctx.AccountId;
|
||||
const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From;
|
||||
|
||||
return (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => {
|
||||
origOnDelivered?.(payload, info);
|
||||
|
||||
// Internal hook: message:sent
|
||||
void triggerInternalHook(
|
||||
createInternalHookEvent("message", "sent", sessionKey, {
|
||||
content: payload.text ?? "",
|
||||
channel,
|
||||
to,
|
||||
kind: info.kind,
|
||||
}),
|
||||
);
|
||||
|
||||
// Plugin hook: message_sent
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
if (hookRunner?.hasHooks("message_sent")) {
|
||||
void hookRunner
|
||||
.runMessageSent(
|
||||
{ to, content: payload.text ?? "", success: true },
|
||||
{ channelId: channel, accountId, conversationId },
|
||||
)
|
||||
.catch(() => {});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export async function dispatchInboundMessage(params: {
|
||||
ctx: MsgContext | FinalizedMsgContext;
|
||||
cfg: MoltbotConfig;
|
||||
@ -38,12 +81,16 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
|
||||
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
|
||||
replyResolver?: typeof import("./reply.js").getReplyFromConfig;
|
||||
}): Promise<DispatchInboundResult> {
|
||||
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping(
|
||||
params.dispatcherOptions,
|
||||
);
|
||||
const finalized = finalizeInboundContext(params.ctx);
|
||||
const onDelivered = createMessageSentHook(finalized, params.dispatcherOptions.onDelivered);
|
||||
|
||||
const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
|
||||
...params.dispatcherOptions,
|
||||
onDelivered,
|
||||
});
|
||||
|
||||
const result = await dispatchInboundMessage({
|
||||
ctx: params.ctx,
|
||||
ctx: finalized,
|
||||
cfg: params.cfg,
|
||||
dispatcher,
|
||||
replyResolver: params.replyResolver,
|
||||
@ -64,9 +111,15 @@ export async function dispatchInboundMessageWithDispatcher(params: {
|
||||
replyOptions?: Omit<GetReplyOptions, "onToolResult" | "onBlockReply">;
|
||||
replyResolver?: typeof import("./reply.js").getReplyFromConfig;
|
||||
}): Promise<DispatchInboundResult> {
|
||||
const dispatcher = createReplyDispatcher(params.dispatcherOptions);
|
||||
const finalized = finalizeInboundContext(params.ctx);
|
||||
const onDelivered = createMessageSentHook(finalized, params.dispatcherOptions.onDelivered);
|
||||
|
||||
const dispatcher = createReplyDispatcher({
|
||||
...params.dispatcherOptions,
|
||||
onDelivered,
|
||||
});
|
||||
const result = await dispatchInboundMessage({
|
||||
ctx: params.ctx,
|
||||
ctx: finalized,
|
||||
cfg: params.cfg,
|
||||
dispatcher,
|
||||
replyResolver: params.replyResolver,
|
||||
|
||||
@ -8,6 +8,7 @@ import {
|
||||
logMessageQueued,
|
||||
logSessionStateChange,
|
||||
} from "../../logging/diagnostic.js";
|
||||
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
import { getReplyFromConfig } from "../reply.js";
|
||||
import type { FinalizedMsgContext } from "../templating.js";
|
||||
@ -133,31 +134,31 @@ export async function dispatchReplyFromConfig(params: {
|
||||
|
||||
const inboundAudio = isInboundAudioContext(ctx);
|
||||
const sessionTtsAuto = resolveSessionTtsAuto(ctx, cfg);
|
||||
// Extract message context for hooks (shared between plugin and internal hooks)
|
||||
const hookTimestamp =
|
||||
typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp) ? ctx.Timestamp : undefined;
|
||||
const hookMessageId =
|
||||
ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
|
||||
const hookContent =
|
||||
typeof ctx.BodyForCommands === "string"
|
||||
? ctx.BodyForCommands
|
||||
: typeof ctx.RawBody === "string"
|
||||
? ctx.RawBody
|
||||
: typeof ctx.Body === "string"
|
||||
? ctx.Body
|
||||
: "";
|
||||
const hookChannelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase();
|
||||
const hookConversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined;
|
||||
|
||||
// Plugin hook: message_received
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
if (hookRunner?.hasHooks("message_received")) {
|
||||
const timestamp =
|
||||
typeof ctx.Timestamp === "number" && Number.isFinite(ctx.Timestamp)
|
||||
? ctx.Timestamp
|
||||
: undefined;
|
||||
const messageIdForHook =
|
||||
ctx.MessageSidFull ?? ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
|
||||
const content =
|
||||
typeof ctx.BodyForCommands === "string"
|
||||
? ctx.BodyForCommands
|
||||
: typeof ctx.RawBody === "string"
|
||||
? ctx.RawBody
|
||||
: typeof ctx.Body === "string"
|
||||
? ctx.Body
|
||||
: "";
|
||||
const channelId = (ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider ?? "").toLowerCase();
|
||||
const conversationId = ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? undefined;
|
||||
|
||||
void hookRunner
|
||||
.runMessageReceived(
|
||||
{
|
||||
from: ctx.From ?? "",
|
||||
content,
|
||||
timestamp,
|
||||
content: hookContent,
|
||||
timestamp: hookTimestamp,
|
||||
metadata: {
|
||||
to: ctx.To,
|
||||
provider: ctx.Provider,
|
||||
@ -165,7 +166,7 @@ export async function dispatchReplyFromConfig(params: {
|
||||
threadId: ctx.MessageThreadId,
|
||||
originatingChannel: ctx.OriginatingChannel,
|
||||
originatingTo: ctx.OriginatingTo,
|
||||
messageId: messageIdForHook,
|
||||
messageId: hookMessageId,
|
||||
senderId: ctx.SenderId,
|
||||
senderName: ctx.SenderName,
|
||||
senderUsername: ctx.SenderUsername,
|
||||
@ -173,9 +174,9 @@ export async function dispatchReplyFromConfig(params: {
|
||||
},
|
||||
},
|
||||
{
|
||||
channelId,
|
||||
channelId: hookChannelId,
|
||||
accountId: ctx.AccountId,
|
||||
conversationId,
|
||||
conversationId: hookConversationId,
|
||||
},
|
||||
)
|
||||
.catch((err) => {
|
||||
@ -183,6 +184,20 @@ export async function dispatchReplyFromConfig(params: {
|
||||
});
|
||||
}
|
||||
|
||||
// Internal hook: message:received
|
||||
void triggerInternalHook(
|
||||
createInternalHookEvent("message", "received", ctx.SessionKey ?? "", {
|
||||
from: ctx.From ?? "",
|
||||
content: hookContent,
|
||||
channel: hookChannelId,
|
||||
conversationId: hookConversationId,
|
||||
timestamp: hookTimestamp ?? Date.now(),
|
||||
messageId: hookMessageId,
|
||||
senderId: ctx.SenderId,
|
||||
senderName: ctx.SenderName,
|
||||
}),
|
||||
);
|
||||
|
||||
// Check if we should route replies to originating channel instead of dispatcher.
|
||||
// Only route when the originating channel is DIFFERENT from the current surface.
|
||||
// This handles cross-provider routing (e.g., message from Telegram being processed
|
||||
|
||||
@ -49,6 +49,8 @@ export type ReplyDispatcherOptions = {
|
||||
onError?: ReplyDispatchErrorHandler;
|
||||
// AIDEV-NOTE: onSkip lets channels detect silent/empty drops (e.g. Telegram empty-response fallback).
|
||||
onSkip?: ReplyDispatchSkipHandler;
|
||||
/** Called after a reply is successfully delivered. */
|
||||
onDelivered?: (payload: ReplyPayload, info: { kind: ReplyDispatchKind }) => void;
|
||||
/** Human-like delay between block replies for natural rhythm. */
|
||||
humanDelay?: HumanDelayConfig;
|
||||
};
|
||||
@ -131,6 +133,7 @@ export function createReplyDispatcher(options: ReplyDispatcherOptions): ReplyDis
|
||||
if (delayMs > 0) await sleep(delayMs);
|
||||
}
|
||||
await options.deliver(normalized, { kind });
|
||||
options.onDelivered?.(normalized, { kind });
|
||||
})
|
||||
.catch((err) => {
|
||||
options.onError?.(err, { kind });
|
||||
|
||||
@ -212,6 +212,82 @@ describe("hooks", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("message events", () => {
|
||||
it("should trigger message:received handlers", async () => {
|
||||
const handler = vi.fn();
|
||||
registerInternalHook("message:received", handler);
|
||||
|
||||
const event = createInternalHookEvent("message", "received", "test-session", {
|
||||
from: "+1234567890",
|
||||
content: "Hello world",
|
||||
channel: "telegram",
|
||||
conversationId: "chat-123",
|
||||
timestamp: Date.now(),
|
||||
messageId: "msg-1",
|
||||
senderId: "user-1",
|
||||
senderName: "Alice",
|
||||
});
|
||||
await triggerInternalHook(event);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(event);
|
||||
expect(handler).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should trigger message:sent handlers", async () => {
|
||||
const handler = vi.fn();
|
||||
registerInternalHook("message:sent", handler);
|
||||
|
||||
const event = createInternalHookEvent("message", "sent", "test-session", {
|
||||
content: "Reply text",
|
||||
channel: "telegram",
|
||||
to: "chat-123",
|
||||
kind: "final",
|
||||
});
|
||||
await triggerInternalHook(event);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(event);
|
||||
expect(handler).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should trigger general message handler for both received and sent", async () => {
|
||||
const handler = vi.fn();
|
||||
registerInternalHook("message", handler);
|
||||
|
||||
const receivedEvent = createInternalHookEvent("message", "received", "test-session", {
|
||||
from: "user-1",
|
||||
content: "Hello",
|
||||
channel: "telegram",
|
||||
});
|
||||
const sentEvent = createInternalHookEvent("message", "sent", "test-session", {
|
||||
content: "Hi there",
|
||||
channel: "telegram",
|
||||
to: "user-1",
|
||||
kind: "final",
|
||||
});
|
||||
|
||||
await triggerInternalHook(receivedEvent);
|
||||
await triggerInternalHook(sentEvent);
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
expect(handler).toHaveBeenCalledWith(receivedEvent);
|
||||
expect(handler).toHaveBeenCalledWith(sentEvent);
|
||||
});
|
||||
|
||||
it("should not trigger message:sent handler for message:received events", async () => {
|
||||
const sentHandler = vi.fn();
|
||||
registerInternalHook("message:sent", sentHandler);
|
||||
|
||||
const event = createInternalHookEvent("message", "received", "test-session", {
|
||||
from: "user-1",
|
||||
content: "Hello",
|
||||
channel: "telegram",
|
||||
});
|
||||
await triggerInternalHook(event);
|
||||
|
||||
expect(sentHandler).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("integration", () => {
|
||||
it("should handle a complete hook lifecycle", async () => {
|
||||
const results: InternalHookEvent[] = [];
|
||||
|
||||
@ -8,7 +8,7 @@
|
||||
import type { WorkspaceBootstrapFile } from "../agents/workspace.js";
|
||||
import type { MoltbotConfig } from "../config/config.js";
|
||||
|
||||
export type InternalHookEventType = "command" | "session" | "agent" | "gateway";
|
||||
export type InternalHookEventType = "command" | "session" | "agent" | "gateway" | "message";
|
||||
|
||||
export type AgentBootstrapHookContext = {
|
||||
workspaceDir: string;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user