From 8d0e3a7ecd73d52081b43d43e0e846f787dc0f63 Mon Sep 17 00:00:00 2001 From: abhijrathod Date: Fri, 30 Jan 2026 15:20:14 +0530 Subject: [PATCH] fix(nostr): use dispatchReplyFromConfig for inbound messages The Nostr channel was calling a non-existent handleInboundMessage method on the plugin runtime. This refactors the onMessage handler to use the correct API pattern: - Build proper MsgContext with all required fields - Finalize context with finalizeInboundContext - Record session metadata with recordInboundSession - Create reply dispatcher with createReplyDispatcherWithTyping - Dispatch to agent with dispatchReplyFromConfig This follows the same pattern used by Matrix, MS Teams, and Mattermost channel extensions. Fixes #4547 --- extensions/nostr/src/channel.ts | 164 ++++++++++++++++++++++++++++++-- 1 file changed, 155 insertions(+), 9 deletions(-) diff --git a/extensions/nostr/src/channel.ts b/extensions/nostr/src/channel.ts index 990c06f88..5aa66b1ff 100644 --- a/extensions/nostr/src/channel.ts +++ b/extensions/nostr/src/channel.ts @@ -1,7 +1,10 @@ import { buildChannelConfigSchema, + createReplyPrefixContext, + createTypingCallbacks, DEFAULT_ACCOUNT_ID, formatPairingApproveHint, + logTypingFailure, type ChannelPlugin, } from "openclaw/plugin-sdk"; @@ -221,18 +224,161 @@ export const nostrPlugin: ChannelPlugin = { onMessage: async (senderPubkey, text, reply) => { ctx.log?.debug(`[${account.accountId}] DM from ${senderPubkey}: ${text.slice(0, 50)}...`); - // Forward to OpenClaw's message pipeline - await runtime.channel.reply.handleInboundMessage({ + const cfg = runtime.config.loadConfig(); + + // Resolve agent route for this message + const route = runtime.channel.routing.resolveAgentRoute({ + cfg, channel: "nostr", - accountId: account.accountId, - senderId: senderPubkey, - chatType: "direct", - chatId: senderPubkey, // For DMs, chatId is the sender's pubkey - text, - reply: async (responseText: string) => { - await reply(responseText); + peer: { + kind: "dm", + id: senderPubkey, }, }); + + // Build envelope for agent context + const storePath = runtime.channel.session.resolveStorePath(cfg.session?.store, { + agentId: route.agentId, + }); + const envelopeOptions = runtime.channel.reply.resolveEnvelopeFormatOptions(cfg); + const previousTimestamp = runtime.channel.session.readSessionUpdatedAt({ + storePath, + sessionKey: route.sessionKey, + }); + const timestamp = Date.now(); + const body = runtime.channel.reply.formatAgentEnvelope({ + channel: "Nostr", + from: senderPubkey.slice(0, 12) + "...", + timestamp, + previousTimestamp, + envelope: envelopeOptions, + body: text, + }); + + // Check if sender is allowed (for command authorization) + const allowFrom = account.config.allowFrom ?? []; + const normalizedSender = normalizePubkey(senderPubkey); + const senderAllowed = allowFrom.length === 0 || allowFrom.some((entry) => { + if (entry === "*") return true; + try { + return normalizePubkey(String(entry)) === normalizedSender; + } catch { + return String(entry) === senderPubkey; + } + }); + + // Check for control commands + const hasControlCommand = runtime.channel.text.hasControlCommand(text, cfg); + const allowTextCommands = runtime.channel.commands.shouldHandleTextCommands({ + cfg, + surface: "nostr", + }); + const commandAuthorized = allowTextCommands && senderAllowed && hasControlCommand; + + // Build the inbound context + const ctxPayload = runtime.channel.reply.finalizeInboundContext({ + Body: body, + RawBody: text, + CommandBody: text, + From: `nostr:${senderPubkey}`, + To: `nostr:${account.publicKey}`, + SessionKey: route.sessionKey, + AccountId: account.accountId, + ChatType: "direct", + ConversationLabel: senderPubkey.slice(0, 12) + "...", + SenderName: senderPubkey.slice(0, 12) + "...", + SenderId: senderPubkey, + Provider: "nostr" as const, + Surface: "nostr" as const, + Timestamp: timestamp, + CommandAuthorized: commandAuthorized, + CommandSource: "text" as const, + OriginatingChannel: "nostr" as const, + OriginatingTo: `nostr:${account.publicKey}`, + }); + + // Record inbound session + await runtime.channel.session.recordInboundSession({ + storePath, + sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + ctx: ctxPayload, + updateLastRoute: { + sessionKey: route.mainSessionKey, + channel: "nostr", + to: `nostr:${account.publicKey}`, + accountId: account.accountId, + }, + onRecordError: (err) => { + ctx.log?.warn?.(`[${account.accountId}] failed updating session meta: ${String(err)}`); + }, + }); + + // Set up typing callbacks (Nostr doesn't support typing indicators, so these are no-ops) + const typingCallbacks = createTypingCallbacks({ + start: () => Promise.resolve(), + stop: () => Promise.resolve(), + onStartError: (err) => { + logTypingFailure({ + log: (msg) => ctx.log?.debug?.(msg), + channel: "nostr", + action: "start", + error: err, + }); + }, + }); + + // Set up reply prefix context + const prefixContext = createReplyPrefixContext({ cfg, agentId: route.agentId }); + + // Create reply dispatcher + const tableMode = runtime.channel.text.resolveMarkdownTableMode({ + cfg, + channel: "nostr", + accountId: account.accountId, + }); + const { dispatcher, replyOptions, markDispatchIdle } = + runtime.channel.reply.createReplyDispatcherWithTyping({ + responsePrefix: prefixContext.responsePrefix, + responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, + humanDelay: runtime.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), + deliver: async (payload) => { + const message = runtime.channel.text.convertMarkdownTables( + payload.text ?? "", + tableMode, + ); + if (message.trim()) { + await reply(message); + } + }, + onError: (err, info) => { + ctx.log?.error(`[${account.accountId}] Nostr ${info.kind} reply failed: ${String(err)}`); + }, + onReplyStart: typingCallbacks.onReplyStart, + onIdle: typingCallbacks.onIdle, + }); + + // Dispatch the message to the agent + try { + const { queuedFinal, counts } = await runtime.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + onModelSelected: prefixContext.onModelSelected, + }, + }); + markDispatchIdle(); + if (queuedFinal) { + const finalCount = counts.final; + ctx.log?.debug?.( + `[${account.accountId}] delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${senderPubkey}`, + ); + } + } catch (err) { + markDispatchIdle(); + ctx.log?.error(`[${account.accountId}] Nostr dispatch failed: ${String(err)}`); + } }, onError: (error, context) => { ctx.log?.error(`[${account.accountId}] Nostr error (${context}): ${error.message}`);