Nostr: restore session tracking + reply routing

This commit is contained in:
Joel Klabo 2026-01-26 16:36:10 -08:00
parent f24ab93e8a
commit 3f78748dc9
2 changed files with 287 additions and 10 deletions

View File

@ -1,7 +1,10 @@
import {
buildChannelConfigSchema,
createReplyPrefixContext,
createTypingCallbacks,
DEFAULT_ACCOUNT_ID,
formatPairingApproveHint,
logTypingFailure,
type ChannelPlugin,
} from "clawdbot/plugin-sdk";
@ -218,19 +221,153 @@ export const nostrPlugin: ChannelPlugin<ResolvedNostrAccount> = {
accountId: account.accountId,
privateKey: account.privateKey,
relays: account.relays,
onMessage: async (senderPubkey, text, reply) => {
onMessage: async (senderPubkey, text, reply, eventId) => {
ctx.log?.debug(`[${account.accountId}] DM from ${senderPubkey}: ${text.slice(0, 50)}...`);
// Forward to moltbot's message pipeline
await runtime.channel.reply.handleInboundMessage({
const cfg = runtime.config.loadConfig();
const route = runtime.channel.routing.resolveAgentRoute({
cfg,
channel: "nostr",
accountId: account.accountId,
senderId: senderPubkey,
chatType: "direct",
chatId: senderPubkey, // For DMs, chatId is the sender's pubkey
text,
reply: async (responseText: string) => {
await reply(responseText);
peer: { kind: "dm", id: senderPubkey },
});
ctx.log?.debug(`[${account.accountId}] Route resolved: sessionKey=${route.sessionKey}, agentId=${route.agentId}`);
const storePath = runtime.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const envelopeOptions = runtime.channel.reply.resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = runtime.channel.session.readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
});
const body = runtime.channel.reply.formatAgentEnvelope({
channel: "Nostr",
from: senderPubkey,
timestamp: Date.now(),
previousTimestamp,
envelope: envelopeOptions,
body: text,
});
// Create typing callbacks for this conversation
// Note: busHandle is checked at invocation time (not creation time)
// to handle the race condition during startup
const typingCallbacks = createTypingCallbacks({
start: async () => {
if (!busHandle) {
ctx.log?.debug(`[${account.accountId}] Skipping typing START (bus not ready)`);
return;
}
ctx.log?.debug(`[${account.accountId}] Sending typing START to ${senderPubkey.slice(0, 8)}`);
return busHandle.sendTypingStart(senderPubkey);
},
stop: async () => {
if (!busHandle) {
ctx.log?.debug(`[${account.accountId}] Skipping typing STOP (bus not ready)`);
return;
}
ctx.log?.debug(`[${account.accountId}] Sending typing STOP to ${senderPubkey.slice(0, 8)}`);
return busHandle.sendTypingStop(senderPubkey);
},
onStartError: (err) =>
logTypingFailure({
log: (msg) => ctx.log?.warn(msg),
channel: "nostr",
target: senderPubkey,
action: "start",
error: err,
}),
onStopError: (err) =>
logTypingFailure({
log: (msg) => ctx.log?.warn(msg),
channel: "nostr",
target: senderPubkey,
action: "stop",
error: err,
}),
});
// Build the inbound message context
const ctxPayload = runtime.channel.reply.finalizeInboundContext({
Body: body,
RawBody: text,
CommandBody: text,
From: `nostr:${senderPubkey}`,
To: `nostr:${senderPubkey}`,
SessionKey: route.sessionKey,
AccountId: account.accountId,
ChatType: "direct",
ConversationLabel: senderPubkey,
SenderName: senderPubkey.slice(0, 8),
SenderId: senderPubkey,
Provider: "nostr" as const,
Surface: "nostr" as const,
Timestamp: Date.now(),
MessageSid: eventId, // Nostr event ID for deduplication
CommandAuthorized: true, // TODO: implement proper authorization
CommandSource: "text" as const,
OriginatingChannel: "nostr" as const,
OriginatingTo: `nostr:${senderPubkey}`,
});
await runtime.channel.session.recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
ctx: ctxPayload,
updateLastRoute: {
sessionKey: route.mainSessionKey,
channel: "nostr",
to: `nostr:${senderPubkey}`,
accountId: route.accountId,
},
onRecordError: (err) => {
ctx.log?.warn?.(`nostr: failed updating session meta: ${String(err)}`);
},
});
// Get table mode for formatting
const tableMode = runtime.channel.text.resolveMarkdownTableMode({
cfg,
channel: "nostr",
accountId: account.accountId,
});
// Create reply prefix context
const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId });
// Create the reply dispatcher
const { dispatcher, replyOptions, markDispatchIdle } =
runtime.channel.reply.createReplyDispatcherWithTyping({
responsePrefix: prefixContext.responsePrefix,
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
humanDelay: runtime.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload) => {
const message = runtime.channel.text.convertMarkdownTables(
payload.text ?? "",
tableMode
);
if (!message) return;
ctx.log?.debug(`[${account.accountId}] Delivering reply to ${senderPubkey.slice(0, 8)}: ${message.slice(0, 50)}...`);
await reply(message);
ctx.log?.info(`[${account.accountId}] Reply delivered to ${senderPubkey.slice(0, 8)}`);
},
onError: (err, info) => {
ctx.log?.error(`[${account.accountId}] nostr ${info.kind} reply failed: ${String(err)}`);
},
onReplyStart: typingCallbacks?.onReplyStart,
onIdle: typingCallbacks?.onIdle,
});
// Dispatch the reply
const { queuedFinal, counts } = await runtime.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
onModelSelected: prefixContext.onModelSelected,
},
});
},

View File

@ -678,7 +678,147 @@ async function sendEncryptedDm(
}
}
throw new Error(`Failed to publish to any relay: ${lastError?.message}`);
if (successCount === 0) {
throw new Error(`Failed to publish to any relay: ${lastError?.message}`);
}
}
// ============================================================================
// Typing Indicator (Kind 20001 Ephemeral Event)
// ============================================================================
/**
* Send a typing indicator event to a pubkey
* Uses kind 20001 (community convention for typing)
* Content is NIP-04 encrypted for privacy consistency with DMs
*/
async function sendTypingIndicator(
pool: SimplePool,
sk: Uint8Array,
toPubkey: string,
action: "start" | "stop",
relays: string[],
metrics: NostrMetrics,
circuitBreakers: Map<string, CircuitBreaker>,
healthTracker: RelayHealthTracker,
conversationEventId?: string,
onError?: (error: Error, context: string) => void
): Promise<void> {
// Encrypt the action for privacy (consistent with DMs)
const ciphertext = await encrypt(sk, toPubkey, action);
// Build tags
const tags: string[][] = [
["p", toPubkey],
["t", "clawdbot-typing"], // Namespace tag for collision protection
["expiration", String(Math.floor(Date.now() / 1000) + TYPING_TTL_SEC)],
];
// Add conversation scope if provided
if (conversationEventId) {
tags.push(["e", conversationEventId]);
}
const event = finalizeEvent(
{
kind: TYPING_KIND,
content: ciphertext,
tags,
created_at: Math.floor(Date.now() / 1000),
},
sk
);
// Sort relays by health score
const sortedRelays = healthTracker.getSortedRelays(relays);
// Try relays in order, respecting circuit breakers
let lastError: Error | undefined;
for (const relay of sortedRelays) {
const cb = circuitBreakers.get(relay);
if (cb && !cb.canAttempt()) {
continue;
}
const startTime = Date.now();
try {
await pool.publish([relay], event);
const latency = Date.now() - startTime;
cb?.recordSuccess();
healthTracker.recordSuccess(relay, latency);
const metricName = action === "start" ? "typing.start.sent" : "typing.stop.sent";
metrics.emit(metricName, 1, { relay });
return; // Success - exit early
} catch (err) {
lastError = err as Error;
cb?.recordFailure();
healthTracker.recordFailure(relay);
metrics.emit("typing.error", 1, { relay });
onError?.(lastError, `typing ${action} to ${relay}`);
}
}
// Don't throw for typing failures - they're non-critical
if (lastError) {
onError?.(lastError, `typing ${action} failed on all relays`);
}
}
/**
* Create throttled typing indicator functions
* Returns start/stop functions that respect throttling (max 1 event per 5s per recipient)
*/
function createTypingController(
pool: SimplePool,
sk: Uint8Array,
relays: string[],
metrics: NostrMetrics,
circuitBreakers: Map<string, CircuitBreaker>,
healthTracker: RelayHealthTracker,
onError?: (error: Error, context: string) => void
): {
sendTypingStart: (toPubkey: string, conversationEventId?: string) => Promise<void>;
sendTypingStop: (toPubkey: string, conversationEventId?: string) => Promise<void>;
} {
// Track last send time per recipient for throttling
const lastSendTime = new Map<string, number>();
const sendWithThrottle = async (
toPubkey: string,
action: "start" | "stop",
conversationEventId?: string
): Promise<void> => {
const now = Date.now();
const lastSent = lastSendTime.get(toPubkey) ?? 0;
// Stop events bypass throttle for better UX
if (action === "start") {
if (now - lastSent < TYPING_THROTTLE_MS) {
return; // Throttled
}
lastSendTime.set(toPubkey, now);
}
await sendTypingIndicator(
pool,
sk,
toPubkey,
action,
relays,
metrics,
circuitBreakers,
healthTracker,
conversationEventId,
onError
);
};
return {
sendTypingStart: (toPubkey: string, conversationEventId?: string) =>
sendWithThrottle(toPubkey, "start", conversationEventId),
sendTypingStop: (toPubkey: string, conversationEventId?: string) =>
sendWithThrottle(toPubkey, "stop", conversationEventId),
};
}
// ============================================================================