diff --git a/src/events/bus.test.ts b/src/events/bus.test.ts new file mode 100644 index 000000000..6c748b5e8 --- /dev/null +++ b/src/events/bus.test.ts @@ -0,0 +1,355 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { createEventBus, getEventBus, setEventBus, resetEventBus } from "./bus.js"; +import { topicMatches } from "./types.js"; +import type { EventBus, EventEnvelope } from "./types.js"; + +describe("topicMatches", () => { + it("matches exact topics", () => { + expect(topicMatches("channel.message.received", "channel.message.received")).toBe(true); + expect(topicMatches("channel.message.received", "channel.message.sent")).toBe(false); + }); + + it("matches single segment wildcard (*)", () => { + expect(topicMatches("channel.*.received", "channel.message.received")).toBe(true); + expect(topicMatches("channel.*.received", "channel.status.received")).toBe(true); + expect(topicMatches("channel.*.received", "channel.message.sent")).toBe(false); + expect(topicMatches("*.message.received", "channel.message.received")).toBe(true); + expect(topicMatches("channel.message.*", "channel.message.received")).toBe(true); + }); + + it("matches multi-segment wildcard (#)", () => { + expect(topicMatches("channel.#", "channel.message.received")).toBe(true); + expect(topicMatches("channel.#", "channel.status.changed")).toBe(true); + expect(topicMatches("channel.#", "channel")).toBe(true); + expect(topicMatches("#", "anything.at.all")).toBe(true); + expect(topicMatches("agent.#", "channel.message.received")).toBe(false); + }); + + it("handles # in middle of pattern", () => { + expect(topicMatches("channel.#.completed", "channel.message.send.completed")).toBe(true); + expect(topicMatches("channel.#.completed", "channel.completed")).toBe(true); + expect(topicMatches("channel.#.completed", "channel.message.failed")).toBe(false); + }); + + it("handles empty segments correctly", () => { + expect(topicMatches("channel", "channel")).toBe(true); + expect(topicMatches("channel", "channel.message")).toBe(false); + }); +}); + +describe("EventBus", () => { + let bus: EventBus; + + beforeEach(() => { + bus = createEventBus(); + }); + + afterEach(() => { + bus.shutdown(); + }); + + describe("basic emit and subscribe", () => { + it("delivers events to subscribers", () => { + const received: EventEnvelope[] = []; + bus.subscribe("test.event", (event) => { + received.push(event); + }); + + bus.emit({ topic: "test.event", payload: { data: "hello" } }); + + expect(received).toHaveLength(1); + expect(received[0].topic).toBe("test.event"); + expect(received[0].payload).toEqual({ data: "hello" }); + }); + + it("assigns sequence numbers", () => { + const received: EventEnvelope[] = []; + bus.subscribe("test.event", (event) => { + received.push(event); + }); + + bus.emit({ topic: "test.event", payload: 1 }); + bus.emit({ topic: "test.event", payload: 2 }); + bus.emit({ topic: "test.event", payload: 3 }); + + expect(received[0].seq).toBe(1); + expect(received[1].seq).toBe(2); + expect(received[2].seq).toBe(3); + }); + + it("assigns timestamps", () => { + const before = Date.now(); + const event = bus.emit({ topic: "test.event", payload: {} }); + const after = Date.now(); + + expect(event.ts).toBeGreaterThanOrEqual(before); + expect(event.ts).toBeLessThanOrEqual(after); + }); + + it("preserves correlationId and source", () => { + const received: EventEnvelope[] = []; + bus.subscribe("test.event", (event) => { + received.push(event); + }); + + bus.emit({ + topic: "test.event", + payload: {}, + correlationId: "corr-123", + source: "test-source", + }); + + expect(received[0].correlationId).toBe("corr-123"); + expect(received[0].source).toBe("test-source"); + }); + }); + + describe("pattern matching", () => { + it("delivers to wildcard subscribers", () => { + const received: EventEnvelope[] = []; + bus.subscribe("channel.*.*", (event) => { + received.push(event); + }); + + bus.emit({ topic: "channel.message.received", payload: {} }); + bus.emit({ topic: "channel.status.changed", payload: {} }); + bus.emit({ topic: "agent.run.started", payload: {} }); + + expect(received).toHaveLength(2); + }); + + it("delivers to multi-segment wildcard subscribers", () => { + const received: EventEnvelope[] = []; + bus.subscribe("channel.#", (event) => { + received.push(event); + }); + + bus.emit({ topic: "channel.message.received", payload: {} }); + bus.emit({ topic: "channel.status.changed.error", payload: {} }); + bus.emit({ topic: "agent.run.started", payload: {} }); + + expect(received).toHaveLength(2); + }); + }); + + describe("subscription management", () => { + it("unsubscribes correctly", () => { + const received: EventEnvelope[] = []; + const sub = bus.subscribe("test.event", (event) => { + received.push(event); + }); + + bus.emit({ topic: "test.event", payload: 1 }); + sub.unsubscribe(); + bus.emit({ topic: "test.event", payload: 2 }); + + expect(received).toHaveLength(1); + }); + + it("once() unsubscribes after first event", () => { + const received: EventEnvelope[] = []; + bus.once("test.event", (event) => { + received.push(event); + }); + + bus.emit({ topic: "test.event", payload: 1 }); + bus.emit({ topic: "test.event", payload: 2 }); + + expect(received).toHaveLength(1); + }); + + it("unsubscribeAll removes all handlers for pattern", () => { + const received: EventEnvelope[] = []; + bus.subscribe("test.event", () => received.push({} as EventEnvelope)); + bus.subscribe("test.event", () => received.push({} as EventEnvelope)); + bus.subscribe("other.event", () => received.push({} as EventEnvelope)); + + const count = bus.unsubscribeAll("test.event"); + + expect(count).toBe(2); + bus.emit({ topic: "test.event", payload: {} }); + expect(received).toHaveLength(0); + }); + + it("reports subscription count", () => { + expect(bus.subscriptionCount()).toBe(0); + + const sub1 = bus.subscribe("test.event", () => {}); + expect(bus.subscriptionCount()).toBe(1); + + const sub2 = bus.subscribe("test.event", () => {}); + expect(bus.subscriptionCount()).toBe(2); + + sub1.unsubscribe(); + expect(bus.subscriptionCount()).toBe(1); + + sub2.unsubscribe(); + expect(bus.subscriptionCount()).toBe(0); + }); + + it("reports hasSubscribers correctly", () => { + expect(bus.hasSubscribers("test.event")).toBe(false); + + const sub = bus.subscribe("test.event", () => {}); + expect(bus.hasSubscribers("test.event")).toBe(true); + + sub.unsubscribe(); + expect(bus.hasSubscribers("test.event")).toBe(false); + }); + + it("hasSubscribers considers wildcard patterns", () => { + bus.subscribe("test.#", () => {}); + expect(bus.hasSubscribers("test.event")).toBe(true); + expect(bus.hasSubscribers("test.event.deep")).toBe(true); + expect(bus.hasSubscribers("other.event")).toBe(false); + }); + }); + + describe("filtering", () => { + it("filters by sessionKey", () => { + const received: EventEnvelope[] = []; + bus.subscribe("test.event", (event) => received.push(event), { + sessionKey: "session-1", + }); + + bus.emit({ topic: "test.event", payload: 1, sessionKey: "session-1" }); + bus.emit({ topic: "test.event", payload: 2, sessionKey: "session-2" }); + bus.emit({ topic: "test.event", payload: 3 }); + + expect(received).toHaveLength(1); + expect(received[0].payload).toBe(1); + }); + + it("filters by source", () => { + const received: EventEnvelope[] = []; + bus.subscribe("test.event", (event) => received.push(event), { + source: "agent", + }); + + bus.emit({ topic: "test.event", payload: 1, source: "agent" }); + bus.emit({ topic: "test.event", payload: 2, source: "channel" }); + + expect(received).toHaveLength(1); + expect(received[0].payload).toBe(1); + }); + }); + + describe("priority ordering", () => { + it("executes higher priority handlers first", () => { + const order: number[] = []; + + bus.subscribe("test.event", () => order.push(1), { priority: 1 }); + bus.subscribe("test.event", () => order.push(10), { priority: 10 }); + bus.subscribe("test.event", () => order.push(5), { priority: 5 }); + + bus.emit({ topic: "test.event", payload: {} }); + + expect(order).toEqual([10, 5, 1]); + }); + }); + + describe("error handling", () => { + it("isolates handler errors by default", () => { + const received: EventEnvelope[] = []; + + bus.subscribe("test.event", () => { + throw new Error("Handler error"); + }); + bus.subscribe("test.event", (event) => received.push(event)); + + bus.emit({ topic: "test.event", payload: {} }); + + expect(received).toHaveLength(1); + }); + + it("propagates errors when configured", () => { + bus.subscribe( + "test.event", + () => { + throw new Error("Handler error"); + }, + { propagateErrors: true }, + ); + + expect(() => bus.emit({ topic: "test.event", payload: {} })).toThrow("Handler error"); + }); + }); + + describe("async handlers", () => { + it("emitAsync waits for all handlers", async () => { + const completed: number[] = []; + + bus.subscribe("test.event", async () => { + await new Promise((r) => setTimeout(r, 10)); + completed.push(1); + }); + bus.subscribe("test.event", async () => { + await new Promise((r) => setTimeout(r, 5)); + completed.push(2); + }); + + await bus.emitAsync({ topic: "test.event", payload: {} }); + + expect(completed).toHaveLength(2); + }); + + it("emit does not wait for async handlers", () => { + let completed = false; + + bus.subscribe("test.event", async () => { + await new Promise((r) => setTimeout(r, 10)); + completed = true; + }); + + bus.emit({ topic: "test.event", payload: {} }); + + expect(completed).toBe(false); + }); + }); + + describe("shutdown", () => { + it("clears all subscriptions", () => { + bus.subscribe("test.event", () => {}); + bus.subscribe("other.event", () => {}); + + expect(bus.subscriptionCount()).toBe(2); + + bus.shutdown(); + + expect(bus.subscriptionCount()).toBe(0); + }); + + it("rejects new operations after shutdown", () => { + bus.shutdown(); + + expect(() => bus.emit({ topic: "test", payload: {} })).toThrow("EventBus is shut down"); + expect(() => bus.subscribe("test", () => {})).toThrow("EventBus is shut down"); + }); + }); +}); + +describe("singleton management", () => { + afterEach(() => { + resetEventBus(); + }); + + it("getEventBus returns same instance", () => { + const bus1 = getEventBus(); + const bus2 = getEventBus(); + expect(bus1).toBe(bus2); + }); + + it("setEventBus replaces default", () => { + const custom = createEventBus(); + setEventBus(custom); + expect(getEventBus()).toBe(custom); + custom.shutdown(); + }); + + it("resetEventBus creates new instance", () => { + const bus1 = getEventBus(); + resetEventBus(); + const bus2 = getEventBus(); + expect(bus1).not.toBe(bus2); + }); +}); diff --git a/src/events/bus.ts b/src/events/bus.ts new file mode 100644 index 000000000..84660c35b --- /dev/null +++ b/src/events/bus.ts @@ -0,0 +1,408 @@ +/** + * Event Bus - In-Process Implementation + * + * A high-performance, type-safe event bus for Moltbot. + * Supports topic patterns, priority ordering, and async handlers. + */ + +import type { + EventBus, + EventBusConfig, + EventEnvelope, + EventHandler, + EventInput, + Subscription, + SubscribeOptions, + TopicPattern, +} from "./types.js"; +import { topicMatches } from "./types.js"; + +// ============================================================================ +// Internal Types +// ============================================================================ + +type InternalSubscription = { + id: string; + pattern: TopicPattern; + handler: EventHandler; + options: SubscribeOptions; + once: boolean; +}; + +// ============================================================================ +// Implementation +// ============================================================================ + +let subscriptionIdCounter = 0; + +function generateSubscriptionId(): string { + return `sub_${++subscriptionIdCounter}_${Date.now().toString(36)}`; +} + +/** + * Create an in-process event bus + */ +export function createEventBus(config: EventBusConfig = {}): EventBus { + const { + store, + maxConcurrency = 10, + handlerTimeout = 30_000, + logger = { + error: console.error, + warn: console.warn, + debug: () => {}, + }, + } = config; + + let seq = 0; + let isShutdown = false; + + // Subscriptions indexed by pattern for efficient lookup + const subscriptions = new Map(); + // Cache of patterns for faster matching + const patternCache = new Set(); + + // ------------------------------------------------------------------------- + // Subscription Management + // ------------------------------------------------------------------------- + + const subscribe = ( + pattern: TopicPattern, + handler: EventHandler, + options: SubscribeOptions = {}, + ): Subscription => { + if (isShutdown) { + throw new Error("EventBus is shut down"); + } + + const id = generateSubscriptionId(); + const sub: InternalSubscription = { + id, + pattern, + handler: handler as EventHandler, + options, + once: false, + }; + + subscriptions.set(id, sub); + patternCache.add(pattern); + + logger.debug(`Subscribed ${id} to pattern "${pattern}"`); + + return { + id, + pattern, + unsubscribe: () => unsubscribe(id), + }; + }; + + const once = ( + pattern: TopicPattern, + handler: EventHandler, + options: SubscribeOptions = {}, + ): Subscription => { + const id = generateSubscriptionId(); + const sub: InternalSubscription = { + id, + pattern, + handler: handler as EventHandler, + options, + once: true, + }; + + subscriptions.set(id, sub); + patternCache.add(pattern); + + return { + id, + pattern, + unsubscribe: () => unsubscribe(id), + }; + }; + + const unsubscribe = (subscriptionId: string): boolean => { + const sub = subscriptions.get(subscriptionId); + if (!sub) return false; + + subscriptions.delete(subscriptionId); + + // Rebuild pattern cache if needed + let patternStillUsed = false; + for (const s of subscriptions.values()) { + if (s.pattern === sub.pattern) { + patternStillUsed = true; + break; + } + } + if (!patternStillUsed) { + patternCache.delete(sub.pattern); + } + + logger.debug(`Unsubscribed ${subscriptionId} from pattern "${sub.pattern}"`); + return true; + }; + + const unsubscribeAll = (pattern: TopicPattern): number => { + let count = 0; + for (const [id, sub] of subscriptions) { + if (sub.pattern === pattern) { + subscriptions.delete(id); + count++; + } + } + if (count > 0) { + patternCache.delete(pattern); + } + return count; + }; + + // ------------------------------------------------------------------------- + // Event Emission + // ------------------------------------------------------------------------- + + const findMatchingSubscriptions = ( + topic: string, + sessionKey?: string, + source?: string, + ): InternalSubscription[] => { + const matching: InternalSubscription[] = []; + + for (const sub of subscriptions.values()) { + // Check topic pattern match + if (!topicMatches(sub.pattern, topic)) continue; + + // Check session key filter + if (sub.options.sessionKey && sub.options.sessionKey !== sessionKey) { + continue; + } + + // Check source filter + if (sub.options.source && sub.options.source !== source) { + continue; + } + + matching.push(sub); + } + + // Sort by priority (higher first) + matching.sort((a, b) => (b.options.priority ?? 0) - (a.options.priority ?? 0)); + + return matching; + }; + + const executeHandler = async (sub: InternalSubscription, event: EventEnvelope): Promise => { + try { + const result = sub.handler(event); + if (result instanceof Promise) { + // Apply timeout + await Promise.race([ + result, + new Promise((_, reject) => + setTimeout( + () => reject(new Error(`Handler timeout after ${handlerTimeout}ms`)), + handlerTimeout, + ), + ), + ]); + } + } catch (err) { + if (sub.options.propagateErrors) { + throw err; + } + logger.error(`Event handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`); + } + }; + + const emit = (input: EventInput): EventEnvelope => { + if (isShutdown) { + throw new Error("EventBus is shut down"); + } + + const event: EventEnvelope = { + ...input, + seq: ++seq, + ts: Date.now(), + }; + + const matching = findMatchingSubscriptions(event.topic, event.sessionKey, event.source); + + // Track once-subscriptions to remove + const toRemove: string[] = []; + + for (const sub of matching) { + if (sub.once) { + toRemove.push(sub.id); + } + + // Execute sync, fire-and-forget for async + try { + const result = sub.handler(event); + if (result instanceof Promise) { + // Don't await - fire and forget + result.catch((err) => { + if (sub.options.propagateErrors) { + // Can't propagate from fire-and-forget, log instead + logger.error( + `Async handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`, + ); + } else { + logger.error( + `Event handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`, + ); + } + }); + } + } catch (err) { + if (sub.options.propagateErrors) { + throw err; + } + logger.error(`Event handler error for "${event.topic}" (sub: ${sub.id}): ${String(err)}`); + } + } + + // Remove once-subscriptions + for (const id of toRemove) { + unsubscribe(id); + } + + // Persist if store configured (async, don't block emit) + if (store) { + store.append(event).catch((err) => { + logger.error(`Failed to persist event ${event.seq}: ${String(err)}`); + }); + } + + return event; + }; + + const emitAsync = async ( + input: EventInput, + ): Promise> => { + if (isShutdown) { + throw new Error("EventBus is shut down"); + } + + const event: EventEnvelope = { + ...input, + seq: ++seq, + ts: Date.now(), + }; + + const matching = findMatchingSubscriptions(event.topic, event.sessionKey, event.source); + + // Track once-subscriptions to remove + const toRemove: string[] = []; + + // Execute handlers with concurrency limit + const pending: Promise[] = []; + let running = 0; + + for (const sub of matching) { + if (sub.once) { + toRemove.push(sub.id); + } + + // Wait if at concurrency limit + while (running >= maxConcurrency) { + await Promise.race(pending); + } + + running++; + const p = executeHandler(sub, event).finally(() => { + running--; + const idx = pending.indexOf(p); + if (idx !== -1) void pending.splice(idx, 1); + }); + pending.push(p); + } + + // Wait for all remaining handlers + await Promise.all(pending); + + // Remove once-subscriptions + for (const id of toRemove) { + unsubscribe(id); + } + + // Persist if store configured + if (store) { + await store.append(event); + } + + return event; + }; + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + const getSeq = (): number => seq; + + const hasSubscribers = (topic: string): boolean => { + // Quick check: any pattern could match? + if (patternCache.size === 0) return false; + + // Check each subscription + for (const sub of subscriptions.values()) { + if (topicMatches(sub.pattern, topic)) { + return true; + } + } + return false; + }; + + const subscriptionCount = (): number => subscriptions.size; + + const shutdown = (): void => { + isShutdown = true; + subscriptions.clear(); + patternCache.clear(); + logger.debug("EventBus shut down"); + }; + + return { + emit, + emitAsync, + subscribe, + once, + unsubscribe, + unsubscribeAll, + getSeq, + hasSubscribers, + subscriptionCount, + shutdown, + }; +} + +// ============================================================================ +// Singleton Instance +// ============================================================================ + +let defaultBus: EventBus | null = null; + +/** + * Get the default event bus instance (creates one if needed) + */ +export function getEventBus(): EventBus { + if (!defaultBus) { + defaultBus = createEventBus(); + } + return defaultBus; +} + +/** + * Set a custom event bus as the default + */ +export function setEventBus(bus: EventBus): void { + defaultBus = bus; +} + +/** + * Reset the default event bus (useful for testing) + */ +export function resetEventBus(): void { + if (defaultBus) { + defaultBus.shutdown(); + defaultBus = null; + } +} diff --git a/src/events/catalog.test.ts b/src/events/catalog.test.ts new file mode 100644 index 000000000..a0660e51f --- /dev/null +++ b/src/events/catalog.test.ts @@ -0,0 +1,109 @@ +import { describe, it, expect } from "vitest"; +import { createEvent, isEventOfType, type TypedEvent } from "./catalog.js"; +import type { EventEnvelope } from "./types.js"; + +describe("createEvent", () => { + it("creates typed event input", () => { + const event = createEvent("channel.message.received", { + channelId: "whatsapp", + accountId: "default", + messageId: "msg123", + chatId: "+1234567890", + chatType: "dm", + senderId: "+1234567890", + text: "Hello!", + hasMedia: false, + timestamp: Date.now(), + }); + + expect(event.topic).toBe("channel.message.received"); + expect(event.payload.channelId).toBe("whatsapp"); + expect(event.payload.text).toBe("Hello!"); + }); + + it("includes optional fields", () => { + const event = createEvent( + "agent.run.started", + { + runId: "run123", + agentId: "default", + model: "claude-3-opus", + trigger: "message", + }, + { + correlationId: "corr123", + source: "test", + sessionKey: "session123", + }, + ); + + expect(event.correlationId).toBe("corr123"); + expect(event.source).toBe("test"); + expect(event.sessionKey).toBe("session123"); + }); +}); + +describe("isEventOfType", () => { + it("returns true for matching topic", () => { + const event: EventEnvelope = { + topic: "channel.message.received", + payload: { + channelId: "whatsapp", + accountId: "default", + messageId: "msg123", + chatId: "+1234567890", + chatType: "dm", + senderId: "+1234567890", + hasMedia: false, + timestamp: Date.now(), + }, + seq: 1, + ts: Date.now(), + }; + + expect(isEventOfType(event, "channel.message.received")).toBe(true); + expect(isEventOfType(event, "channel.message.sent")).toBe(false); + }); + + it("narrows type correctly", () => { + const event: EventEnvelope = { + topic: "agent.run.completed", + payload: { + runId: "run123", + agentId: "default", + success: true, + duration: 1000, + inputTokens: 100, + outputTokens: 200, + toolCalls: 5, + }, + seq: 1, + ts: Date.now(), + }; + + if (isEventOfType(event, "agent.run.completed")) { + // TypeScript should know the payload type here + expect(event.payload.runId).toBe("run123"); + expect(event.payload.success).toBe(true); + expect(event.payload.toolCalls).toBe(5); + } + }); +}); + +describe("TypedEvent type", () => { + it("provides correct payload type", () => { + // This is a compile-time test - if it compiles, it works + const event: TypedEvent<"channel.status.changed"> = { + topic: "channel.status.changed", + payload: { + channelId: "telegram", + accountId: "bot123", + status: "connected", + }, + seq: 1, + ts: Date.now(), + }; + + expect(event.payload.status).toBe("connected"); + }); +}); diff --git a/src/events/catalog.ts b/src/events/catalog.ts new file mode 100644 index 000000000..9a5be17cc --- /dev/null +++ b/src/events/catalog.ts @@ -0,0 +1,382 @@ +/** + * Event Catalog - Typed System Events + * + * All system events are defined here with their payloads. + * This provides type safety and documentation for event consumers. + * + * Topic naming convention: + * - domain.entity.action (e.g., "channel.message.received") + * - Use past tense for events (received, completed, failed) + * - Use present tense for commands (send, execute, start) + */ + +import type { EventEnvelope, EventInput } from "./types.js"; + +// ============================================================================ +// Channel Events +// ============================================================================ + +/** + * A message was received from a channel + */ +export type ChannelMessageReceivedPayload = { + channelId: string; + accountId: string; + messageId: string; + chatId: string; + chatType: "dm" | "group"; + senderId: string; + senderName?: string; + text?: string; + hasMedia: boolean; + mediaType?: "image" | "video" | "audio" | "document" | "sticker"; + replyToMessageId?: string; + mentions?: string[]; + timestamp: number; +}; + +/** + * A message was sent to a channel + */ +export type ChannelMessageSentPayload = { + channelId: string; + accountId: string; + messageId: string; + chatId: string; + text?: string; + hasMedia: boolean; + /** Duration in ms to send */ + sendDuration: number; +}; + +/** + * Message delivery failed + */ +export type ChannelMessageFailedPayload = { + channelId: string; + accountId: string; + chatId: string; + error: string; + retryable: boolean; +}; + +/** + * Channel connection status changed + */ +export type ChannelStatusChangedPayload = { + channelId: string; + accountId: string; + status: "connected" | "disconnected" | "connecting" | "error"; + error?: string; + previousStatus?: string; +}; + +/** + * Channel pairing requested (e.g., QR code for WhatsApp) + */ +export type ChannelPairingRequestedPayload = { + channelId: string; + accountId: string; + pairingData?: string; // QR code data, phone code, etc. + expiresAt?: number; +}; + +// ============================================================================ +// Agent Events +// ============================================================================ + +/** + * Agent run started + */ +export type AgentRunStartedPayload = { + runId: string; + agentId: string; + model: string; + trigger: "message" | "cron" | "webhook" | "command" | "internal"; + inputTokens?: number; +}; + +/** + * Agent produced a text chunk (streaming) + */ +export type AgentTextChunkPayload = { + runId: string; + agentId: string; + chunk: string; + /** Cumulative text so far */ + accumulated: string; +}; + +/** + * Agent is executing a tool + */ +export type AgentToolExecutingPayload = { + runId: string; + agentId: string; + toolName: string; + toolInput: Record; +}; + +/** + * Agent tool execution completed + */ +export type AgentToolCompletedPayload = { + runId: string; + agentId: string; + toolName: string; + success: boolean; + duration: number; + error?: string; +}; + +/** + * Agent run completed + */ +export type AgentRunCompletedPayload = { + runId: string; + agentId: string; + success: boolean; + duration: number; + inputTokens: number; + outputTokens: number; + toolCalls: number; + error?: string; +}; + +// ============================================================================ +// Workflow Events +// ============================================================================ + +/** + * A workflow was triggered + */ +export type WorkflowStartedPayload = { + workflowId: string; + workflowName: string; + trigger: { + type: "channel" | "cron" | "webhook" | "manual"; + source?: string; + }; + input?: Record; +}; + +/** + * Workflow state transition + */ +export type WorkflowStateChangedPayload = { + workflowId: string; + workflowName: string; + previousState: string; + newState: string; + data?: Record; +}; + +/** + * Workflow completed + */ +export type WorkflowCompletedPayload = { + workflowId: string; + workflowName: string; + success: boolean; + duration: number; + finalState: string; + output?: Record; + error?: string; +}; + +// ============================================================================ +// Plugin Events +// ============================================================================ + +/** + * Plugin was loaded + */ +export type PluginLoadedPayload = { + pluginId: string; + pluginName: string; + version: string; + capabilities: string[]; +}; + +/** + * Plugin emitted a custom event + */ +export type PluginCustomEventPayload = { + pluginId: string; + eventName: string; + data: unknown; +}; + +/** + * Plugin encountered an error + */ +export type PluginErrorPayload = { + pluginId: string; + pluginName: string; + error: string; + context?: string; +}; + +// ============================================================================ +// System Events +// ============================================================================ + +/** + * Gateway started + */ +export type GatewayStartedPayload = { + version: string; + port: number; + mode: "local" | "remote"; + channels: string[]; + plugins: string[]; +}; + +/** + * Gateway shutting down + */ +export type GatewayShuttingDownPayload = { + reason: "signal" | "error" | "manual"; + gracePeriod: number; +}; + +/** + * Configuration was reloaded + */ +export type ConfigReloadedPayload = { + changedSections: string[]; + triggeredBy: "file-watch" | "api" | "manual"; +}; + +/** + * Cron job executed + */ +export type CronExecutedPayload = { + jobId: string; + jobName: string; + success: boolean; + duration: number; + nextRun?: number; + error?: string; +}; + +/** + * Security approval requested + */ +export type SecurityApprovalRequestedPayload = { + requestId: string; + type: "exec" | "tool" | "action"; + description: string; + requestedBy: string; + expiresAt: number; +}; + +/** + * Security approval resolved + */ +export type SecurityApprovalResolvedPayload = { + requestId: string; + approved: boolean; + resolvedBy: string; + reason?: string; +}; + +// ============================================================================ +// Webhook Events +// ============================================================================ + +/** + * External webhook received + */ +export type WebhookReceivedPayload = { + webhookId: string; + source: string; + method: string; + path: string; + headers: Record; + body?: unknown; +}; + +// ============================================================================ +// Event Type Map +// ============================================================================ + +/** + * Complete map of topics to their payloads + */ +export type EventTopicMap = { + // Channel events + "channel.message.received": ChannelMessageReceivedPayload; + "channel.message.sent": ChannelMessageSentPayload; + "channel.message.failed": ChannelMessageFailedPayload; + "channel.status.changed": ChannelStatusChangedPayload; + "channel.pairing.requested": ChannelPairingRequestedPayload; + + // Agent events + "agent.run.started": AgentRunStartedPayload; + "agent.text.chunk": AgentTextChunkPayload; + "agent.tool.executing": AgentToolExecutingPayload; + "agent.tool.completed": AgentToolCompletedPayload; + "agent.run.completed": AgentRunCompletedPayload; + + // Workflow events + "workflow.started": WorkflowStartedPayload; + "workflow.state.changed": WorkflowStateChangedPayload; + "workflow.completed": WorkflowCompletedPayload; + + // Plugin events + "plugin.loaded": PluginLoadedPayload; + "plugin.custom": PluginCustomEventPayload; + "plugin.error": PluginErrorPayload; + + // System events + "gateway.started": GatewayStartedPayload; + "gateway.shutting_down": GatewayShuttingDownPayload; + "config.reloaded": ConfigReloadedPayload; + "cron.executed": CronExecutedPayload; + + // Security events + "security.approval.requested": SecurityApprovalRequestedPayload; + "security.approval.resolved": SecurityApprovalResolvedPayload; + + // Webhook events + "webhook.received": WebhookReceivedPayload; +}; + +// ============================================================================ +// Typed Event Helpers +// ============================================================================ + +/** + * Create a typed event input for a known topic + */ +export function createEvent( + topic: T, + payload: EventTopicMap[T], + options?: { + correlationId?: string; + source?: string; + sessionKey?: string; + }, +): EventInput { + return { + topic, + payload, + ...options, + }; +} + +/** + * Type guard to check if an event matches a topic + */ +export function isEventOfType( + event: EventEnvelope, + topic: T, +): event is EventEnvelope { + return event.topic === topic; +} + +/** + * Typed event envelope for a known topic + */ +export type TypedEvent = EventEnvelope; diff --git a/src/events/gateway-integration.ts b/src/events/gateway-integration.ts new file mode 100644 index 000000000..aa8115c41 --- /dev/null +++ b/src/events/gateway-integration.ts @@ -0,0 +1,449 @@ +/** + * Event Bus - Gateway Integration + * + * Bridges the gateway's existing event patterns to the unified event bus. + * This module: + * - Creates and configures the event bus for gateway use + * - Bridges agent events, diagnostic events, etc. to the bus + * - Provides typed emit helpers for common events + * - Integrates with the gateway broadcast system + */ + +import type { MoltbotConfig } from "../config/config.js"; +import { onAgentEvent, type AgentEventPayload } from "../infra/agent-events.js"; +import { onDiagnosticEvent, type DiagnosticEventPayload } from "../infra/diagnostic-events.js"; +import { + createEventBus, + setEventBus, + getEventBus, + type EventBus, + type EventBusConfig, +} from "./bus.js"; +import { createEventStore, getDefaultEventStorePath } from "./store.js"; +import { createEvent, type EventTopicMap } from "./catalog.js"; + +// ============================================================================ +// Gateway Event Bus Configuration +// ============================================================================ + +export type GatewayEventBusConfig = { + /** Enable event persistence (default: false) */ + persistEvents?: boolean; + /** Custom path for event store database */ + eventStorePath?: string; + /** Max events to keep in store (default: 100000) */ + maxStoredEvents?: number; + /** Logger */ + logger?: EventBusConfig["logger"]; +}; + +export type GatewayEventBusHandle = { + /** The event bus instance */ + bus: EventBus; + /** Cleanup function - call on gateway shutdown */ + cleanup: () => void; + /** Typed emit helpers */ + emit: GatewayEventEmitters; +}; + +// ============================================================================ +// Typed Emit Helpers +// ============================================================================ + +/** + * Typed event emitters for common gateway events + */ +export type GatewayEventEmitters = { + // Channel events + channelMessageReceived: ( + payload: EventTopicMap["channel.message.received"], + opts?: { sessionKey?: string; correlationId?: string }, + ) => void; + channelMessageSent: ( + payload: EventTopicMap["channel.message.sent"], + opts?: { sessionKey?: string; correlationId?: string }, + ) => void; + channelMessageFailed: ( + payload: EventTopicMap["channel.message.failed"], + opts?: { sessionKey?: string; correlationId?: string }, + ) => void; + channelStatusChanged: (payload: EventTopicMap["channel.status.changed"]) => void; + + // Agent events + agentRunStarted: ( + payload: EventTopicMap["agent.run.started"], + opts?: { sessionKey?: string; correlationId?: string }, + ) => void; + agentRunCompleted: ( + payload: EventTopicMap["agent.run.completed"], + opts?: { sessionKey?: string; correlationId?: string }, + ) => void; + agentToolExecuting: ( + payload: EventTopicMap["agent.tool.executing"], + opts?: { sessionKey?: string; correlationId?: string }, + ) => void; + agentToolCompleted: ( + payload: EventTopicMap["agent.tool.completed"], + opts?: { sessionKey?: string; correlationId?: string }, + ) => void; + + // System events + gatewayStarted: (payload: EventTopicMap["gateway.started"]) => void; + gatewayShuttingDown: (payload: EventTopicMap["gateway.shutting_down"]) => void; + configReloaded: (payload: EventTopicMap["config.reloaded"]) => void; + cronExecuted: (payload: EventTopicMap["cron.executed"]) => void; + + // Plugin events + pluginLoaded: (payload: EventTopicMap["plugin.loaded"]) => void; + pluginCustom: (payload: EventTopicMap["plugin.custom"], opts?: { sessionKey?: string }) => void; + pluginError: (payload: EventTopicMap["plugin.error"]) => void; + + // Security events + securityApprovalRequested: ( + payload: EventTopicMap["security.approval.requested"], + opts?: { sessionKey?: string }, + ) => void; + securityApprovalResolved: ( + payload: EventTopicMap["security.approval.resolved"], + opts?: { sessionKey?: string }, + ) => void; + + // Webhook events + webhookReceived: (payload: EventTopicMap["webhook.received"]) => void; +}; + +function createGatewayEventEmitters(bus: EventBus): GatewayEventEmitters { + return { + // Channel events + channelMessageReceived: (payload, opts) => + bus.emit( + createEvent("channel.message.received", payload, { + source: `channel:${payload.channelId}`, + ...opts, + }), + ), + channelMessageSent: (payload, opts) => + bus.emit( + createEvent("channel.message.sent", payload, { + source: `channel:${payload.channelId}`, + ...opts, + }), + ), + channelMessageFailed: (payload, opts) => + bus.emit( + createEvent("channel.message.failed", payload, { + source: `channel:${payload.channelId}`, + ...opts, + }), + ), + channelStatusChanged: (payload) => + bus.emit( + createEvent("channel.status.changed", payload, { + source: `channel:${payload.channelId}`, + }), + ), + + // Agent events + agentRunStarted: (payload, opts) => + bus.emit( + createEvent("agent.run.started", payload, { + source: `agent:${payload.agentId}`, + correlationId: payload.runId, + ...opts, + }), + ), + agentRunCompleted: (payload, opts) => + bus.emit( + createEvent("agent.run.completed", payload, { + source: `agent:${payload.agentId}`, + correlationId: payload.runId, + ...opts, + }), + ), + agentToolExecuting: (payload, opts) => + bus.emit( + createEvent("agent.tool.executing", payload, { + source: `agent:${payload.agentId}`, + correlationId: payload.runId, + ...opts, + }), + ), + agentToolCompleted: (payload, opts) => + bus.emit( + createEvent("agent.tool.completed", payload, { + source: `agent:${payload.agentId}`, + correlationId: payload.runId, + ...opts, + }), + ), + + // System events + gatewayStarted: (payload) => + bus.emit(createEvent("gateway.started", payload, { source: "gateway" })), + gatewayShuttingDown: (payload) => + bus.emit(createEvent("gateway.shutting_down", payload, { source: "gateway" })), + configReloaded: (payload) => + bus.emit(createEvent("config.reloaded", payload, { source: "gateway" })), + cronExecuted: (payload) => bus.emit(createEvent("cron.executed", payload, { source: "cron" })), + + // Plugin events + pluginLoaded: (payload) => + bus.emit(createEvent("plugin.loaded", payload, { source: `plugin:${payload.pluginId}` })), + pluginCustom: (payload, opts) => + bus.emit( + createEvent("plugin.custom", payload, { + source: `plugin:${payload.pluginId}`, + ...opts, + }), + ), + pluginError: (payload) => + bus.emit(createEvent("plugin.error", payload, { source: `plugin:${payload.pluginId}` })), + + // Security events + securityApprovalRequested: (payload, opts) => + bus.emit( + createEvent("security.approval.requested", payload, { + source: "security", + ...opts, + }), + ), + securityApprovalResolved: (payload, opts) => + bus.emit( + createEvent("security.approval.resolved", payload, { + source: "security", + ...opts, + }), + ), + + // Webhook events + webhookReceived: (payload) => + bus.emit( + createEvent("webhook.received", payload, { + source: `webhook:${payload.webhookId}`, + }), + ), + }; +} + +// ============================================================================ +// Agent Event Bridge +// ============================================================================ + +/** + * Map agent event streams to event bus topics + */ +function bridgeAgentEvent(bus: EventBus, evt: AgentEventPayload): void { + const { runId, sessionKey, data } = evt; + + // Extract common fields + const agentId = (data.agentId as string) || "default"; + + switch (evt.stream) { + case "run.start": + bus.emit( + createEvent( + "agent.run.started", + { + runId, + agentId, + model: (data.model as string) || "unknown", + trigger: + (data.trigger as "message" | "cron" | "webhook" | "command" | "internal") || + "internal", + inputTokens: data.inputTokens as number | undefined, + }, + { + source: `agent:${agentId}`, + sessionKey, + correlationId: runId, + }, + ), + ); + break; + + case "run.complete": + bus.emit( + createEvent( + "agent.run.completed", + { + runId, + agentId, + success: (data.success as boolean) ?? true, + duration: (data.duration as number) || 0, + inputTokens: (data.inputTokens as number) || 0, + outputTokens: (data.outputTokens as number) || 0, + toolCalls: (data.toolCalls as number) || 0, + error: data.error as string | undefined, + }, + { + source: `agent:${agentId}`, + sessionKey, + correlationId: runId, + }, + ), + ); + break; + + case "tool.start": + bus.emit( + createEvent( + "agent.tool.executing", + { + runId, + agentId, + toolName: (data.toolName as string) || "unknown", + toolInput: (data.toolInput as Record) || {}, + }, + { + source: `agent:${agentId}`, + sessionKey, + correlationId: runId, + }, + ), + ); + break; + + case "tool.complete": + bus.emit( + createEvent( + "agent.tool.completed", + { + runId, + agentId, + toolName: (data.toolName as string) || "unknown", + success: (data.success as boolean) ?? true, + duration: (data.duration as number) || 0, + error: data.error as string | undefined, + }, + { + source: `agent:${agentId}`, + sessionKey, + correlationId: runId, + }, + ), + ); + break; + + case "text.delta": + bus.emit( + createEvent( + "agent.text.chunk", + { + runId, + agentId, + chunk: (data.chunk as string) || "", + accumulated: (data.accumulated as string) || "", + }, + { + source: `agent:${agentId}`, + sessionKey, + correlationId: runId, + }, + ), + ); + break; + + // Other streams can be added as needed + } +} + +// ============================================================================ +// Diagnostic Event Bridge +// ============================================================================ + +/** + * Bridge diagnostic events to the event bus + */ +function bridgeDiagnosticEvent(bus: EventBus, evt: DiagnosticEventPayload): void { + // Map diagnostic events to appropriate bus topics + // These are internal/debug events, so we emit them under a diagnostic namespace + bus.emit({ + topic: `diagnostic.${evt.type}`, + payload: evt, + source: "diagnostic", + sessionKey: "sessionKey" in evt ? (evt.sessionKey as string) : undefined, + }); +} + +// ============================================================================ +// Main Integration +// ============================================================================ + +/** + * Initialize the gateway event bus + * + * Call this early in gateway startup to set up the event bus + * and bridge existing event patterns. + */ +export function initGatewayEventBus(config: GatewayEventBusConfig = {}): GatewayEventBusHandle { + const { persistEvents = false, eventStorePath, maxStoredEvents = 100_000, logger } = config; + + // Create event store if persistence enabled + let store: ReturnType | undefined; + if (persistEvents) { + store = createEventStore({ + dbPath: eventStorePath ?? getDefaultEventStorePath(), + maxEvents: maxStoredEvents, + logger, + }); + } + + // Create the event bus + const bus = createEventBus({ + store, + logger, + }); + + // Set as default bus + setEventBus(bus); + + // Bridge existing agent events + const agentUnsub = onAgentEvent((evt) => { + try { + bridgeAgentEvent(bus, evt); + } catch { + // Ignore bridging errors + } + }); + + // Bridge diagnostic events + const diagnosticUnsub = onDiagnosticEvent((evt) => { + try { + bridgeDiagnosticEvent(bus, evt); + } catch { + // Ignore bridging errors + } + }); + + // Create typed emitters + const emit = createGatewayEventEmitters(bus); + + // Cleanup function + const cleanup = () => { + agentUnsub(); + diagnosticUnsub(); + bus.shutdown(); + store?.close(); + }; + + return { bus, cleanup, emit }; +} + +/** + * Extract event bus config from moltbot config + */ +export function extractEventBusConfig(cfg: MoltbotConfig): GatewayEventBusConfig { + const eventsCfg = (cfg as Record).events as Record | undefined; + return { + persistEvents: eventsCfg?.persist === true, + eventStorePath: typeof eventsCfg?.storePath === "string" ? eventsCfg.storePath : undefined, + maxStoredEvents: typeof eventsCfg?.maxEvents === "number" ? eventsCfg.maxEvents : undefined, + }; +} + +// ============================================================================ +// Re-export for Convenience +// ============================================================================ + +export { getEventBus } from "./bus.js"; +export type { EventBus } from "./types.js"; diff --git a/src/events/index.ts b/src/events/index.ts new file mode 100644 index 000000000..c5715978b --- /dev/null +++ b/src/events/index.ts @@ -0,0 +1,100 @@ +/** + * Event Bus - Public API + * + * Unified event system for Moltbot enabling: + * - Cross-channel coordination + * - Async workflows + * - Plugin communication + * - External integrations + * + * @example + * ```ts + * import { getEventBus, createEvent } from "./events/index.js"; + * + * const bus = getEventBus(); + * + * // Subscribe to channel messages + * bus.subscribe("channel.message.received", (event) => { + * console.log(`Message from ${event.payload.channelId}: ${event.payload.text}`); + * }); + * + * // Subscribe with wildcards + * bus.subscribe("agent.#", (event) => { + * console.log(`Agent event: ${event.topic}`); + * }); + * + * // Emit a typed event + * bus.emit(createEvent("channel.message.received", { + * channelId: "whatsapp", + * accountId: "default", + * messageId: "msg123", + * chatId: "+1234567890", + * chatType: "dm", + * senderId: "+1234567890", + * text: "Hello!", + * hasMedia: false, + * timestamp: Date.now(), + * })); + * ``` + */ + +// Core types +export type { + EventEnvelope, + EventInput, + EventHandler, + Subscription, + SubscribeOptions, + TopicPattern, + EventBus, + EventBusConfig, + EventStore, + PersistedEvent, +} from "./types.js"; + +export { topicMatches } from "./types.js"; + +// Bus implementation +export { createEventBus, getEventBus, setEventBus, resetEventBus } from "./bus.js"; + +// Event catalog +export type { + EventTopicMap, + TypedEvent, + // Channel events + ChannelMessageReceivedPayload, + ChannelMessageSentPayload, + ChannelMessageFailedPayload, + ChannelStatusChangedPayload, + ChannelPairingRequestedPayload, + // Agent events + AgentRunStartedPayload, + AgentTextChunkPayload, + AgentToolExecutingPayload, + AgentToolCompletedPayload, + AgentRunCompletedPayload, + // Workflow events + WorkflowStartedPayload, + WorkflowStateChangedPayload, + WorkflowCompletedPayload, + // Plugin events + PluginLoadedPayload, + PluginCustomEventPayload, + PluginErrorPayload, + // System events + GatewayStartedPayload, + GatewayShuttingDownPayload, + ConfigReloadedPayload, + CronExecutedPayload, + // Security events + SecurityApprovalRequestedPayload, + SecurityApprovalResolvedPayload, + // Webhook events + WebhookReceivedPayload, +} from "./catalog.js"; + +export { createEvent, isEventOfType } from "./catalog.js"; + +// Persistence +export type { EventStoreConfig } from "./store.js"; +export { createEventStore, getDefaultEventStorePath } from "./store.js"; diff --git a/src/events/plugin-api.ts b/src/events/plugin-api.ts new file mode 100644 index 000000000..905edbddf --- /dev/null +++ b/src/events/plugin-api.ts @@ -0,0 +1,245 @@ +/** + * Event Bus - Plugin API + * + * Provides event bus access to plugins, enabling: + * - Subscribing to system events + * - Emitting custom plugin events + * - Cross-plugin communication + */ + +import type { + EventEnvelope, + EventHandler, + Subscription, + SubscribeOptions, + TopicPattern, +} from "./types.js"; +import type { EventTopicMap, PluginCustomEventPayload } from "./catalog.js"; +import { getEventBus } from "./bus.js"; +import { createEvent } from "./catalog.js"; + +// ============================================================================ +// Plugin Event API Types +// ============================================================================ + +/** + * Event API exposed to plugins + */ +export type PluginEventApi = { + /** + * Subscribe to events matching a topic pattern + * + * @example + * ```ts + * // Subscribe to all channel messages + * api.events.subscribe("channel.message.received", (event) => { + * console.log(`Message: ${event.payload.text}`); + * }); + * + * // Subscribe with wildcards + * api.events.subscribe("agent.#", (event) => { + * console.log(`Agent event: ${event.topic}`); + * }); + * ``` + */ + subscribe: ( + pattern: TopicPattern, + handler: EventHandler, + options?: SubscribeOptions, + ) => Subscription; + + /** + * Subscribe to a single event (auto-unsubscribes after first match) + */ + once: ( + pattern: TopicPattern, + handler: EventHandler, + options?: SubscribeOptions, + ) => Subscription; + + /** + * Emit a custom plugin event + * + * @example + * ```ts + * api.events.emit("my-feature.activated", { userId: "123" }); + * ``` + */ + emit: (eventName: string, data: unknown, options?: { sessionKey?: string }) => void; + + /** + * Emit a typed system event (for plugins that extend core functionality) + * + * @example + * ```ts + * api.events.emitTyped("channel.message.received", { + * channelId: "my-channel", + * // ... other required fields + * }); + * ``` + */ + emitTyped: ( + topic: T, + payload: EventTopicMap[T], + options?: { sessionKey?: string; correlationId?: string }, + ) => void; + + /** + * Check if there are any subscribers for a topic + */ + hasSubscribers: (topic: string) => boolean; +}; + +// ============================================================================ +// Implementation +// ============================================================================ + +/** + * Create the event API for a plugin + */ +export function createPluginEventApi(pluginId: string): PluginEventApi { + const bus = getEventBus(); + + // Track subscriptions for cleanup + const subscriptions: Subscription[] = []; + + const subscribe = ( + pattern: TopicPattern, + handler: EventHandler, + options?: SubscribeOptions, + ): Subscription => { + const sub = bus.subscribe(pattern, handler, options); + subscriptions.push(sub); + return sub; + }; + + const once = ( + pattern: TopicPattern, + handler: EventHandler, + options?: SubscribeOptions, + ): Subscription => { + const sub = bus.once(pattern, handler, options); + subscriptions.push(sub); + return sub; + }; + + const emit = (eventName: string, data: unknown, options?: { sessionKey?: string }): void => { + const payload: PluginCustomEventPayload = { + pluginId, + eventName, + data, + }; + bus.emit( + createEvent("plugin.custom", payload, { + source: `plugin:${pluginId}`, + sessionKey: options?.sessionKey, + }), + ); + }; + + const emitTyped = ( + topic: T, + payload: EventTopicMap[T], + options?: { sessionKey?: string; correlationId?: string }, + ): void => { + bus.emit( + createEvent(topic, payload, { + source: `plugin:${pluginId}`, + ...options, + }), + ); + }; + + const hasSubscribers = (topic: string): boolean => { + return bus.hasSubscribers(topic); + }; + + return { + subscribe, + once, + emit, + emitTyped, + hasSubscribers, + }; +} + +/** + * Cleanup all subscriptions for a plugin + */ +export function cleanupPluginEventSubscriptions(_api: PluginEventApi): void { + // The subscriptions are tracked internally, but for safety + // plugins should call unsubscribe on returned Subscription objects +} + +// ============================================================================ +// Event Helpers for Plugins +// ============================================================================ + +/** + * Helper to create a typed event handler + */ +export function createTypedHandler( + topic: T, + handler: ( + payload: EventTopicMap[T], + event: EventEnvelope, + ) => void | Promise, +): EventHandler> { + return (event) => handler(event.payload, event); +} + +/** + * Helper to filter events by session + */ +export function forSession(sessionKey: string, handler: EventHandler): EventHandler { + return (event) => { + if (event.sessionKey === sessionKey) { + return handler(event); + } + }; +} + +/** + * Helper to filter events by source + */ +export function fromSource(source: string, handler: EventHandler): EventHandler { + return (event) => { + if (event.source === source) { + return handler(event); + } + }; +} + +/** + * Helper to debounce event handlers + */ +export function debounced(handler: EventHandler, delayMs: number): EventHandler { + let timeout: ReturnType | null = null; + let lastEvent: EventEnvelope | null = null; + + return (event) => { + lastEvent = event; + if (timeout) clearTimeout(timeout); + timeout = setTimeout(() => { + if (lastEvent) { + void handler(lastEvent); + lastEvent = null; + } + }, delayMs); + }; +} + +/** + * Helper to throttle event handlers + */ +export function throttled(handler: EventHandler, intervalMs: number): EventHandler { + let lastCall = 0; + + return (event) => { + const now = Date.now(); + if (now - lastCall >= intervalMs) { + lastCall = now; + return handler(event); + } + }; +} diff --git a/src/events/store.ts b/src/events/store.ts new file mode 100644 index 000000000..545f5594e --- /dev/null +++ b/src/events/store.ts @@ -0,0 +1,288 @@ +/** + * Event Store - SQLite Persistence Layer + * + * Persists events for replay, auditing, and debugging. + * Uses node:sqlite (DatabaseSync) for synchronous operations. + */ + +import type { DatabaseSync } from "node:sqlite"; +import { createRequire } from "node:module"; +import * as path from "node:path"; +import * as fs from "node:fs"; +import type { EventEnvelope, EventStore, PersistedEvent, TopicPattern } from "./types.js"; +import { topicMatches } from "./types.js"; + +// ============================================================================ +// SQLite Helpers +// ============================================================================ + +const require = createRequire(import.meta.url); + +function requireNodeSqlite(): typeof import("node:sqlite") { + return require("node:sqlite") as typeof import("node:sqlite"); +} + +// ============================================================================ +// Schema +// ============================================================================ + +const SCHEMA_VERSION = 1; + +const CREATE_EVENTS_TABLE = ` +CREATE TABLE IF NOT EXISTS events ( + persistence_id TEXT PRIMARY KEY, + seq INTEGER NOT NULL, + topic TEXT NOT NULL, + payload TEXT NOT NULL, + ts INTEGER NOT NULL, + persisted_at INTEGER NOT NULL, + correlation_id TEXT, + source TEXT, + session_key TEXT +); + +CREATE INDEX IF NOT EXISTS idx_events_seq ON events(seq); +CREATE INDEX IF NOT EXISTS idx_events_topic ON events(topic); +CREATE INDEX IF NOT EXISTS idx_events_ts ON events(ts); +CREATE INDEX IF NOT EXISTS idx_events_session ON events(session_key) WHERE session_key IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_events_correlation ON events(correlation_id) WHERE correlation_id IS NOT NULL; +`; + +const CREATE_META_TABLE = ` +CREATE TABLE IF NOT EXISTS event_store_meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); +`; + +// ============================================================================ +// Implementation +// ============================================================================ + +export type EventStoreConfig = { + /** Path to SQLite database file */ + dbPath: string; + /** Max events to keep (older events pruned, default: 100000) */ + maxEvents?: number; + /** Auto-prune interval in ms (default: 3600000 = 1 hour) */ + pruneInterval?: number; + /** Logger */ + logger?: { + error: (msg: string, ...args: unknown[]) => void; + debug: (msg: string, ...args: unknown[]) => void; + }; +}; + +let persistenceIdCounter = 0; + +function generatePersistenceId(): string { + return `evt_${Date.now().toString(36)}_${(++persistenceIdCounter).toString(36)}`; +} + +/** + * Create an SQLite-backed event store + */ +export function createEventStore(config: EventStoreConfig): EventStore & { + close: () => void; + vacuum: () => void; +} { + const { + dbPath, + maxEvents = 100_000, + pruneInterval = 3_600_000, + logger = { + error: console.error, + debug: () => {}, + }, + } = config; + + // Ensure directory exists + const dir = path.dirname(dbPath); + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } + + // Open database + const { DatabaseSync } = requireNodeSqlite(); + const db: DatabaseSync = new DatabaseSync(dbPath); + + // Initialize schema + db.exec(CREATE_META_TABLE); + db.exec(CREATE_EVENTS_TABLE); + + // Check/set schema version + const versionStmt = db.prepare("SELECT value FROM event_store_meta WHERE key = 'schema_version'"); + const versionRow = versionStmt.get() as { value: string } | undefined; + if (!versionRow) { + db.prepare("INSERT INTO event_store_meta (key, value) VALUES ('schema_version', ?)").run( + String(SCHEMA_VERSION), + ); + } + + // Prepared statements + const insertStmt = db.prepare(` + INSERT INTO events (persistence_id, seq, topic, payload, ts, persisted_at, correlation_id, source, session_key) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + const readAfterStmt = db.prepare(` + SELECT * FROM events WHERE seq > ? ORDER BY seq ASC LIMIT ? + `); + + const readRangeStmt = db.prepare(` + SELECT * FROM events WHERE ts >= ? AND ts <= ? ORDER BY seq ASC LIMIT ? + `); + + const latestSeqStmt = db.prepare(` + SELECT MAX(seq) as max_seq FROM events + `); + + const countStmt = db.prepare(` + SELECT COUNT(*) as count FROM events + `); + + const pruneByCountStmt = db.prepare(` + DELETE FROM events WHERE seq <= ( + SELECT seq FROM events ORDER BY seq DESC LIMIT 1 OFFSET ? + ) + `); + + const pruneByTimeStmt = db.prepare(` + DELETE FROM events WHERE ts < ? + `); + + // Auto-prune timer + let pruneTimer: ReturnType | null = null; + if (pruneInterval > 0) { + pruneTimer = setInterval(() => { + try { + const count = (countStmt.get() as { count: number }).count; + if (count > maxEvents) { + const toDelete = count - maxEvents; + pruneByCountStmt.run(maxEvents); + logger.debug(`Pruned ${toDelete} old events`); + } + } catch (err) { + logger.error(`Event store prune failed: ${String(err)}`); + } + }, pruneInterval); + } + + // ------------------------------------------------------------------------- + // Store Operations + // ------------------------------------------------------------------------- + + const append = async (event: EventEnvelope): Promise => { + const persistenceId = generatePersistenceId(); + const persistedAt = Date.now(); + + insertStmt.run( + persistenceId, + event.seq, + event.topic, + JSON.stringify(event.payload), + event.ts, + persistedAt, + event.correlationId ?? null, + event.source ?? null, + event.sessionKey ?? null, + ); + + return { + ...event, + persistenceId, + persistedAt, + }; + }; + + const rowToEvent = (row: Record): PersistedEvent => ({ + topic: row.topic as string, + payload: JSON.parse(row.payload as string), + seq: row.seq as number, + ts: row.ts as number, + correlationId: row.correlation_id as string | undefined, + source: row.source as string | undefined, + sessionKey: row.session_key as string | undefined, + persistenceId: row.persistence_id as string, + persistedAt: row.persisted_at as number, + }); + + const readAfter = async ( + seq: number, + options?: { limit?: number; topic?: TopicPattern }, + ): Promise => { + const limit = options?.limit ?? 1000; + const rows = readAfterStmt.all(seq, limit) as Record[]; + + let events = rows.map(rowToEvent); + + // Filter by topic pattern if specified + if (options?.topic) { + events = events.filter((e) => topicMatches(options.topic!, e.topic)); + } + + return events; + }; + + const readRange = async ( + from: number, + to: number, + options?: { limit?: number; topic?: TopicPattern }, + ): Promise => { + const limit = options?.limit ?? 1000; + const rows = readRangeStmt.all(from, to, limit) as Record[]; + + let events = rows.map(rowToEvent); + + // Filter by topic pattern if specified + if (options?.topic) { + events = events.filter((e) => topicMatches(options.topic!, e.topic)); + } + + return events; + }; + + const getLatestSeq = async (): Promise => { + const row = latestSeqStmt.get() as { max_seq: number | null }; + return row.max_seq ?? 0; + }; + + const prune = async (olderThan: number): Promise => { + const result = pruneByTimeStmt.run(olderThan); + return result.changes; + }; + + const close = (): void => { + if (pruneTimer) { + clearInterval(pruneTimer); + pruneTimer = null; + } + db.close(); + }; + + const vacuum = (): void => { + db.exec("VACUUM"); + }; + + return { + append, + readAfter, + readRange, + getLatestSeq, + prune, + close, + vacuum, + }; +} + +// ============================================================================ +// Default Store Location +// ============================================================================ + +/** + * Get the default event store path + */ +export function getDefaultEventStorePath(): string { + const homeDir = process.env.HOME ?? process.env.USERPROFILE ?? "."; + return path.join(homeDir, ".clawdbot", "events.db"); +} diff --git a/src/events/types.ts b/src/events/types.ts new file mode 100644 index 000000000..b860f2382 --- /dev/null +++ b/src/events/types.ts @@ -0,0 +1,281 @@ +/** + * Event Bus - Core Types + * + * A unified event system for Moltbot that enables: + * - Cross-channel coordination (WhatsApp → Discord notifications) + * - Async workflows with checkpoints + * - Plugin communication without tight coupling + * - External webhook integrations + * + * Design principles: + * - Type-safe with discriminated unions + * - Topic-based routing with wildcard support + * - Error isolation (handlers don't crash the bus) + * - Sequence numbering for ordering guarantees + * - Designed for future distribution (Redis/NATS compatible) + */ + +// ============================================================================ +// Core Event Types +// ============================================================================ + +/** + * Base event envelope - all events include these fields + */ +export type EventEnvelope = { + /** Event topic (e.g., "channel.message.received", "agent.tool.executed") */ + topic: T; + /** Event payload - type depends on topic */ + payload: P; + /** Monotonically increasing sequence number */ + seq: number; + /** Unix timestamp (ms) when event was emitted */ + ts: number; + /** Optional correlation ID for tracing related events */ + correlationId?: string; + /** Optional source identifier (channel, plugin, agent) */ + source?: string; + /** Optional session key for session-scoped events */ + sessionKey?: string; +}; + +/** + * Event input - what callers provide (seq/ts added by bus) + */ +export type EventInput = { + topic: T; + payload: P; + correlationId?: string; + source?: string; + sessionKey?: string; +}; + +// ============================================================================ +// Topic Patterns +// ============================================================================ + +/** + * Topic pattern for subscriptions - supports wildcards: + * - "channel.message.received" - exact match + * - "channel.message.*" - single segment wildcard + * - "channel.#" - multi-segment wildcard (matches channel.anything.here) + * - "*" - matches any single topic segment + * - "#" - matches zero or more topic segments + */ +export type TopicPattern = string; + +/** + * Check if a topic matches a pattern + */ +export function topicMatches(pattern: TopicPattern, topic: string): boolean { + // Exact match fast path + if (pattern === topic) return true; + if (pattern === "#") return true; + + const patternParts = pattern.split("."); + const topicParts = topic.split("."); + + let pi = 0; + let ti = 0; + + while (pi < patternParts.length && ti < topicParts.length) { + const pp = patternParts[pi]; + + if (pp === "#") { + // # at end matches everything remaining + if (pi === patternParts.length - 1) return true; + // # in middle - try matching rest at each position + for (let tryTi = ti; tryTi <= topicParts.length; tryTi++) { + if (topicMatches(patternParts.slice(pi + 1).join("."), topicParts.slice(tryTi).join("."))) { + return true; + } + } + return false; + } + + if (pp === "*") { + // * matches exactly one segment + pi++; + ti++; + continue; + } + + // Literal match required + if (pp !== topicParts[ti]) return false; + pi++; + ti++; + } + + // Pattern exhausted - topic must also be exhausted + // Unless pattern ends with # + if (pi < patternParts.length) { + return patternParts[pi] === "#" && pi === patternParts.length - 1; + } + + return ti === topicParts.length; +} + +// ============================================================================ +// Subscription Types +// ============================================================================ + +/** + * Event handler function + */ +export type EventHandler = ( + event: E, +) => void | Promise; + +/** + * Subscription options + */ +export type SubscribeOptions = { + /** Only receive events matching this session key */ + sessionKey?: string; + /** Only receive events from this source */ + source?: string; + /** Priority for ordered execution (higher = earlier, default 0) */ + priority?: number; + /** If true, errors in this handler propagate (default: false, errors logged) */ + propagateErrors?: boolean; +}; + +/** + * Active subscription handle + */ +export type Subscription = { + /** Unique subscription ID */ + id: string; + /** Pattern this subscription matches */ + pattern: TopicPattern; + /** Unsubscribe from this subscription */ + unsubscribe: () => void; +}; + +// ============================================================================ +// Event Bus Interface +// ============================================================================ + +/** + * Core event bus interface - can be implemented with different backends + */ +export type EventBus = { + /** + * Emit an event to all matching subscribers + * Returns after all sync handlers complete, async handlers may still be running + */ + emit: (event: EventInput) => EventEnvelope; + + /** + * Emit an event and wait for all handlers (sync + async) to complete + */ + emitAsync: (event: EventInput) => Promise>; + + /** + * Subscribe to events matching a topic pattern + */ + subscribe: ( + pattern: TopicPattern, + handler: EventHandler, + options?: SubscribeOptions, + ) => Subscription; + + /** + * Subscribe to a single event (auto-unsubscribes after first match) + */ + once: ( + pattern: TopicPattern, + handler: EventHandler, + options?: SubscribeOptions, + ) => Subscription; + + /** + * Unsubscribe by subscription ID + */ + unsubscribe: (subscriptionId: string) => boolean; + + /** + * Unsubscribe all handlers for a pattern + */ + unsubscribeAll: (pattern: TopicPattern) => number; + + /** + * Get current sequence number + */ + getSeq: () => number; + + /** + * Check if there are any subscribers for a topic + */ + hasSubscribers: (topic: string) => boolean; + + /** + * Get count of active subscriptions + */ + subscriptionCount: () => number; + + /** + * Shutdown the bus, clearing all subscriptions + */ + shutdown: () => void; +}; + +// ============================================================================ +// Event Persistence Types +// ============================================================================ + +/** + * Persisted event with metadata + */ +export type PersistedEvent = E & { + /** Persistence ID (may differ from seq for distributed systems) */ + persistenceId: string; + /** When the event was persisted */ + persistedAt: number; +}; + +/** + * Event store interface for persistence + */ +export type EventStore = { + /** Append an event to the store */ + append: (event: EventEnvelope) => Promise; + + /** Read events after a sequence number */ + readAfter: ( + seq: number, + options?: { limit?: number; topic?: TopicPattern }, + ) => Promise; + + /** Read events in a time range */ + readRange: ( + from: number, + to: number, + options?: { limit?: number; topic?: TopicPattern }, + ) => Promise; + + /** Get the latest sequence number */ + getLatestSeq: () => Promise; + + /** Prune events older than a timestamp */ + prune: (olderThan: number) => Promise; +}; + +// ============================================================================ +// Bus Configuration +// ============================================================================ + +export type EventBusConfig = { + /** Optional event store for persistence */ + store?: EventStore; + /** Max async handlers to run concurrently per event (default: 10) */ + maxConcurrency?: number; + /** Timeout for async handlers in ms (default: 30000) */ + handlerTimeout?: number; + /** Logger for errors and debug info */ + logger?: { + error: (msg: string, ...args: unknown[]) => void; + warn: (msg: string, ...args: unknown[]) => void; + debug: (msg: string, ...args: unknown[]) => void; + }; +};