Nostr: fix typing bus + metrics

This commit is contained in:
Joel Klabo 2026-01-26 16:54:36 -08:00
parent 1f2013299c
commit f00cc5464e
2 changed files with 64 additions and 10 deletions

View File

@ -41,6 +41,11 @@ export type RateLimitMetricName = "rate_limit.per_sender" | "rate_limit.global";
export type DecryptMetricName = "decrypt.success" | "decrypt.failure";
export type TypingMetricName =
| "typing.start.sent"
| "typing.stop.sent"
| "typing.error";
export type MemoryMetricName =
| "memory.seen_tracker_size"
| "memory.rate_limiter_entries";
@ -50,6 +55,7 @@ export type MetricName =
| RelayMetricName
| RateLimitMetricName
| DecryptMetricName
| TypingMetricName
| MemoryMetricName;
// ============================================================================
@ -128,6 +134,13 @@ export interface MetricsSnapshot {
failure: number;
};
/** Typing indicator stats */
typing: {
startSent: number;
stopSent: number;
errors: number;
};
/** Memory/capacity stats */
memory: {
seenTrackerSize: number;
@ -213,6 +226,13 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics {
failure: 0,
};
// Typing indicator stats
const typing = {
startSent: 0,
stopSent: 0,
errors: 0,
};
// Memory stats (updated via gauge-style metrics)
const memory = {
seenTrackerSize: 0,
@ -371,6 +391,16 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics {
decrypt.failure += value;
break;
case "typing.start.sent":
typing.startSent += value;
break;
case "typing.stop.sent":
typing.stopSent += value;
break;
case "typing.error":
typing.errors += value;
break;
// Memory (gauge-style - value replaces, not adds)
case "memory.seen_tracker_size":
memory.seenTrackerSize = value;
@ -396,6 +426,7 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics {
relays: relaysObj,
rateLimiting: { ...rateLimiting },
decrypt: { ...decrypt },
typing: { ...typing },
memory: { ...memory },
snapshotAt: Date.now(),
};
@ -422,6 +453,9 @@ export function createMetrics(onMetric?: OnMetricCallback): NostrMetrics {
rateLimiting.globalHits = 0;
decrypt.success = 0;
decrypt.failure = 0;
typing.startSent = 0;
typing.stopSent = 0;
typing.errors = 0;
memory.seenTrackerSize = 0;
memory.rateLimiterEntries = 0;
}
@ -452,6 +486,7 @@ export function createNoopMetrics(): NostrMetrics {
relays: {},
rateLimiting: { perSenderHits: 0, globalHits: 0 },
decrypt: { success: 0, failure: 0 },
typing: { startSent: 0, stopSent: 0, errors: 0 },
memory: { seenTrackerSize: 0, rateLimiterEntries: 0 },
snapshotAt: 0,
};

View File

@ -51,6 +51,11 @@ const CIRCUIT_BREAKER_RESET_MS = 30000; // 30 seconds before half-open
// Health tracker configuration
const HEALTH_WINDOW_MS = 60000; // 1 minute window for health stats
// Typing indicator configuration (NIP-01 ephemeral events)
const TYPING_KIND = 20001; // Community convention for typing indicators
const TYPING_TTL_SEC = 30; // 30 second expiration
const TYPING_THROTTLE_MS = 5000; // Max 1 event per 5 seconds per recipient
// ============================================================================
// Types
// ============================================================================
@ -66,7 +71,8 @@ export interface NostrBusOptions {
onMessage: (
pubkey: string,
text: string,
reply: (text: string) => Promise<void>
reply: (text: string) => Promise<void>,
eventId: string
) => Promise<void>;
/** Called on errors (optional) */
onError?: (error: Error, context: string) => void;
@ -101,6 +107,10 @@ export interface NostrBusHandle {
lastPublishedEventId: string | null;
lastPublishResults: Record<string, "ok" | "failed" | "timeout"> | null;
}>;
/** Send typing indicator start (kind 20001) */
sendTypingStart: (toPubkey: string, conversationEventId?: string) => Promise<void>;
/** Send typing indicator stop (kind 20001) */
sendTypingStop: (toPubkey: string, conversationEventId?: string) => Promise<void>;
}
// ============================================================================
@ -496,7 +506,7 @@ export async function startNostrBus(
};
// Call the message handler
await onMessage(event.pubkey, plaintext, replyTo);
await onMessage(event.pubkey, plaintext, replyTo, event.id);
// Mark as processed
metrics.emit("event.processed");
@ -512,26 +522,21 @@ export async function startNostrBus(
const sub = pool.subscribeMany(
relays,
[{ kinds: [4], "#p": [pk], since }],
{ kinds: [4], "#p": [pk], since },
{
onevent: handleEvent,
oneose: () => {
// EOSE handler - called when all stored events have been received
for (const relay of relays) {
metrics.emit("relay.message.eose", 1, { relay });
}
onEose?.(relays.join(", "));
},
onclose: (reason) => {
// Handle subscription close
for (const relay of relays) {
metrics.emit("relay.message.closed", 1, { relay });
options.onDisconnect?.(relay);
}
onError?.(
new Error(`Subscription closed: ${reason}`),
"subscription"
);
onError?.(new Error(`Subscription closed: ${reason}`), "subscription");
},
}
);
@ -590,6 +595,17 @@ export async function startNostrBus(
};
};
// Create typing controller for throttled typing indicators
const typingController = createTypingController(
pool,
sk,
relays,
metrics,
circuitBreakers,
healthTracker,
onError
);
return {
close: () => {
sub.close();
@ -610,6 +626,8 @@ export async function startNostrBus(
getMetrics: () => metrics.getSnapshot(),
publishProfile,
getProfileState,
sendTypingStart: typingController.sendTypingStart,
sendTypingStop: typingController.sendTypingStop,
};
}
@ -646,6 +664,7 @@ async function sendEncryptedDm(
const sortedRelays = healthTracker.getSortedRelays(relays);
// Try relays in order of health, respecting circuit breakers
let successCount = 0;
let lastError: Error | undefined;
for (const relay of sortedRelays) {
const cb = circuitBreakers.get(relay);
@ -663,7 +682,7 @@ async function sendEncryptedDm(
// Record success
cb?.recordSuccess();
healthTracker.recordSuccess(relay, latency);
successCount++;
return; // Success - exit early
} catch (err) {
lastError = err as Error;