import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { normalizeReplyPayload } from "./normalize-reply.js"; import type { TypingController } from "./typing.js"; export type ReplyDispatchKind = "tool" | "block" | "final"; type ReplyDispatchErrorHandler = ( err: unknown, info: { kind: ReplyDispatchKind }, ) => void; type ReplyDispatchDeliverer = ( payload: ReplyPayload, info: { kind: ReplyDispatchKind }, ) => Promise; export type ReplyDispatcherOptions = { deliver: ReplyDispatchDeliverer; responsePrefix?: string; onHeartbeatStrip?: () => void; onIdle?: () => void; onError?: ReplyDispatchErrorHandler; }; type ReplyDispatcherWithTypingOptions = Omit< ReplyDispatcherOptions, "onIdle" > & { onReplyStart?: () => Promise | void; onIdle?: () => void; }; type ReplyDispatcherWithTypingResult = { dispatcher: ReplyDispatcher; replyOptions: Pick; markDispatchIdle: () => void; }; export type ReplyDispatcher = { sendToolResult: (payload: ReplyPayload) => boolean; sendBlockReply: (payload: ReplyPayload) => boolean; sendFinalReply: (payload: ReplyPayload) => boolean; waitForIdle: () => Promise; getQueuedCounts: () => Record; }; function normalizeReplyPayloadInternal( payload: ReplyPayload, opts: Pick, ): ReplyPayload | null { return normalizeReplyPayload(payload, { responsePrefix: opts.responsePrefix, onHeartbeatStrip: opts.onHeartbeatStrip, }); } export function createReplyDispatcher( options: ReplyDispatcherOptions, ): ReplyDispatcher { let sendChain: Promise = Promise.resolve(); // Track in-flight deliveries so we can emit a reliable "idle" signal. let pending = 0; // Serialize outbound replies to preserve tool/block/final order. const queuedCounts: Record = { tool: 0, block: 0, final: 0, }; const enqueue = (kind: ReplyDispatchKind, payload: ReplyPayload) => { const normalized = normalizeReplyPayloadInternal(payload, options); if (!normalized) return false; queuedCounts[kind] += 1; pending += 1; sendChain = sendChain .then(() => options.deliver(normalized, { kind })) .catch((err) => { options.onError?.(err, { kind }); }) .finally(() => { pending -= 1; if (pending === 0) { options.onIdle?.(); } }); return true; }; return { sendToolResult: (payload) => enqueue("tool", payload), sendBlockReply: (payload) => enqueue("block", payload), sendFinalReply: (payload) => enqueue("final", payload), waitForIdle: () => sendChain, getQueuedCounts: () => ({ ...queuedCounts }), }; } export function createReplyDispatcherWithTyping( options: ReplyDispatcherWithTypingOptions, ): ReplyDispatcherWithTypingResult { const { onReplyStart, onIdle, ...dispatcherOptions } = options; let typingController: TypingController | undefined; const dispatcher = createReplyDispatcher({ ...dispatcherOptions, onIdle: () => { typingController?.markDispatchIdle(); onIdle?.(); }, }); return { dispatcher, replyOptions: { onReplyStart, onTypingController: (typing) => { typingController = typing; }, }, markDispatchIdle: () => { typingController?.markDispatchIdle(); onIdle?.(); }, }; }