diff --git a/extensions/nostr/src/channel.ts b/extensions/nostr/src/channel.ts index ac006ec57..092583f16 100644 --- a/extensions/nostr/src/channel.ts +++ b/extensions/nostr/src/channel.ts @@ -1,7 +1,10 @@ import { buildChannelConfigSchema, + createReplyPrefixContext, + createTypingCallbacks, DEFAULT_ACCOUNT_ID, formatPairingApproveHint, + logTypingFailure, type ChannelPlugin, } from "clawdbot/plugin-sdk"; @@ -218,19 +221,153 @@ export const nostrPlugin: ChannelPlugin = { accountId: account.accountId, privateKey: account.privateKey, relays: account.relays, - onMessage: async (senderPubkey, text, reply) => { + onMessage: async (senderPubkey, text, reply, eventId) => { ctx.log?.debug(`[${account.accountId}] DM from ${senderPubkey}: ${text.slice(0, 50)}...`); - // Forward to moltbot's message pipeline - await runtime.channel.reply.handleInboundMessage({ + const cfg = runtime.config.loadConfig(); + const route = runtime.channel.routing.resolveAgentRoute({ + cfg, channel: "nostr", accountId: account.accountId, - senderId: senderPubkey, - chatType: "direct", - chatId: senderPubkey, // For DMs, chatId is the sender's pubkey - text, - reply: async (responseText: string) => { - await reply(responseText); + peer: { kind: "dm", id: senderPubkey }, + }); + + ctx.log?.debug(`[${account.accountId}] Route resolved: sessionKey=${route.sessionKey}, agentId=${route.agentId}`); + + const storePath = runtime.channel.session.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + const envelopeOptions = runtime.channel.reply.resolveEnvelopeFormatOptions(cfg); + const previousTimestamp = runtime.channel.session.readSessionUpdatedAt({ + storePath, + sessionKey: route.sessionKey, + }); + const body = runtime.channel.reply.formatAgentEnvelope({ + channel: "Nostr", + from: senderPubkey, + timestamp: Date.now(), + previousTimestamp, + envelope: envelopeOptions, + body: text, + }); + + // Create typing callbacks for this conversation + // Note: busHandle is checked at invocation time (not creation time) + // to handle the race condition during startup + const typingCallbacks = createTypingCallbacks({ + start: async () => { + if (!busHandle) { + ctx.log?.debug(`[${account.accountId}] Skipping typing START (bus not ready)`); + return; + } + ctx.log?.debug(`[${account.accountId}] Sending typing START to ${senderPubkey.slice(0, 8)}`); + return busHandle.sendTypingStart(senderPubkey); + }, + stop: async () => { + if (!busHandle) { + ctx.log?.debug(`[${account.accountId}] Skipping typing STOP (bus not ready)`); + return; + } + ctx.log?.debug(`[${account.accountId}] Sending typing STOP to ${senderPubkey.slice(0, 8)}`); + return busHandle.sendTypingStop(senderPubkey); + }, + onStartError: (err) => + logTypingFailure({ + log: (msg) => ctx.log?.warn(msg), + channel: "nostr", + target: senderPubkey, + action: "start", + error: err, + }), + onStopError: (err) => + logTypingFailure({ + log: (msg) => ctx.log?.warn(msg), + channel: "nostr", + target: senderPubkey, + action: "stop", + error: err, + }), + }); + + // Build the inbound message context + const ctxPayload = runtime.channel.reply.finalizeInboundContext({ + Body: body, + RawBody: text, + CommandBody: text, + From: `nostr:${senderPubkey}`, + To: `nostr:${senderPubkey}`, + SessionKey: route.sessionKey, + AccountId: account.accountId, + ChatType: "direct", + ConversationLabel: senderPubkey, + SenderName: senderPubkey.slice(0, 8), + SenderId: senderPubkey, + Provider: "nostr" as const, + Surface: "nostr" as const, + Timestamp: Date.now(), + MessageSid: eventId, // Nostr event ID for deduplication + CommandAuthorized: true, // TODO: implement proper authorization + CommandSource: "text" as const, + OriginatingChannel: "nostr" as const, + OriginatingTo: `nostr:${senderPubkey}`, + }); + + await runtime.channel.session.recordInboundSession({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + updateLastRoute: { + sessionKey: route.mainSessionKey, + channel: "nostr", + to: `nostr:${senderPubkey}`, + accountId: route.accountId, + }, + onRecordError: (err) => { + ctx.log?.warn?.(`nostr: failed updating session meta: ${String(err)}`); + }, + }); + + // Get table mode for formatting + const tableMode = runtime.channel.text.resolveMarkdownTableMode({ + cfg, + channel: "nostr", + accountId: account.accountId, + }); + + // Create reply prefix context + const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId }); + + // Create the reply dispatcher + const { dispatcher, replyOptions, markDispatchIdle } = + runtime.channel.reply.createReplyDispatcherWithTyping({ + responsePrefix: prefixContext.responsePrefix, + responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, + humanDelay: runtime.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), + deliver: async (payload) => { + const message = runtime.channel.text.convertMarkdownTables( + payload.text ?? "", + tableMode + ); + if (!message) return; + ctx.log?.debug(`[${account.accountId}] Delivering reply to ${senderPubkey.slice(0, 8)}: ${message.slice(0, 50)}...`); + await reply(message); + ctx.log?.info(`[${account.accountId}] Reply delivered to ${senderPubkey.slice(0, 8)}`); + }, + onError: (err, info) => { + ctx.log?.error(`[${account.accountId}] nostr ${info.kind} reply failed: ${String(err)}`); + }, + onReplyStart: typingCallbacks?.onReplyStart, + onIdle: typingCallbacks?.onIdle, + }); + + // Dispatch the reply + const { queuedFinal, counts } = await runtime.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + onModelSelected: prefixContext.onModelSelected, }, }); }, diff --git a/extensions/nostr/src/nostr-bus.ts b/extensions/nostr/src/nostr-bus.ts index 25ae6f082..077f56862 100644 --- a/extensions/nostr/src/nostr-bus.ts +++ b/extensions/nostr/src/nostr-bus.ts @@ -678,7 +678,147 @@ async function sendEncryptedDm( } } - throw new Error(`Failed to publish to any relay: ${lastError?.message}`); + if (successCount === 0) { + throw new Error(`Failed to publish to any relay: ${lastError?.message}`); + } +} + +// ============================================================================ +// Typing Indicator (Kind 20001 Ephemeral Event) +// ============================================================================ + +/** + * Send a typing indicator event to a pubkey + * Uses kind 20001 (community convention for typing) + * Content is NIP-04 encrypted for privacy consistency with DMs + */ +async function sendTypingIndicator( + pool: SimplePool, + sk: Uint8Array, + toPubkey: string, + action: "start" | "stop", + relays: string[], + metrics: NostrMetrics, + circuitBreakers: Map, + healthTracker: RelayHealthTracker, + conversationEventId?: string, + onError?: (error: Error, context: string) => void +): Promise { + // Encrypt the action for privacy (consistent with DMs) + const ciphertext = await encrypt(sk, toPubkey, action); + + // Build tags + const tags: string[][] = [ + ["p", toPubkey], + ["t", "clawdbot-typing"], // Namespace tag for collision protection + ["expiration", String(Math.floor(Date.now() / 1000) + TYPING_TTL_SEC)], + ]; + + // Add conversation scope if provided + if (conversationEventId) { + tags.push(["e", conversationEventId]); + } + + const event = finalizeEvent( + { + kind: TYPING_KIND, + content: ciphertext, + tags, + created_at: Math.floor(Date.now() / 1000), + }, + sk + ); + + // Sort relays by health score + const sortedRelays = healthTracker.getSortedRelays(relays); + + // Try relays in order, respecting circuit breakers + let lastError: Error | undefined; + for (const relay of sortedRelays) { + const cb = circuitBreakers.get(relay); + if (cb && !cb.canAttempt()) { + continue; + } + + const startTime = Date.now(); + try { + await pool.publish([relay], event); + const latency = Date.now() - startTime; + cb?.recordSuccess(); + healthTracker.recordSuccess(relay, latency); + const metricName = action === "start" ? "typing.start.sent" : "typing.stop.sent"; + metrics.emit(metricName, 1, { relay }); + return; // Success - exit early + } catch (err) { + lastError = err as Error; + cb?.recordFailure(); + healthTracker.recordFailure(relay); + metrics.emit("typing.error", 1, { relay }); + onError?.(lastError, `typing ${action} to ${relay}`); + } + } + + // Don't throw for typing failures - they're non-critical + if (lastError) { + onError?.(lastError, `typing ${action} failed on all relays`); + } +} + +/** + * Create throttled typing indicator functions + * Returns start/stop functions that respect throttling (max 1 event per 5s per recipient) + */ +function createTypingController( + pool: SimplePool, + sk: Uint8Array, + relays: string[], + metrics: NostrMetrics, + circuitBreakers: Map, + healthTracker: RelayHealthTracker, + onError?: (error: Error, context: string) => void +): { + sendTypingStart: (toPubkey: string, conversationEventId?: string) => Promise; + sendTypingStop: (toPubkey: string, conversationEventId?: string) => Promise; +} { + // Track last send time per recipient for throttling + const lastSendTime = new Map(); + + const sendWithThrottle = async ( + toPubkey: string, + action: "start" | "stop", + conversationEventId?: string + ): Promise => { + const now = Date.now(); + const lastSent = lastSendTime.get(toPubkey) ?? 0; + + // Stop events bypass throttle for better UX + if (action === "start") { + if (now - lastSent < TYPING_THROTTLE_MS) { + return; // Throttled + } + lastSendTime.set(toPubkey, now); + } + + await sendTypingIndicator( + pool, + sk, + toPubkey, + action, + relays, + metrics, + circuitBreakers, + healthTracker, + conversationEventId, + onError + ); + }; + + return { + sendTypingStart: (toPubkey: string, conversationEventId?: string) => + sendWithThrottle(toPubkey, "start", conversationEventId), + sendTypingStop: (toPubkey: string, conversationEventId?: string) => + sendWithThrottle(toPubkey, "stop", conversationEventId), + }; } // ============================================================================