From f00cc5464e96e820c3c72b5fc05a74eb6194c39d Mon Sep 17 00:00:00 2001 From: Joel Klabo Date: Mon, 26 Jan 2026 16:54:36 -0800 Subject: [PATCH] Nostr: fix typing bus + metrics --- extensions/nostr/src/metrics.ts | 35 +++++++++++++++++++++++++++ extensions/nostr/src/nostr-bus.ts | 39 +++++++++++++++++++++++-------- 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/extensions/nostr/src/metrics.ts b/extensions/nostr/src/metrics.ts index e9e0e7fb7..ba5bd1636 100644 --- a/extensions/nostr/src/metrics.ts +++ b/extensions/nostr/src/metrics.ts @@ -41,6 +41,11 @@ export type RateLimitMetricName = "rate_limit.per_sender" | "rate_limit.global"; export type DecryptMetricName = "decrypt.success" | "decrypt.failure"; +export type TypingMetricName = + | "typing.start.sent" + | "typing.stop.sent" + | "typing.error"; + export type MemoryMetricName = | "memory.seen_tracker_size" | "memory.rate_limiter_entries"; @@ -50,6 +55,7 @@ export type MetricName = | RelayMetricName | RateLimitMetricName | DecryptMetricName + | TypingMetricName | MemoryMetricName; // ============================================================================ @@ -128,6 +134,13 @@ export interface MetricsSnapshot { failure: number; }; + /** Typing indicator stats */ + typing: { + startSent: number; + stopSent: number; + errors: number; + }; + /** Memory/capacity stats */ memory: { seenTrackerSize: number; @@ -213,6 +226,13 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics { failure: 0, }; + // Typing indicator stats + const typing = { + startSent: 0, + stopSent: 0, + errors: 0, + }; + // Memory stats (updated via gauge-style metrics) const memory = { seenTrackerSize: 0, @@ -371,6 +391,16 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics { decrypt.failure += value; break; + case "typing.start.sent": + typing.startSent += value; + break; + case "typing.stop.sent": + typing.stopSent += value; + break; + case "typing.error": + typing.errors += value; + break; + // Memory (gauge-style - value replaces, not adds) case "memory.seen_tracker_size": memory.seenTrackerSize = value; @@ -396,6 +426,7 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics { relays: relaysObj, rateLimiting: { ...rateLimiting }, decrypt: { ...decrypt }, + typing: { ...typing }, memory: { ...memory }, snapshotAt: Date.now(), }; @@ -422,6 +453,9 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics { rateLimiting.globalHits = 0; decrypt.success = 0; decrypt.failure = 0; + typing.startSent = 0; + typing.stopSent = 0; + typing.errors = 0; memory.seenTrackerSize = 0; memory.rateLimiterEntries = 0; } @@ -452,6 +486,7 @@ export function createNoopMetrics(): NostrMetrics { relays: {}, rateLimiting: { perSenderHits: 0, globalHits: 0 }, decrypt: { success: 0, failure: 0 }, + typing: { startSent: 0, stopSent: 0, errors: 0 }, memory: { seenTrackerSize: 0, rateLimiterEntries: 0 }, snapshotAt: 0, }; diff --git a/extensions/nostr/src/nostr-bus.ts b/extensions/nostr/src/nostr-bus.ts index 077f56862..89afb04f0 100644 --- a/extensions/nostr/src/nostr-bus.ts +++ b/extensions/nostr/src/nostr-bus.ts @@ -51,6 +51,11 @@ const CIRCUIT_BREAKER_RESET_MS = 30000; // 30 seconds before half-open // Health tracker configuration const HEALTH_WINDOW_MS = 60000; // 1 minute window for health stats +// Typing indicator configuration (NIP-01 ephemeral events) +const TYPING_KIND = 20001; // Community convention for typing indicators +const TYPING_TTL_SEC = 30; // 30 second expiration +const TYPING_THROTTLE_MS = 5000; // Max 1 event per 5 seconds per recipient + // ============================================================================ // Types // ============================================================================ @@ -66,7 +71,8 @@ export interface NostrBusOptions { onMessage: ( pubkey: string, text: string, - reply: (text: string) => Promise + reply: (text: string) => Promise, + eventId: string ) => Promise; /** Called on errors (optional) */ onError?: (error: Error, context: string) => void; @@ -101,6 +107,10 @@ export interface NostrBusHandle { lastPublishedEventId: string | null; lastPublishResults: Record | null; }>; + /** Send typing indicator start (kind 20001) */ + sendTypingStart: (toPubkey: string, conversationEventId?: string) => Promise; + /** Send typing indicator stop (kind 20001) */ + sendTypingStop: (toPubkey: string, conversationEventId?: string) => Promise; } // ============================================================================ @@ -496,7 +506,7 @@ export async function startNostrBus( }; // Call the message handler - await onMessage(event.pubkey, plaintext, replyTo); + await onMessage(event.pubkey, plaintext, replyTo, event.id); // Mark as processed metrics.emit("event.processed"); @@ -512,26 +522,21 @@ export async function startNostrBus( const sub = pool.subscribeMany( relays, - [{ kinds: [4], "#p": [pk], since }], + { kinds: [4], "#p": [pk], since }, { onevent: handleEvent, oneose: () => { - // EOSE handler - called when all stored events have been received for (const relay of relays) { metrics.emit("relay.message.eose", 1, { relay }); } onEose?.(relays.join(", ")); }, onclose: (reason) => { - // Handle subscription close for (const relay of relays) { metrics.emit("relay.message.closed", 1, { relay }); options.onDisconnect?.(relay); } - onError?.( - new Error(`Subscription closed: ${reason}`), - "subscription" - ); + onError?.(new Error(`Subscription closed: ${reason}`), "subscription"); }, } ); @@ -590,6 +595,17 @@ export async function startNostrBus( }; }; + // Create typing controller for throttled typing indicators + const typingController = createTypingController( + pool, + sk, + relays, + metrics, + circuitBreakers, + healthTracker, + onError + ); + return { close: () => { sub.close(); @@ -610,6 +626,8 @@ export async function startNostrBus( getMetrics: () => metrics.getSnapshot(), publishProfile, getProfileState, + sendTypingStart: typingController.sendTypingStart, + sendTypingStop: typingController.sendTypingStop, }; } @@ -646,6 +664,7 @@ async function sendEncryptedDm( const sortedRelays = healthTracker.getSortedRelays(relays); // Try relays in order of health, respecting circuit breakers + let successCount = 0; let lastError: Error | undefined; for (const relay of sortedRelays) { const cb = circuitBreakers.get(relay); @@ -663,7 +682,7 @@ async function sendEncryptedDm( // Record success cb?.recordSuccess(); healthTracker.recordSuccess(relay, latency); - + successCount++; return; // Success - exit early } catch (err) { lastError = err as Error;