This commit is contained in:
Nick Moore 2026-01-30 11:55:32 +00:00 committed by GitHub
commit e6b2157587
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 2612 additions and 0 deletions

355
src/events/bus.test.ts Normal file
View File

@ -0,0 +1,355 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { createEventBus, getEventBus, setEventBus, resetEventBus } from "./bus.js";
import { topicMatches } from "./types.js";
import type { EventBus, EventEnvelope } from "./types.js";
describe("topicMatches", () => {
it("matches exact topics", () => {
expect(topicMatches("channel.message.received", "channel.message.received")).toBe(true);
expect(topicMatches("channel.message.received", "channel.message.sent")).toBe(false);
});
it("matches single segment wildcard (*)", () => {
expect(topicMatches("channel.*.received", "channel.message.received")).toBe(true);
expect(topicMatches("channel.*.received", "channel.status.received")).toBe(true);
expect(topicMatches("channel.*.received", "channel.message.sent")).toBe(false);
expect(topicMatches("*.message.received", "channel.message.received")).toBe(true);
expect(topicMatches("channel.message.*", "channel.message.received")).toBe(true);
});
it("matches multi-segment wildcard (#)", () => {
expect(topicMatches("channel.#", "channel.message.received")).toBe(true);
expect(topicMatches("channel.#", "channel.status.changed")).toBe(true);
expect(topicMatches("channel.#", "channel")).toBe(true);
expect(topicMatches("#", "anything.at.all")).toBe(true);
expect(topicMatches("agent.#", "channel.message.received")).toBe(false);
});
it("handles # in middle of pattern", () => {
expect(topicMatches("channel.#.completed", "channel.message.send.completed")).toBe(true);
expect(topicMatches("channel.#.completed", "channel.completed")).toBe(true);
expect(topicMatches("channel.#.completed", "channel.message.failed")).toBe(false);
});
it("handles empty segments correctly", () => {
expect(topicMatches("channel", "channel")).toBe(true);
expect(topicMatches("channel", "channel.message")).toBe(false);
});
});
describe("EventBus", () => {
let bus: EventBus;
beforeEach(() => {
bus = createEventBus();
});
afterEach(() => {
bus.shutdown();
});
describe("basic emit and subscribe", () => {
it("delivers events to subscribers", () => {
const received: EventEnvelope[] = [];
bus.subscribe("test.event", (event) => {
received.push(event);
});
bus.emit({ topic: "test.event", payload: { data: "hello" } });
expect(received).toHaveLength(1);
expect(received[0].topic).toBe("test.event");
expect(received[0].payload).toEqual({ data: "hello" });
});
it("assigns sequence numbers", () => {
const received: EventEnvelope[] = [];
bus.subscribe("test.event", (event) => {
received.push(event);
});
bus.emit({ topic: "test.event", payload: 1 });
bus.emit({ topic: "test.event", payload: 2 });
bus.emit({ topic: "test.event", payload: 3 });
expect(received[0].seq).toBe(1);
expect(received[1].seq).toBe(2);
expect(received[2].seq).toBe(3);
});
it("assigns timestamps", () => {
const before = Date.now();
const event = bus.emit({ topic: "test.event", payload: {} });
const after = Date.now();
expect(event.ts).toBeGreaterThanOrEqual(before);
expect(event.ts).toBeLessThanOrEqual(after);
});
it("preserves correlationId and source", () => {
const received: EventEnvelope[] = [];
bus.subscribe("test.event", (event) => {
received.push(event);
});
bus.emit({
topic: "test.event",
payload: {},
correlationId: "corr-123",
source: "test-source",
});
expect(received[0].correlationId).toBe("corr-123");
expect(received[0].source).toBe("test-source");
});
});
describe("pattern matching", () => {
it("delivers to wildcard subscribers", () => {
const received: EventEnvelope[] = [];
bus.subscribe("channel.*.*", (event) => {
received.push(event);
});
bus.emit({ topic: "channel.message.received", payload: {} });
bus.emit({ topic: "channel.status.changed", payload: {} });
bus.emit({ topic: "agent.run.started", payload: {} });
expect(received).toHaveLength(2);
});
it("delivers to multi-segment wildcard subscribers", () => {
const received: EventEnvelope[] = [];
bus.subscribe("channel.#", (event) => {
received.push(event);
});
bus.emit({ topic: "channel.message.received", payload: {} });
bus.emit({ topic: "channel.status.changed.error", payload: {} });
bus.emit({ topic: "agent.run.started", payload: {} });
expect(received).toHaveLength(2);
});
});
describe("subscription management", () => {
it("unsubscribes correctly", () => {
const received: EventEnvelope[] = [];
const sub = bus.subscribe("test.event", (event) => {
received.push(event);
});
bus.emit({ topic: "test.event", payload: 1 });
sub.unsubscribe();
bus.emit({ topic: "test.event", payload: 2 });
expect(received).toHaveLength(1);
});
it("once() unsubscribes after first event", () => {
const received: EventEnvelope[] = [];
bus.once("test.event", (event) => {
received.push(event);
});
bus.emit({ topic: "test.event", payload: 1 });
bus.emit({ topic: "test.event", payload: 2 });
expect(received).toHaveLength(1);
});
it("unsubscribeAll removes all handlers for pattern", () => {
const received: EventEnvelope[] = [];
bus.subscribe("test.event", () => received.push({} as EventEnvelope));
bus.subscribe("test.event", () => received.push({} as EventEnvelope));
bus.subscribe("other.event", () => received.push({} as EventEnvelope));
const count = bus.unsubscribeAll("test.event");
expect(count).toBe(2);
bus.emit({ topic: "test.event", payload: {} });
expect(received).toHaveLength(0);
});
it("reports subscription count", () => {
expect(bus.subscriptionCount()).toBe(0);
const sub1 = bus.subscribe("test.event", () => {});
expect(bus.subscriptionCount()).toBe(1);
const sub2 = bus.subscribe("test.event", () => {});
expect(bus.subscriptionCount()).toBe(2);
sub1.unsubscribe();
expect(bus.subscriptionCount()).toBe(1);
sub2.unsubscribe();
expect(bus.subscriptionCount()).toBe(0);
});
it("reports hasSubscribers correctly", () => {
expect(bus.hasSubscribers("test.event")).toBe(false);
const sub = bus.subscribe("test.event", () => {});
expect(bus.hasSubscribers("test.event")).toBe(true);
sub.unsubscribe();
expect(bus.hasSubscribers("test.event")).toBe(false);
});
it("hasSubscribers considers wildcard patterns", () => {
bus.subscribe("test.#", () => {});
expect(bus.hasSubscribers("test.event")).toBe(true);
expect(bus.hasSubscribers("test.event.deep")).toBe(true);
expect(bus.hasSubscribers("other.event")).toBe(false);
});
});
describe("filtering", () => {
it("filters by sessionKey", () => {
const received: EventEnvelope[] = [];
bus.subscribe("test.event", (event) => received.push(event), {
sessionKey: "session-1",
});
bus.emit({ topic: "test.event", payload: 1, sessionKey: "session-1" });
bus.emit({ topic: "test.event", payload: 2, sessionKey: "session-2" });
bus.emit({ topic: "test.event", payload: 3 });
expect(received).toHaveLength(1);
expect(received[0].payload).toBe(1);
});
it("filters by source", () => {
const received: EventEnvelope[] = [];
bus.subscribe("test.event", (event) => received.push(event), {
source: "agent",
});
bus.emit({ topic: "test.event", payload: 1, source: "agent" });
bus.emit({ topic: "test.event", payload: 2, source: "channel" });
expect(received).toHaveLength(1);
expect(received[0].payload).toBe(1);
});
});
describe("priority ordering", () => {
it("executes higher priority handlers first", () => {
const order: number[] = [];
bus.subscribe("test.event", () => order.push(1), { priority: 1 });
bus.subscribe("test.event", () => order.push(10), { priority: 10 });
bus.subscribe("test.event", () => order.push(5), { priority: 5 });
bus.emit({ topic: "test.event", payload: {} });
expect(order).toEqual([10, 5, 1]);
});
});
describe("error handling", () => {
it("isolates handler errors by default", () => {
const received: EventEnvelope[] = [];
bus.subscribe("test.event", () => {
throw new Error("Handler error");
});
bus.subscribe("test.event", (event) => received.push(event));
bus.emit({ topic: "test.event", payload: {} });
expect(received).toHaveLength(1);
});
it("propagates errors when configured", () => {
bus.subscribe(
"test.event",
() => {
throw new Error("Handler error");
},
{ propagateErrors: true },
);
expect(() => bus.emit({ topic: "test.event", payload: {} })).toThrow("Handler error");
});
});
describe("async handlers", () => {
it("emitAsync waits for all handlers", async () => {
const completed: number[] = [];
bus.subscribe("test.event", async () => {
await new Promise((r) => setTimeout(r, 10));
completed.push(1);
});
bus.subscribe("test.event", async () => {
await new Promise((r) => setTimeout(r, 5));
completed.push(2);
});
await bus.emitAsync({ topic: "test.event", payload: {} });
expect(completed).toHaveLength(2);
});
it("emit does not wait for async handlers", () => {
let completed = false;
bus.subscribe("test.event", async () => {
await new Promise((r) => setTimeout(r, 10));
completed = true;
});
bus.emit({ topic: "test.event", payload: {} });
expect(completed).toBe(false);
});
});
describe("shutdown", () => {
it("clears all subscriptions", () => {
bus.subscribe("test.event", () => {});
bus.subscribe("other.event", () => {});
expect(bus.subscriptionCount()).toBe(2);
bus.shutdown();
expect(bus.subscriptionCount()).toBe(0);
});
it("rejects new operations after shutdown", () => {
bus.shutdown();
expect(() => bus.emit({ topic: "test", payload: {} })).toThrow("EventBus is shut down");
expect(() => bus.subscribe("test", () => {})).toThrow("EventBus is shut down");
});
});
});
describe("singleton management", () => {
afterEach(() => {
resetEventBus();
});
it("getEventBus returns same instance", () => {
const bus1 = getEventBus();
const bus2 = getEventBus();
expect(bus1).toBe(bus2);
});
it("setEventBus replaces default", () => {
const custom = createEventBus();
setEventBus(custom);
expect(getEventBus()).toBe(custom);
custom.shutdown();
});
it("resetEventBus creates new instance", () => {
const bus1 = getEventBus();
resetEventBus();
const bus2 = getEventBus();
expect(bus1).not.toBe(bus2);
});
});

408
src/events/bus.ts Normal file
View File

@ -0,0 +1,408 @@
/**
* Event Bus - In-Process Implementation
*
* A high-performance, type-safe event bus for Moltbot.
* Supports topic patterns, priority ordering, and async handlers.
*/
import type {
EventBus,
EventBusConfig,
EventEnvelope,
EventHandler,
EventInput,
Subscription,
SubscribeOptions,
TopicPattern,
} from "./types.js";
import { topicMatches } from "./types.js";
// ============================================================================
// Internal Types
// ============================================================================
type InternalSubscription = {
id: string;
pattern: TopicPattern;
handler: EventHandler<EventEnvelope>;
options: SubscribeOptions;
once: boolean;
};
// ============================================================================
// Implementation
// ============================================================================
let subscriptionIdCounter = 0;
function generateSubscriptionId(): string {
return `sub_${++subscriptionIdCounter}_${Date.now().toString(36)}`;
}
/**
* Create an in-process event bus
*/
export function createEventBus(config: EventBusConfig = {}): EventBus {
const {
store,
maxConcurrency = 10,
handlerTimeout = 30_000,
logger = {
error: console.error,
warn: console.warn,
debug: () => {},
},
} = config;
let seq = 0;
let isShutdown = false;
// Subscriptions indexed by pattern for efficient lookup
const subscriptions = new Map<string, InternalSubscription>();
// Cache of patterns for faster matching
const patternCache = new Set<TopicPattern>();
// -------------------------------------------------------------------------
// Subscription Management
// -------------------------------------------------------------------------
const subscribe = <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options: SubscribeOptions = {},
): Subscription => {
if (isShutdown) {
throw new Error("EventBus is shut down");
}
const id = generateSubscriptionId();
const sub: InternalSubscription = {
id,
pattern,
handler: handler as EventHandler<EventEnvelope>,
options,
once: false,
};
subscriptions.set(id, sub);
patternCache.add(pattern);
logger.debug(`Subscribed ${id} to pattern "${pattern}"`);
return {
id,
pattern,
unsubscribe: () => unsubscribe(id),
};
};
const once = <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options: SubscribeOptions = {},
): Subscription => {
const id = generateSubscriptionId();
const sub: InternalSubscription = {
id,
pattern,
handler: handler as EventHandler<EventEnvelope>,
options,
once: true,
};
subscriptions.set(id, sub);
patternCache.add(pattern);
return {
id,
pattern,
unsubscribe: () => unsubscribe(id),
};
};
const unsubscribe = (subscriptionId: string): boolean => {
const sub = subscriptions.get(subscriptionId);
if (!sub) return false;
subscriptions.delete(subscriptionId);
// Rebuild pattern cache if needed
let patternStillUsed = false;
for (const s of subscriptions.values()) {
if (s.pattern === sub.pattern) {
patternStillUsed = true;
break;
}
}
if (!patternStillUsed) {
patternCache.delete(sub.pattern);
}
logger.debug(`Unsubscribed ${subscriptionId} from pattern "${sub.pattern}"`);
return true;
};
const unsubscribeAll = (pattern: TopicPattern): number => {
let count = 0;
for (const [id, sub] of subscriptions) {
if (sub.pattern === pattern) {
subscriptions.delete(id);
count++;
}
}
if (count > 0) {
patternCache.delete(pattern);
}
return count;
};
// -------------------------------------------------------------------------
// Event Emission
// -------------------------------------------------------------------------
const findMatchingSubscriptions = (
topic: string,
sessionKey?: string,
source?: string,
): InternalSubscription[] => {
const matching: InternalSubscription[] = [];
for (const sub of subscriptions.values()) {
// Check topic pattern match
if (!topicMatches(sub.pattern, topic)) continue;
// Check session key filter
if (sub.options.sessionKey && sub.options.sessionKey !== sessionKey) {
continue;
}
// Check source filter
if (sub.options.source && sub.options.source !== source) {
continue;
}
matching.push(sub);
}
// Sort by priority (higher first)
matching.sort((a, b) => (b.options.priority ?? 0) - (a.options.priority ?? 0));
return matching;
};
const executeHandler = async (sub: InternalSubscription, event: EventEnvelope): Promise<void> => {
try {
const result = sub.handler(event);
if (result instanceof Promise) {
// Apply timeout
await Promise.race([
result,
new Promise<void>((_, reject) =>
setTimeout(
() => reject(new Error(`Handler timeout after ${handlerTimeout}ms`)),
handlerTimeout,
),
),
]);
}
} catch (err) {
if (sub.options.propagateErrors) {
throw err;
}
logger.error(`Event handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`);
}
};
const emit = <T extends string, P>(input: EventInput<T, P>): EventEnvelope<T, P> => {
if (isShutdown) {
throw new Error("EventBus is shut down");
}
const event: EventEnvelope<T, P> = {
...input,
seq: ++seq,
ts: Date.now(),
};
const matching = findMatchingSubscriptions(event.topic, event.sessionKey, event.source);
// Track once-subscriptions to remove
const toRemove: string[] = [];
for (const sub of matching) {
if (sub.once) {
toRemove.push(sub.id);
}
// Execute sync, fire-and-forget for async
try {
const result = sub.handler(event);
if (result instanceof Promise) {
// Don't await - fire and forget
result.catch((err) => {
if (sub.options.propagateErrors) {
// Can't propagate from fire-and-forget, log instead
logger.error(
`Async handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`,
);
} else {
logger.error(
`Event handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`,
);
}
});
}
} catch (err) {
if (sub.options.propagateErrors) {
throw err;
}
logger.error(`Event handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`);
}
}
// Remove once-subscriptions
for (const id of toRemove) {
unsubscribe(id);
}
// Persist if store configured (async, don't block emit)
if (store) {
store.append(event).catch((err) => {
logger.error(`Failed to persist event ${event.seq}: ${String(err)}`);
});
}
return event;
};
const emitAsync = async <T extends string, P>(
input: EventInput<T, P>,
): Promise<EventEnvelope<T, P>> => {
if (isShutdown) {
throw new Error("EventBus is shut down");
}
const event: EventEnvelope<T, P> = {
...input,
seq: ++seq,
ts: Date.now(),
};
const matching = findMatchingSubscriptions(event.topic, event.sessionKey, event.source);
// Track once-subscriptions to remove
const toRemove: string[] = [];
// Execute handlers with concurrency limit
const pending: Promise<void>[] = [];
let running = 0;
for (const sub of matching) {
if (sub.once) {
toRemove.push(sub.id);
}
// Wait if at concurrency limit
while (running >= maxConcurrency) {
await Promise.race(pending);
}
running++;
const p = executeHandler(sub, event).finally(() => {
running--;
const idx = pending.indexOf(p);
if (idx !== -1) void pending.splice(idx, 1);
});
pending.push(p);
}
// Wait for all remaining handlers
await Promise.all(pending);
// Remove once-subscriptions
for (const id of toRemove) {
unsubscribe(id);
}
// Persist if store configured
if (store) {
await store.append(event);
}
return event;
};
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
const getSeq = (): number => seq;
const hasSubscribers = (topic: string): boolean => {
// Quick check: any pattern could match?
if (patternCache.size === 0) return false;
// Check each subscription
for (const sub of subscriptions.values()) {
if (topicMatches(sub.pattern, topic)) {
return true;
}
}
return false;
};
const subscriptionCount = (): number => subscriptions.size;
const shutdown = (): void => {
isShutdown = true;
subscriptions.clear();
patternCache.clear();
logger.debug("EventBus shut down");
};
return {
emit,
emitAsync,
subscribe,
once,
unsubscribe,
unsubscribeAll,
getSeq,
hasSubscribers,
subscriptionCount,
shutdown,
};
}
// ============================================================================
// Singleton Instance
// ============================================================================
let defaultBus: EventBus | null = null;
/**
* Get the default event bus instance (creates one if needed)
*/
export function getEventBus(): EventBus {
if (!defaultBus) {
defaultBus = createEventBus();
}
return defaultBus;
}
/**
* Set a custom event bus as the default
*/
export function setEventBus(bus: EventBus): void {
defaultBus = bus;
}
/**
* Reset the default event bus (useful for testing)
*/
export function resetEventBus(): void {
if (defaultBus) {
defaultBus.shutdown();
defaultBus = null;
}
}

109
src/events/catalog.test.ts Normal file
View File

@ -0,0 +1,109 @@
import { describe, it, expect } from "vitest";
import { createEvent, isEventOfType, type TypedEvent } from "./catalog.js";
import type { EventEnvelope } from "./types.js";
describe("createEvent", () => {
it("creates typed event input", () => {
const event = createEvent("channel.message.received", {
channelId: "whatsapp",
accountId: "default",
messageId: "msg123",
chatId: "+1234567890",
chatType: "dm",
senderId: "+1234567890",
text: "Hello!",
hasMedia: false,
timestamp: Date.now(),
});
expect(event.topic).toBe("channel.message.received");
expect(event.payload.channelId).toBe("whatsapp");
expect(event.payload.text).toBe("Hello!");
});
it("includes optional fields", () => {
const event = createEvent(
"agent.run.started",
{
runId: "run123",
agentId: "default",
model: "claude-3-opus",
trigger: "message",
},
{
correlationId: "corr123",
source: "test",
sessionKey: "session123",
},
);
expect(event.correlationId).toBe("corr123");
expect(event.source).toBe("test");
expect(event.sessionKey).toBe("session123");
});
});
describe("isEventOfType", () => {
it("returns true for matching topic", () => {
const event: EventEnvelope = {
topic: "channel.message.received",
payload: {
channelId: "whatsapp",
accountId: "default",
messageId: "msg123",
chatId: "+1234567890",
chatType: "dm",
senderId: "+1234567890",
hasMedia: false,
timestamp: Date.now(),
},
seq: 1,
ts: Date.now(),
};
expect(isEventOfType(event, "channel.message.received")).toBe(true);
expect(isEventOfType(event, "channel.message.sent")).toBe(false);
});
it("narrows type correctly", () => {
const event: EventEnvelope = {
topic: "agent.run.completed",
payload: {
runId: "run123",
agentId: "default",
success: true,
duration: 1000,
inputTokens: 100,
outputTokens: 200,
toolCalls: 5,
},
seq: 1,
ts: Date.now(),
};
if (isEventOfType(event, "agent.run.completed")) {
// TypeScript should know the payload type here
expect(event.payload.runId).toBe("run123");
expect(event.payload.success).toBe(true);
expect(event.payload.toolCalls).toBe(5);
}
});
});
describe("TypedEvent type", () => {
it("provides correct payload type", () => {
// This is a compile-time test - if it compiles, it works
const event: TypedEvent<"channel.status.changed"> = {
topic: "channel.status.changed",
payload: {
channelId: "telegram",
accountId: "bot123",
status: "connected",
},
seq: 1,
ts: Date.now(),
};
expect(event.payload.status).toBe("connected");
});
});

382
src/events/catalog.ts Normal file
View File

@ -0,0 +1,382 @@
/**
* Event Catalog - Typed System Events
*
* All system events are defined here with their payloads.
* This provides type safety and documentation for event consumers.
*
* Topic naming convention:
* - domain.entity.action (e.g., "channel.message.received")
* - Use past tense for events (received, completed, failed)
* - Use present tense for commands (send, execute, start)
*/
import type { EventEnvelope, EventInput } from "./types.js";
// ============================================================================
// Channel Events
// ============================================================================
/**
* A message was received from a channel
*/
export type ChannelMessageReceivedPayload = {
channelId: string;
accountId: string;
messageId: string;
chatId: string;
chatType: "dm" | "group";
senderId: string;
senderName?: string;
text?: string;
hasMedia: boolean;
mediaType?: "image" | "video" | "audio" | "document" | "sticker";
replyToMessageId?: string;
mentions?: string[];
timestamp: number;
};
/**
* A message was sent to a channel
*/
export type ChannelMessageSentPayload = {
channelId: string;
accountId: string;
messageId: string;
chatId: string;
text?: string;
hasMedia: boolean;
/** Duration in ms to send */
sendDuration: number;
};
/**
* Message delivery failed
*/
export type ChannelMessageFailedPayload = {
channelId: string;
accountId: string;
chatId: string;
error: string;
retryable: boolean;
};
/**
* Channel connection status changed
*/
export type ChannelStatusChangedPayload = {
channelId: string;
accountId: string;
status: "connected" | "disconnected" | "connecting" | "error";
error?: string;
previousStatus?: string;
};
/**
* Channel pairing requested (e.g., QR code for WhatsApp)
*/
export type ChannelPairingRequestedPayload = {
channelId: string;
accountId: string;
pairingData?: string; // QR code data, phone code, etc.
expiresAt?: number;
};
// ============================================================================
// Agent Events
// ============================================================================
/**
* Agent run started
*/
export type AgentRunStartedPayload = {
runId: string;
agentId: string;
model: string;
trigger: "message" | "cron" | "webhook" | "command" | "internal";
inputTokens?: number;
};
/**
* Agent produced a text chunk (streaming)
*/
export type AgentTextChunkPayload = {
runId: string;
agentId: string;
chunk: string;
/** Cumulative text so far */
accumulated: string;
};
/**
* Agent is executing a tool
*/
export type AgentToolExecutingPayload = {
runId: string;
agentId: string;
toolName: string;
toolInput: Record<string, unknown>;
};
/**
* Agent tool execution completed
*/
export type AgentToolCompletedPayload = {
runId: string;
agentId: string;
toolName: string;
success: boolean;
duration: number;
error?: string;
};
/**
* Agent run completed
*/
export type AgentRunCompletedPayload = {
runId: string;
agentId: string;
success: boolean;
duration: number;
inputTokens: number;
outputTokens: number;
toolCalls: number;
error?: string;
};
// ============================================================================
// Workflow Events
// ============================================================================
/**
* A workflow was triggered
*/
export type WorkflowStartedPayload = {
workflowId: string;
workflowName: string;
trigger: {
type: "channel" | "cron" | "webhook" | "manual";
source?: string;
};
input?: Record<string, unknown>;
};
/**
* Workflow state transition
*/
export type WorkflowStateChangedPayload = {
workflowId: string;
workflowName: string;
previousState: string;
newState: string;
data?: Record<string, unknown>;
};
/**
* Workflow completed
*/
export type WorkflowCompletedPayload = {
workflowId: string;
workflowName: string;
success: boolean;
duration: number;
finalState: string;
output?: Record<string, unknown>;
error?: string;
};
// ============================================================================
// Plugin Events
// ============================================================================
/**
* Plugin was loaded
*/
export type PluginLoadedPayload = {
pluginId: string;
pluginName: string;
version: string;
capabilities: string[];
};
/**
* Plugin emitted a custom event
*/
export type PluginCustomEventPayload = {
pluginId: string;
eventName: string;
data: unknown;
};
/**
* Plugin encountered an error
*/
export type PluginErrorPayload = {
pluginId: string;
pluginName: string;
error: string;
context?: string;
};
// ============================================================================
// System Events
// ============================================================================
/**
* Gateway started
*/
export type GatewayStartedPayload = {
version: string;
port: number;
mode: "local" | "remote";
channels: string[];
plugins: string[];
};
/**
* Gateway shutting down
*/
export type GatewayShuttingDownPayload = {
reason: "signal" | "error" | "manual";
gracePeriod: number;
};
/**
* Configuration was reloaded
*/
export type ConfigReloadedPayload = {
changedSections: string[];
triggeredBy: "file-watch" | "api" | "manual";
};
/**
* Cron job executed
*/
export type CronExecutedPayload = {
jobId: string;
jobName: string;
success: boolean;
duration: number;
nextRun?: number;
error?: string;
};
/**
* Security approval requested
*/
export type SecurityApprovalRequestedPayload = {
requestId: string;
type: "exec" | "tool" | "action";
description: string;
requestedBy: string;
expiresAt: number;
};
/**
* Security approval resolved
*/
export type SecurityApprovalResolvedPayload = {
requestId: string;
approved: boolean;
resolvedBy: string;
reason?: string;
};
// ============================================================================
// Webhook Events
// ============================================================================
/**
* External webhook received
*/
export type WebhookReceivedPayload = {
webhookId: string;
source: string;
method: string;
path: string;
headers: Record<string, string>;
body?: unknown;
};
// ============================================================================
// Event Type Map
// ============================================================================
/**
* Complete map of topics to their payloads
*/
export type EventTopicMap = {
// Channel events
"channel.message.received": ChannelMessageReceivedPayload;
"channel.message.sent": ChannelMessageSentPayload;
"channel.message.failed": ChannelMessageFailedPayload;
"channel.status.changed": ChannelStatusChangedPayload;
"channel.pairing.requested": ChannelPairingRequestedPayload;
// Agent events
"agent.run.started": AgentRunStartedPayload;
"agent.text.chunk": AgentTextChunkPayload;
"agent.tool.executing": AgentToolExecutingPayload;
"agent.tool.completed": AgentToolCompletedPayload;
"agent.run.completed": AgentRunCompletedPayload;
// Workflow events
"workflow.started": WorkflowStartedPayload;
"workflow.state.changed": WorkflowStateChangedPayload;
"workflow.completed": WorkflowCompletedPayload;
// Plugin events
"plugin.loaded": PluginLoadedPayload;
"plugin.custom": PluginCustomEventPayload;
"plugin.error": PluginErrorPayload;
// System events
"gateway.started": GatewayStartedPayload;
"gateway.shutting_down": GatewayShuttingDownPayload;
"config.reloaded": ConfigReloadedPayload;
"cron.executed": CronExecutedPayload;
// Security events
"security.approval.requested": SecurityApprovalRequestedPayload;
"security.approval.resolved": SecurityApprovalResolvedPayload;
// Webhook events
"webhook.received": WebhookReceivedPayload;
};
// ============================================================================
// Typed Event Helpers
// ============================================================================
/**
* Create a typed event input for a known topic
*/
export function createEvent<T extends keyof EventTopicMap>(
topic: T,
payload: EventTopicMap[T],
options?: {
correlationId?: string;
source?: string;
sessionKey?: string;
},
): EventInput<T, EventTopicMap[T]> {
return {
topic,
payload,
...options,
};
}
/**
* Type guard to check if an event matches a topic
*/
export function isEventOfType<T extends keyof EventTopicMap>(
event: EventEnvelope,
topic: T,
): event is EventEnvelope<T, EventTopicMap[T]> {
return event.topic === topic;
}
/**
* Typed event envelope for a known topic
*/
export type TypedEvent<T extends keyof EventTopicMap> = EventEnvelope<T, EventTopicMap[T]>;

View File

@ -0,0 +1,444 @@
/**
* Event Bus - Gateway Integration
*
* Bridges the gateway's existing event patterns to the unified event bus.
* This module:
* - Creates and configures the event bus for gateway use
* - Bridges agent events, diagnostic events, etc. to the bus
* - Provides typed emit helpers for common events
* - Integrates with the gateway broadcast system
*/
import type { MoltbotConfig } from "../config/config.js";
import { onAgentEvent, type AgentEventPayload } from "../infra/agent-events.js";
import { onDiagnosticEvent, type DiagnosticEventPayload } from "../infra/diagnostic-events.js";
import { createEventBus, setEventBus, getEventBus } from "./bus.js";
import type { EventBus, EventBusConfig } from "./types.js";
import { createEventStore, getDefaultEventStorePath } from "./store.js";
import { createEvent, type EventTopicMap } from "./catalog.js";
// ============================================================================
// Gateway Event Bus Configuration
// ============================================================================
export type GatewayEventBusConfig = {
/** Enable event persistence (default: false) */
persistEvents?: boolean;
/** Custom path for event store database */
eventStorePath?: string;
/** Max events to keep in store (default: 100000) */
maxStoredEvents?: number;
/** Logger */
logger?: EventBusConfig["logger"];
};
export type GatewayEventBusHandle = {
/** The event bus instance */
bus: EventBus;
/** Cleanup function - call on gateway shutdown */
cleanup: () => void;
/** Typed emit helpers */
emit: GatewayEventEmitters;
};
// ============================================================================
// Typed Emit Helpers
// ============================================================================
/**
* Typed event emitters for common gateway events
*/
export type GatewayEventEmitters = {
// Channel events
channelMessageReceived: (
payload: EventTopicMap["channel.message.received"],
opts?: { sessionKey?: string; correlationId?: string },
) => void;
channelMessageSent: (
payload: EventTopicMap["channel.message.sent"],
opts?: { sessionKey?: string; correlationId?: string },
) => void;
channelMessageFailed: (
payload: EventTopicMap["channel.message.failed"],
opts?: { sessionKey?: string; correlationId?: string },
) => void;
channelStatusChanged: (payload: EventTopicMap["channel.status.changed"]) => void;
// Agent events
agentRunStarted: (
payload: EventTopicMap["agent.run.started"],
opts?: { sessionKey?: string; correlationId?: string },
) => void;
agentRunCompleted: (
payload: EventTopicMap["agent.run.completed"],
opts?: { sessionKey?: string; correlationId?: string },
) => void;
agentToolExecuting: (
payload: EventTopicMap["agent.tool.executing"],
opts?: { sessionKey?: string; correlationId?: string },
) => void;
agentToolCompleted: (
payload: EventTopicMap["agent.tool.completed"],
opts?: { sessionKey?: string; correlationId?: string },
) => void;
// System events
gatewayStarted: (payload: EventTopicMap["gateway.started"]) => void;
gatewayShuttingDown: (payload: EventTopicMap["gateway.shutting_down"]) => void;
configReloaded: (payload: EventTopicMap["config.reloaded"]) => void;
cronExecuted: (payload: EventTopicMap["cron.executed"]) => void;
// Plugin events
pluginLoaded: (payload: EventTopicMap["plugin.loaded"]) => void;
pluginCustom: (payload: EventTopicMap["plugin.custom"], opts?: { sessionKey?: string }) => void;
pluginError: (payload: EventTopicMap["plugin.error"]) => void;
// Security events
securityApprovalRequested: (
payload: EventTopicMap["security.approval.requested"],
opts?: { sessionKey?: string },
) => void;
securityApprovalResolved: (
payload: EventTopicMap["security.approval.resolved"],
opts?: { sessionKey?: string },
) => void;
// Webhook events
webhookReceived: (payload: EventTopicMap["webhook.received"]) => void;
};
function createGatewayEventEmitters(bus: EventBus): GatewayEventEmitters {
return {
// Channel events
channelMessageReceived: (payload, opts) =>
bus.emit(
createEvent("channel.message.received", payload, {
source: `channel:${payload.channelId}`,
...opts,
}),
),
channelMessageSent: (payload, opts) =>
bus.emit(
createEvent("channel.message.sent", payload, {
source: `channel:${payload.channelId}`,
...opts,
}),
),
channelMessageFailed: (payload, opts) =>
bus.emit(
createEvent("channel.message.failed", payload, {
source: `channel:${payload.channelId}`,
...opts,
}),
),
channelStatusChanged: (payload) =>
bus.emit(
createEvent("channel.status.changed", payload, {
source: `channel:${payload.channelId}`,
}),
),
// Agent events
agentRunStarted: (payload, opts) =>
bus.emit(
createEvent("agent.run.started", payload, {
source: `agent:${payload.agentId}`,
correlationId: payload.runId,
...opts,
}),
),
agentRunCompleted: (payload, opts) =>
bus.emit(
createEvent("agent.run.completed", payload, {
source: `agent:${payload.agentId}`,
correlationId: payload.runId,
...opts,
}),
),
agentToolExecuting: (payload, opts) =>
bus.emit(
createEvent("agent.tool.executing", payload, {
source: `agent:${payload.agentId}`,
correlationId: payload.runId,
...opts,
}),
),
agentToolCompleted: (payload, opts) =>
bus.emit(
createEvent("agent.tool.completed", payload, {
source: `agent:${payload.agentId}`,
correlationId: payload.runId,
...opts,
}),
),
// System events
gatewayStarted: (payload) =>
bus.emit(createEvent("gateway.started", payload, { source: "gateway" })),
gatewayShuttingDown: (payload) =>
bus.emit(createEvent("gateway.shutting_down", payload, { source: "gateway" })),
configReloaded: (payload) =>
bus.emit(createEvent("config.reloaded", payload, { source: "gateway" })),
cronExecuted: (payload) => bus.emit(createEvent("cron.executed", payload, { source: "cron" })),
// Plugin events
pluginLoaded: (payload) =>
bus.emit(createEvent("plugin.loaded", payload, { source: `plugin:${payload.pluginId}` })),
pluginCustom: (payload, opts) =>
bus.emit(
createEvent("plugin.custom", payload, {
source: `plugin:${payload.pluginId}`,
...opts,
}),
),
pluginError: (payload) =>
bus.emit(createEvent("plugin.error", payload, { source: `plugin:${payload.pluginId}` })),
// Security events
securityApprovalRequested: (payload, opts) =>
bus.emit(
createEvent("security.approval.requested", payload, {
source: "security",
...opts,
}),
),
securityApprovalResolved: (payload, opts) =>
bus.emit(
createEvent("security.approval.resolved", payload, {
source: "security",
...opts,
}),
),
// Webhook events
webhookReceived: (payload) =>
bus.emit(
createEvent("webhook.received", payload, {
source: `webhook:${payload.webhookId}`,
}),
),
};
}
// ============================================================================
// Agent Event Bridge
// ============================================================================
/**
* Map agent event streams to event bus topics
*/
function bridgeAgentEvent(bus: EventBus, evt: AgentEventPayload): void {
const { runId, sessionKey, data } = evt;
// Extract common fields
const agentId = (data.agentId as string) || "default";
switch (evt.stream) {
case "run.start":
bus.emit(
createEvent(
"agent.run.started",
{
runId,
agentId,
model: (data.model as string) || "unknown",
trigger:
(data.trigger as "message" | "cron" | "webhook" | "command" | "internal") ||
"internal",
inputTokens: data.inputTokens as number | undefined,
},
{
source: `agent:${agentId}`,
sessionKey,
correlationId: runId,
},
),
);
break;
case "run.complete":
bus.emit(
createEvent(
"agent.run.completed",
{
runId,
agentId,
success: (data.success as boolean) ?? true,
duration: (data.duration as number) || 0,
inputTokens: (data.inputTokens as number) || 0,
outputTokens: (data.outputTokens as number) || 0,
toolCalls: (data.toolCalls as number) || 0,
error: data.error as string | undefined,
},
{
source: `agent:${agentId}`,
sessionKey,
correlationId: runId,
},
),
);
break;
case "tool.start":
bus.emit(
createEvent(
"agent.tool.executing",
{
runId,
agentId,
toolName: (data.toolName as string) || "unknown",
toolInput: (data.toolInput as Record<string, unknown>) || {},
},
{
source: `agent:${agentId}`,
sessionKey,
correlationId: runId,
},
),
);
break;
case "tool.complete":
bus.emit(
createEvent(
"agent.tool.completed",
{
runId,
agentId,
toolName: (data.toolName as string) || "unknown",
success: (data.success as boolean) ?? true,
duration: (data.duration as number) || 0,
error: data.error as string | undefined,
},
{
source: `agent:${agentId}`,
sessionKey,
correlationId: runId,
},
),
);
break;
case "text.delta":
bus.emit(
createEvent(
"agent.text.chunk",
{
runId,
agentId,
chunk: (data.chunk as string) || "",
accumulated: (data.accumulated as string) || "",
},
{
source: `agent:${agentId}`,
sessionKey,
correlationId: runId,
},
),
);
break;
// Other streams can be added as needed
}
}
// ============================================================================
// Diagnostic Event Bridge
// ============================================================================
/**
* Bridge diagnostic events to the event bus
*/
function bridgeDiagnosticEvent(bus: EventBus, evt: DiagnosticEventPayload): void {
// Map diagnostic events to appropriate bus topics
// These are internal/debug events, so we emit them under a diagnostic namespace
bus.emit({
topic: `diagnostic.${evt.type}`,
payload: evt,
source: "diagnostic",
sessionKey: "sessionKey" in evt ? (evt.sessionKey as string) : undefined,
});
}
// ============================================================================
// Main Integration
// ============================================================================
/**
* Initialize the gateway event bus
*
* Call this early in gateway startup to set up the event bus
* and bridge existing event patterns.
*/
export function initGatewayEventBus(config: GatewayEventBusConfig = {}): GatewayEventBusHandle {
const { persistEvents = false, eventStorePath, maxStoredEvents = 100_000, logger } = config;
// Create event store if persistence enabled
let store: ReturnType<typeof createEventStore> | undefined;
if (persistEvents) {
store = createEventStore({
dbPath: eventStorePath ?? getDefaultEventStorePath(),
maxEvents: maxStoredEvents,
logger,
});
}
// Create the event bus
const bus = createEventBus({
store,
logger,
});
// Set as default bus
setEventBus(bus);
// Bridge existing agent events
const agentUnsub = onAgentEvent((evt) => {
try {
bridgeAgentEvent(bus, evt);
} catch {
// Ignore bridging errors
}
});
// Bridge diagnostic events
const diagnosticUnsub = onDiagnosticEvent((evt) => {
try {
bridgeDiagnosticEvent(bus, evt);
} catch {
// Ignore bridging errors
}
});
// Create typed emitters
const emit = createGatewayEventEmitters(bus);
// Cleanup function
const cleanup = () => {
agentUnsub();
diagnosticUnsub();
bus.shutdown();
store?.close();
};
return { bus, cleanup, emit };
}
/**
* Extract event bus config from moltbot config
*/
export function extractEventBusConfig(cfg: MoltbotConfig): GatewayEventBusConfig {
const eventsCfg = (cfg as Record<string, unknown>).events as Record<string, unknown> | undefined;
return {
persistEvents: eventsCfg?.persist === true,
eventStorePath: typeof eventsCfg?.storePath === "string" ? eventsCfg.storePath : undefined,
maxStoredEvents: typeof eventsCfg?.maxEvents === "number" ? eventsCfg.maxEvents : undefined,
};
}
// ============================================================================
// Re-export for Convenience
// ============================================================================
export { getEventBus } from "./bus.js";
export type { EventBus } from "./types.js";

100
src/events/index.ts Normal file
View File

@ -0,0 +1,100 @@
/**
* Event Bus - Public API
*
* Unified event system for Moltbot enabling:
* - Cross-channel coordination
* - Async workflows
* - Plugin communication
* - External integrations
*
* @example
* ```ts
* import { getEventBus, createEvent } from "./events/index.js";
*
* const bus = getEventBus();
*
* // Subscribe to channel messages
* bus.subscribe("channel.message.received", (event) => {
* console.log(`Message from ${event.payload.channelId}: ${event.payload.text}`);
* });
*
* // Subscribe with wildcards
* bus.subscribe("agent.#", (event) => {
* console.log(`Agent event: ${event.topic}`);
* });
*
* // Emit a typed event
* bus.emit(createEvent("channel.message.received", {
* channelId: "whatsapp",
* accountId: "default",
* messageId: "msg123",
* chatId: "+1234567890",
* chatType: "dm",
* senderId: "+1234567890",
* text: "Hello!",
* hasMedia: false,
* timestamp: Date.now(),
* }));
* ```
*/
// Core types
export type {
EventEnvelope,
EventInput,
EventHandler,
Subscription,
SubscribeOptions,
TopicPattern,
EventBus,
EventBusConfig,
EventStore,
PersistedEvent,
} from "./types.js";
export { topicMatches } from "./types.js";
// Bus implementation
export { createEventBus, getEventBus, setEventBus, resetEventBus } from "./bus.js";
// Event catalog
export type {
EventTopicMap,
TypedEvent,
// Channel events
ChannelMessageReceivedPayload,
ChannelMessageSentPayload,
ChannelMessageFailedPayload,
ChannelStatusChangedPayload,
ChannelPairingRequestedPayload,
// Agent events
AgentRunStartedPayload,
AgentTextChunkPayload,
AgentToolExecutingPayload,
AgentToolCompletedPayload,
AgentRunCompletedPayload,
// Workflow events
WorkflowStartedPayload,
WorkflowStateChangedPayload,
WorkflowCompletedPayload,
// Plugin events
PluginLoadedPayload,
PluginCustomEventPayload,
PluginErrorPayload,
// System events
GatewayStartedPayload,
GatewayShuttingDownPayload,
ConfigReloadedPayload,
CronExecutedPayload,
// Security events
SecurityApprovalRequestedPayload,
SecurityApprovalResolvedPayload,
// Webhook events
WebhookReceivedPayload,
} from "./catalog.js";
export { createEvent, isEventOfType } from "./catalog.js";
// Persistence
export type { EventStoreConfig } from "./store.js";
export { createEventStore, getDefaultEventStorePath } from "./store.js";

245
src/events/plugin-api.ts Normal file
View File

@ -0,0 +1,245 @@
/**
* Event Bus - Plugin API
*
* Provides event bus access to plugins, enabling:
* - Subscribing to system events
* - Emitting custom plugin events
* - Cross-plugin communication
*/
import type {
EventEnvelope,
EventHandler,
Subscription,
SubscribeOptions,
TopicPattern,
} from "./types.js";
import type { EventTopicMap, PluginCustomEventPayload } from "./catalog.js";
import { getEventBus } from "./bus.js";
import { createEvent } from "./catalog.js";
// ============================================================================
// Plugin Event API Types
// ============================================================================
/**
* Event API exposed to plugins
*/
export type PluginEventApi = {
/**
* Subscribe to events matching a topic pattern
*
* @example
* ```ts
* // Subscribe to all channel messages
* api.events.subscribe("channel.message.received", (event) => {
* console.log(`Message: ${event.payload.text}`);
* });
*
* // Subscribe with wildcards
* api.events.subscribe("agent.#", (event) => {
* console.log(`Agent event: ${event.topic}`);
* });
* ```
*/
subscribe: <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options?: SubscribeOptions,
) => Subscription;
/**
* Subscribe to a single event (auto-unsubscribes after first match)
*/
once: <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options?: SubscribeOptions,
) => Subscription;
/**
* Emit a custom plugin event
*
* @example
* ```ts
* api.events.emit("my-feature.activated", { userId: "123" });
* ```
*/
emit: (eventName: string, data: unknown, options?: { sessionKey?: string }) => void;
/**
* Emit a typed system event (for plugins that extend core functionality)
*
* @example
* ```ts
* api.events.emitTyped("channel.message.received", {
* channelId: "my-channel",
* // ... other required fields
* });
* ```
*/
emitTyped: <T extends keyof EventTopicMap>(
topic: T,
payload: EventTopicMap[T],
options?: { sessionKey?: string; correlationId?: string },
) => void;
/**
* Check if there are any subscribers for a topic
*/
hasSubscribers: (topic: string) => boolean;
};
// ============================================================================
// Implementation
// ============================================================================
/**
* Create the event API for a plugin
*/
export function createPluginEventApi(pluginId: string): PluginEventApi {
const bus = getEventBus();
// Track subscriptions for cleanup
const subscriptions: Subscription[] = [];
const subscribe = <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options?: SubscribeOptions,
): Subscription => {
const sub = bus.subscribe(pattern, handler, options);
subscriptions.push(sub);
return sub;
};
const once = <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options?: SubscribeOptions,
): Subscription => {
const sub = bus.once(pattern, handler, options);
subscriptions.push(sub);
return sub;
};
const emit = (eventName: string, data: unknown, options?: { sessionKey?: string }): void => {
const payload: PluginCustomEventPayload = {
pluginId,
eventName,
data,
};
bus.emit(
createEvent("plugin.custom", payload, {
source: `plugin:${pluginId}`,
sessionKey: options?.sessionKey,
}),
);
};
const emitTyped = <T extends keyof EventTopicMap>(
topic: T,
payload: EventTopicMap[T],
options?: { sessionKey?: string; correlationId?: string },
): void => {
bus.emit(
createEvent(topic, payload, {
source: `plugin:${pluginId}`,
...options,
}),
);
};
const hasSubscribers = (topic: string): boolean => {
return bus.hasSubscribers(topic);
};
return {
subscribe,
once,
emit,
emitTyped,
hasSubscribers,
};
}
/**
* Cleanup all subscriptions for a plugin
*/
export function cleanupPluginEventSubscriptions(_api: PluginEventApi): void {
// The subscriptions are tracked internally, but for safety
// plugins should call unsubscribe on returned Subscription objects
}
// ============================================================================
// Event Helpers for Plugins
// ============================================================================
/**
* Helper to create a typed event handler
*/
export function createTypedHandler<T extends keyof EventTopicMap>(
topic: T,
handler: (
payload: EventTopicMap[T],
event: EventEnvelope<T, EventTopicMap[T]>,
) => void | Promise<void>,
): EventHandler<EventEnvelope<T, EventTopicMap[T]>> {
return (event) => handler(event.payload, event);
}
/**
* Helper to filter events by session
*/
export function forSession(sessionKey: string, handler: EventHandler): EventHandler {
return (event) => {
if (event.sessionKey === sessionKey) {
return handler(event);
}
};
}
/**
* Helper to filter events by source
*/
export function fromSource(source: string, handler: EventHandler): EventHandler {
return (event) => {
if (event.source === source) {
return handler(event);
}
};
}
/**
* Helper to debounce event handlers
*/
export function debounced(handler: EventHandler, delayMs: number): EventHandler {
let timeout: ReturnType<typeof setTimeout> | null = null;
let lastEvent: EventEnvelope | null = null;
return (event) => {
lastEvent = event;
if (timeout) clearTimeout(timeout);
timeout = setTimeout(() => {
if (lastEvent) {
void handler(lastEvent);
lastEvent = null;
}
}, delayMs);
};
}
/**
* Helper to throttle event handlers
*/
export function throttled(handler: EventHandler, intervalMs: number): EventHandler {
let lastCall = 0;
return (event) => {
const now = Date.now();
if (now - lastCall >= intervalMs) {
lastCall = now;
return handler(event);
}
};
}

288
src/events/store.ts Normal file
View File

@ -0,0 +1,288 @@
/**
* Event Store - SQLite Persistence Layer
*
* Persists events for replay, auditing, and debugging.
* Uses node:sqlite (DatabaseSync) for synchronous operations.
*/
import type { DatabaseSync } from "node:sqlite";
import { createRequire } from "node:module";
import * as path from "node:path";
import * as fs from "node:fs";
import type { EventEnvelope, EventStore, PersistedEvent, TopicPattern } from "./types.js";
import { topicMatches } from "./types.js";
// ============================================================================
// SQLite Helpers
// ============================================================================
const require = createRequire(import.meta.url);
function requireNodeSqlite(): typeof import("node:sqlite") {
return require("node:sqlite") as typeof import("node:sqlite");
}
// ============================================================================
// Schema
// ============================================================================
const SCHEMA_VERSION = 1;
const CREATE_EVENTS_TABLE = `
CREATE TABLE IF NOT EXISTS events (
persistence_id TEXT PRIMARY KEY,
seq INTEGER NOT NULL,
topic TEXT NOT NULL,
payload TEXT NOT NULL,
ts INTEGER NOT NULL,
persisted_at INTEGER NOT NULL,
correlation_id TEXT,
source TEXT,
session_key TEXT
);
CREATE INDEX IF NOT EXISTS idx_events_seq ON events(seq);
CREATE INDEX IF NOT EXISTS idx_events_topic ON events(topic);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events(ts);
CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_key) WHERE session_key IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_events_correlation ON events(correlation_id) WHERE correlation_id IS NOT NULL;
`;
const CREATE_META_TABLE = `
CREATE TABLE IF NOT EXISTS event_store_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
`;
// ============================================================================
// Implementation
// ============================================================================
export type EventStoreConfig = {
/** Path to SQLite database file */
dbPath: string;
/** Max events to keep (older events pruned, default: 100000) */
maxEvents?: number;
/** Auto-prune interval in ms (default: 3600000 = 1 hour) */
pruneInterval?: number;
/** Logger */
logger?: {
error: (msg: string, ...args: unknown[]) => void;
debug: (msg: string, ...args: unknown[]) => void;
};
};
let persistenceIdCounter = 0;
function generatePersistenceId(): string {
return `evt_${Date.now().toString(36)}_${(++persistenceIdCounter).toString(36)}`;
}
/**
* Create an SQLite-backed event store
*/
export function createEventStore(config: EventStoreConfig): EventStore & {
close: () => void;
vacuum: () => void;
} {
const {
dbPath,
maxEvents = 100_000,
pruneInterval = 3_600_000,
logger = {
error: console.error,
debug: () => {},
},
} = config;
// Ensure directory exists
const dir = path.dirname(dbPath);
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
// Open database
const { DatabaseSync } = requireNodeSqlite();
const db: DatabaseSync = new DatabaseSync(dbPath);
// Initialize schema
db.exec(CREATE_META_TABLE);
db.exec(CREATE_EVENTS_TABLE);
// Check/set schema version
const versionStmt = db.prepare("SELECT value FROM event_store_meta WHERE key = 'schema_version'");
const versionRow = versionStmt.get() as { value: string } | undefined;
if (!versionRow) {
db.prepare("INSERT INTO event_store_meta (key, value) VALUES ('schema_version', ?)").run(
String(SCHEMA_VERSION),
);
}
// Prepared statements
const insertStmt = db.prepare(`
INSERT INTO events (persistence_id, seq, topic, payload, ts, persisted_at, correlation_id, source, session_key)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const readAfterStmt = db.prepare(`
SELECT * FROM events WHERE seq > ? ORDER BY seq ASC LIMIT ?
`);
const readRangeStmt = db.prepare(`
SELECT * FROM events WHERE ts >= ? AND ts <= ? ORDER BY seq ASC LIMIT ?
`);
const latestSeqStmt = db.prepare(`
SELECT MAX(seq) as max_seq FROM events
`);
const countStmt = db.prepare(`
SELECT COUNT(*) as count FROM events
`);
const pruneByCountStmt = db.prepare(`
DELETE FROM events WHERE seq <= (
SELECT seq FROM events ORDER BY seq DESC LIMIT 1 OFFSET ?
)
`);
const pruneByTimeStmt = db.prepare(`
DELETE FROM events WHERE ts < ?
`);
// Auto-prune timer
let pruneTimer: ReturnType<typeof setInterval> | null = null;
if (pruneInterval > 0) {
pruneTimer = setInterval(() => {
try {
const count = (countStmt.get() as { count: number }).count;
if (count > maxEvents) {
const toDelete = count - maxEvents;
pruneByCountStmt.run(maxEvents);
logger.debug(`Pruned ${toDelete} old events`);
}
} catch (err) {
logger.error(`Event store prune failed: ${String(err)}`);
}
}, pruneInterval);
}
// -------------------------------------------------------------------------
// Store Operations
// -------------------------------------------------------------------------
const append = async (event: EventEnvelope): Promise<PersistedEvent> => {
const persistenceId = generatePersistenceId();
const persistedAt = Date.now();
insertStmt.run(
persistenceId,
event.seq,
event.topic,
JSON.stringify(event.payload),
event.ts,
persistedAt,
event.correlationId ?? null,
event.source ?? null,
event.sessionKey ?? null,
);
return {
...event,
persistenceId,
persistedAt,
};
};
const rowToEvent = (row: Record<string, unknown>): PersistedEvent => ({
topic: row.topic as string,
payload: JSON.parse(row.payload as string),
seq: row.seq as number,
ts: row.ts as number,
correlationId: row.correlation_id as string | undefined,
source: row.source as string | undefined,
sessionKey: row.session_key as string | undefined,
persistenceId: row.persistence_id as string,
persistedAt: row.persisted_at as number,
});
const readAfter = async (
seq: number,
options?: { limit?: number; topic?: TopicPattern },
): Promise<PersistedEvent[]> => {
const limit = options?.limit ?? 1000;
const rows = readAfterStmt.all(seq, limit) as Record<string, unknown>[];
let events = rows.map(rowToEvent);
// Filter by topic pattern if specified
if (options?.topic) {
events = events.filter((e) => topicMatches(options.topic!, e.topic));
}
return events;
};
const readRange = async (
from: number,
to: number,
options?: { limit?: number; topic?: TopicPattern },
): Promise<PersistedEvent[]> => {
const limit = options?.limit ?? 1000;
const rows = readRangeStmt.all(from, to, limit) as Record<string, unknown>[];
let events = rows.map(rowToEvent);
// Filter by topic pattern if specified
if (options?.topic) {
events = events.filter((e) => topicMatches(options.topic!, e.topic));
}
return events;
};
const getLatestSeq = async (): Promise<number> => {
const row = latestSeqStmt.get() as { max_seq: number | null };
return row.max_seq ?? 0;
};
const prune = async (olderThan: number): Promise<number> => {
const result = pruneByTimeStmt.run(olderThan);
return Number(result.changes);
};
const close = (): void => {
if (pruneTimer) {
clearInterval(pruneTimer);
pruneTimer = null;
}
db.close();
};
const vacuum = (): void => {
db.exec("VACUUM");
};
return {
append,
readAfter,
readRange,
getLatestSeq,
prune,
close,
vacuum,
};
}
// ============================================================================
// Default Store Location
// ============================================================================
/**
* Get the default event store path
*/
export function getDefaultEventStorePath(): string {
const homeDir = process.env.HOME ?? process.env.USERPROFILE ?? ".";
return path.join(homeDir, ".clawdbot", "events.db");
}

281
src/events/types.ts Normal file
View File

@ -0,0 +1,281 @@
/**
* Event Bus - Core Types
*
* A unified event system for Moltbot that enables:
* - Cross-channel coordination (WhatsApp Discord notifications)
* - Async workflows with checkpoints
* - Plugin communication without tight coupling
* - External webhook integrations
*
* Design principles:
* - Type-safe with discriminated unions
* - Topic-based routing with wildcard support
* - Error isolation (handlers don't crash the bus)
* - Sequence numbering for ordering guarantees
* - Designed for future distribution (Redis/NATS compatible)
*/
// ============================================================================
// Core Event Types
// ============================================================================
/**
* Base event envelope - all events include these fields
*/
export type EventEnvelope<T extends string = string, P = unknown> = {
/** Event topic (e.g., "channel.message.received", "agent.tool.executed") */
topic: T;
/** Event payload - type depends on topic */
payload: P;
/** Monotonically increasing sequence number */
seq: number;
/** Unix timestamp (ms) when event was emitted */
ts: number;
/** Optional correlation ID for tracing related events */
correlationId?: string;
/** Optional source identifier (channel, plugin, agent) */
source?: string;
/** Optional session key for session-scoped events */
sessionKey?: string;
};
/**
* Event input - what callers provide (seq/ts added by bus)
*/
export type EventInput<T extends string = string, P = unknown> = {
topic: T;
payload: P;
correlationId?: string;
source?: string;
sessionKey?: string;
};
// ============================================================================
// Topic Patterns
// ============================================================================
/**
* Topic pattern for subscriptions - supports wildcards:
* - "channel.message.received" - exact match
* - "channel.message.*" - single segment wildcard
* - "channel.#" - multi-segment wildcard (matches channel.anything.here)
* - "*" - matches any single topic segment
* - "#" - matches zero or more topic segments
*/
export type TopicPattern = string;
/**
* Check if a topic matches a pattern
*/
export function topicMatches(pattern: TopicPattern, topic: string): boolean {
// Exact match fast path
if (pattern === topic) return true;
if (pattern === "#") return true;
const patternParts = pattern.split(".");
const topicParts = topic.split(".");
let pi = 0;
let ti = 0;
while (pi < patternParts.length && ti < topicParts.length) {
const pp = patternParts[pi];
if (pp === "#") {
// # at end matches everything remaining
if (pi === patternParts.length - 1) return true;
// # in middle - try matching rest at each position
for (let tryTi = ti; tryTi <= topicParts.length; tryTi++) {
if (topicMatches(patternParts.slice(pi + 1).join("."), topicParts.slice(tryTi).join("."))) {
return true;
}
}
return false;
}
if (pp === "*") {
// * matches exactly one segment
pi++;
ti++;
continue;
}
// Literal match required
if (pp !== topicParts[ti]) return false;
pi++;
ti++;
}
// Pattern exhausted - topic must also be exhausted
// Unless pattern ends with #
if (pi < patternParts.length) {
return patternParts[pi] === "#" && pi === patternParts.length - 1;
}
return ti === topicParts.length;
}
// ============================================================================
// Subscription Types
// ============================================================================
/**
* Event handler function
*/
export type EventHandler<E extends EventEnvelope = EventEnvelope> = (
event: E,
) => void | Promise<void>;
/**
* Subscription options
*/
export type SubscribeOptions = {
/** Only receive events matching this session key */
sessionKey?: string;
/** Only receive events from this source */
source?: string;
/** Priority for ordered execution (higher = earlier, default 0) */
priority?: number;
/** If true, errors in this handler propagate (default: false, errors logged) */
propagateErrors?: boolean;
};
/**
* Active subscription handle
*/
export type Subscription = {
/** Unique subscription ID */
id: string;
/** Pattern this subscription matches */
pattern: TopicPattern;
/** Unsubscribe from this subscription */
unsubscribe: () => void;
};
// ============================================================================
// Event Bus Interface
// ============================================================================
/**
* Core event bus interface - can be implemented with different backends
*/
export type EventBus = {
/**
* Emit an event to all matching subscribers
* Returns after all sync handlers complete, async handlers may still be running
*/
emit: <T extends string, P>(event: EventInput<T, P>) => EventEnvelope<T, P>;
/**
* Emit an event and wait for all handlers (sync + async) to complete
*/
emitAsync: <T extends string, P>(event: EventInput<T, P>) => Promise<EventEnvelope<T, P>>;
/**
* Subscribe to events matching a topic pattern
*/
subscribe: <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options?: SubscribeOptions,
) => Subscription;
/**
* Subscribe to a single event (auto-unsubscribes after first match)
*/
once: <E extends EventEnvelope = EventEnvelope>(
pattern: TopicPattern,
handler: EventHandler<E>,
options?: SubscribeOptions,
) => Subscription;
/**
* Unsubscribe by subscription ID
*/
unsubscribe: (subscriptionId: string) => boolean;
/**
* Unsubscribe all handlers for a pattern
*/
unsubscribeAll: (pattern: TopicPattern) => number;
/**
* Get current sequence number
*/
getSeq: () => number;
/**
* Check if there are any subscribers for a topic
*/
hasSubscribers: (topic: string) => boolean;
/**
* Get count of active subscriptions
*/
subscriptionCount: () => number;
/**
* Shutdown the bus, clearing all subscriptions
*/
shutdown: () => void;
};
// ============================================================================
// Event Persistence Types
// ============================================================================
/**
* Persisted event with metadata
*/
export type PersistedEvent<E extends EventEnvelope = EventEnvelope> = E & {
/** Persistence ID (may differ from seq for distributed systems) */
persistenceId: string;
/** When the event was persisted */
persistedAt: number;
};
/**
* Event store interface for persistence
*/
export type EventStore = {
/** Append an event to the store */
append: (event: EventEnvelope) => Promise<PersistedEvent>;
/** Read events after a sequence number */
readAfter: (
seq: number,
options?: { limit?: number; topic?: TopicPattern },
) => Promise<PersistedEvent[]>;
/** Read events in a time range */
readRange: (
from: number,
to: number,
options?: { limit?: number; topic?: TopicPattern },
) => Promise<PersistedEvent[]>;
/** Get the latest sequence number */
getLatestSeq: () => Promise<number>;
/** Prune events older than a timestamp */
prune: (olderThan: number) => Promise<number>;
};
// ============================================================================
// Bus Configuration
// ============================================================================
export type EventBusConfig = {
/** Optional event store for persistence */
store?: EventStore;
/** Max async handlers to run concurrently per event (default: 10) */
maxConcurrency?: number;
/** Timeout for async handlers in ms (default: 30000) */
handlerTimeout?: number;
/** Logger for errors and debug info */
logger?: {
error: (msg: string, ...args: unknown[]) => void;
warn: (msg: string, ...args: unknown[]) => void;
debug: (msg: string, ...args: unknown[]) => void;
};
};