diff --git a/CHANGELOG.md b/CHANGELOG.md index b9910cbf3..df2e77a73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Groups: `whatsapp.groups`, `telegram.groups`, and `imessage.groups` now act as allowlists when set. Add `"*"` to keep allow-all behavior. ### Fixes +- Typing indicators: stop typing once the reply dispatcher drains to prevent stuck typing across Discord/Telegram/WhatsApp. - Onboarding: resolve CLI entrypoint when running via `npx` so gateway daemon install works without a build step. - Onboarding: when OpenAI Codex OAuth is used, default to `openai-codex/gpt-5.2` and warn if the selected model lacks auth. - CLI: auto-migrate legacy config entries on command start (same behavior as gateway startup). diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 65b94e931..cd91595d6 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -233,6 +233,7 @@ export async function getReplyFromConfig( silentToken: SILENT_REPLY_TOKEN, log: defaultRuntime.log, }); + opts?.onTypingController?.(typing); let transcribedText: string | undefined; if (cfg.routing?.transcribeAudio && isAudio(ctx.MediaType)) { diff --git a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts index 2b437a57f..b0abb516d 100644 --- a/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts +++ b/src/auto-reply/reply/agent-runner.heartbeat-typing.test.ts @@ -50,6 +50,8 @@ function createTyping(): TypingController { startTypingLoop: vi.fn(async () => {}), startTypingOnText: vi.fn(async () => {}), refreshTypingTtl: vi.fn(), + markRunComplete: vi.fn(), + markDispatchIdle: vi.fn(), cleanup: vi.fn(), }; } diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 9f994bdd6..1ff593ac4 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -514,6 +514,6 @@ export async function runReplyAgent(params: { finalPayloads.length === 1 ? finalPayloads[0] : finalPayloads, ); } finally { - typing.cleanup(); + typing.markRunComplete(); } } diff --git a/src/auto-reply/reply/followup-runner.compaction.test.ts b/src/auto-reply/reply/followup-runner.compaction.test.ts index b4ac4c856..481f42f8b 100644 --- a/src/auto-reply/reply/followup-runner.compaction.test.ts +++ b/src/auto-reply/reply/followup-runner.compaction.test.ts @@ -37,6 +37,8 @@ function createTyping(): TypingController { startTypingLoop: vi.fn(async () => {}), startTypingOnText: vi.fn(async () => {}), refreshTypingTtl: vi.fn(), + markRunComplete: vi.fn(), + markDispatchIdle: vi.fn(), cleanup: vi.fn(), }; } diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 528bca679..00d223be3 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -58,153 +58,157 @@ export function createFollowupRunner(params: { }; return async (queued: FollowupRun) => { - const runId = crypto.randomUUID(); - if (queued.run.sessionKey) { - registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey }); - } - let autoCompactionCompleted = false; - let runResult: Awaited>; - let fallbackProvider = queued.run.provider; - let fallbackModel = queued.run.model; try { - const fallbackResult = await runWithModelFallback({ - cfg: queued.run.config, - provider: queued.run.provider, - model: queued.run.model, - run: (provider, model) => - runEmbeddedPiAgent({ - sessionId: queued.run.sessionId, - sessionKey: queued.run.sessionKey, - surface: queued.run.surface, - sessionFile: queued.run.sessionFile, - workspaceDir: queued.run.workspaceDir, - config: queued.run.config, - skillsSnapshot: queued.run.skillsSnapshot, - prompt: queued.prompt, - extraSystemPrompt: queued.run.extraSystemPrompt, - ownerNumbers: queued.run.ownerNumbers, - enforceFinalTag: queued.run.enforceFinalTag, - provider, - model, - authProfileId: queued.run.authProfileId, - thinkLevel: queued.run.thinkLevel, - verboseLevel: queued.run.verboseLevel, - bashElevated: queued.run.bashElevated, - timeoutMs: queued.run.timeoutMs, - runId, - blockReplyBreak: queued.run.blockReplyBreak, - onAgentEvent: (evt) => { - if (evt.stream !== "compaction") return; - const phase = String(evt.data.phase ?? ""); - const willRetry = Boolean(evt.data.willRetry); - if (phase === "end" && !willRetry) { - autoCompactionCompleted = true; - } - }, - }), - }); - runResult = fallbackResult.result; - fallbackProvider = fallbackResult.provider; - fallbackModel = fallbackResult.model; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - defaultRuntime.error?.(`Followup agent failed before reply: ${message}`); - return; - } - - const payloadArray = runResult.payloads ?? []; - if (payloadArray.length === 0) return; - const sanitizedPayloads = payloadArray.flatMap((payload) => { - const text = payload.text; - if (!text || !text.includes("HEARTBEAT_OK")) return [payload]; - const stripped = stripHeartbeatToken(text, { mode: "message" }); - const hasMedia = - Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - if (stripped.shouldSkip && !hasMedia) return []; - return [{ ...payload, text: stripped.text }]; - }); - - const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads - .map((payload) => { - const { cleaned, replyToId } = extractReplyToTag(payload.text); - return { - ...payload, - text: cleaned ? cleaned : undefined, - replyToId: replyToId ?? payload.replyToId, - }; - }) - .filter( - (payload) => - payload.text || - payload.mediaUrl || - (payload.mediaUrls && payload.mediaUrls.length > 0), - ); - - if (replyTaggedPayloads.length === 0) return; - - if (autoCompactionCompleted) { - const count = await incrementCompactionCount({ - sessionEntry, - sessionStore, - sessionKey, - storePath, - }); - if (queued.run.verboseLevel === "on") { - const suffix = typeof count === "number" ? ` (count ${count})` : ""; - replyTaggedPayloads.unshift({ - text: `🧹 Auto-compaction complete${suffix}.`, + const runId = crypto.randomUUID(); + if (queued.run.sessionKey) { + registerAgentRunContext(runId, { sessionKey: queued.run.sessionKey }); + } + let autoCompactionCompleted = false; + let runResult: Awaited>; + let fallbackProvider = queued.run.provider; + let fallbackModel = queued.run.model; + try { + const fallbackResult = await runWithModelFallback({ + cfg: queued.run.config, + provider: queued.run.provider, + model: queued.run.model, + run: (provider, model) => + runEmbeddedPiAgent({ + sessionId: queued.run.sessionId, + sessionKey: queued.run.sessionKey, + surface: queued.run.surface, + sessionFile: queued.run.sessionFile, + workspaceDir: queued.run.workspaceDir, + config: queued.run.config, + skillsSnapshot: queued.run.skillsSnapshot, + prompt: queued.prompt, + extraSystemPrompt: queued.run.extraSystemPrompt, + ownerNumbers: queued.run.ownerNumbers, + enforceFinalTag: queued.run.enforceFinalTag, + provider, + model, + authProfileId: queued.run.authProfileId, + thinkLevel: queued.run.thinkLevel, + verboseLevel: queued.run.verboseLevel, + bashElevated: queued.run.bashElevated, + timeoutMs: queued.run.timeoutMs, + runId, + blockReplyBreak: queued.run.blockReplyBreak, + onAgentEvent: (evt) => { + if (evt.stream !== "compaction") return; + const phase = String(evt.data.phase ?? ""); + const willRetry = Boolean(evt.data.willRetry); + if (phase === "end" && !willRetry) { + autoCompactionCompleted = true; + } + }, + }), }); + runResult = fallbackResult.result; + fallbackProvider = fallbackResult.provider; + fallbackModel = fallbackResult.model; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + defaultRuntime.error?.(`Followup agent failed before reply: ${message}`); + return; } - } - if (sessionStore && sessionKey) { - const usage = runResult.meta.agentMeta?.usage; - const modelUsed = - runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; - const contextTokensUsed = - agentCfgContextTokens ?? - lookupContextTokens(modelUsed) ?? - sessionEntry?.contextTokens ?? - DEFAULT_CONTEXT_TOKENS; + const payloadArray = runResult.payloads ?? []; + if (payloadArray.length === 0) return; + const sanitizedPayloads = payloadArray.flatMap((payload) => { + const text = payload.text; + if (!text || !text.includes("HEARTBEAT_OK")) return [payload]; + const stripped = stripHeartbeatToken(text, { mode: "message" }); + const hasMedia = + Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + if (stripped.shouldSkip && !hasMedia) return []; + return [{ ...payload, text: stripped.text }]; + }); - if (usage) { - const entry = sessionStore[sessionKey]; - if (entry) { - const input = usage.input ?? 0; - const output = usage.output ?? 0; - const promptTokens = - input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); - sessionStore[sessionKey] = { - ...entry, - inputTokens: input, - outputTokens: output, - totalTokens: - promptTokens > 0 ? promptTokens : (usage.total ?? input), - modelProvider: fallbackProvider ?? entry.modelProvider, - model: modelUsed, - contextTokens: contextTokensUsed ?? entry.contextTokens, - updatedAt: Date.now(), + const replyTaggedPayloads: ReplyPayload[] = sanitizedPayloads + .map((payload) => { + const { cleaned, replyToId } = extractReplyToTag(payload.text); + return { + ...payload, + text: cleaned ? cleaned : undefined, + replyToId: replyToId ?? payload.replyToId, }; - if (storePath) { - await saveSessionStore(storePath, sessionStore); - } + }) + .filter( + (payload) => + payload.text || + payload.mediaUrl || + (payload.mediaUrls && payload.mediaUrls.length > 0), + ); + + if (replyTaggedPayloads.length === 0) return; + + if (autoCompactionCompleted) { + const count = await incrementCompactionCount({ + sessionEntry, + sessionStore, + sessionKey, + storePath, + }); + if (queued.run.verboseLevel === "on") { + const suffix = typeof count === "number" ? ` (count ${count})` : ""; + replyTaggedPayloads.unshift({ + text: `🧹 Auto-compaction complete${suffix}.`, + }); } - } else if (modelUsed || contextTokensUsed) { - const entry = sessionStore[sessionKey]; - if (entry) { - sessionStore[sessionKey] = { - ...entry, - modelProvider: fallbackProvider ?? entry.modelProvider, - model: modelUsed ?? entry.model, - contextTokens: contextTokensUsed ?? entry.contextTokens, - }; - if (storePath) { - await saveSessionStore(storePath, sessionStore); + } + + if (sessionStore && sessionKey) { + const usage = runResult.meta.agentMeta?.usage; + const modelUsed = + runResult.meta.agentMeta?.model ?? fallbackModel ?? defaultModel; + const contextTokensUsed = + agentCfgContextTokens ?? + lookupContextTokens(modelUsed) ?? + sessionEntry?.contextTokens ?? + DEFAULT_CONTEXT_TOKENS; + + if (usage) { + const entry = sessionStore[sessionKey]; + if (entry) { + const input = usage.input ?? 0; + const output = usage.output ?? 0; + const promptTokens = + input + (usage.cacheRead ?? 0) + (usage.cacheWrite ?? 0); + sessionStore[sessionKey] = { + ...entry, + inputTokens: input, + outputTokens: output, + totalTokens: + promptTokens > 0 ? promptTokens : (usage.total ?? input), + modelProvider: fallbackProvider ?? entry.modelProvider, + model: modelUsed, + contextTokens: contextTokensUsed ?? entry.contextTokens, + updatedAt: Date.now(), + }; + if (storePath) { + await saveSessionStore(storePath, sessionStore); + } + } + } else if (modelUsed || contextTokensUsed) { + const entry = sessionStore[sessionKey]; + if (entry) { + sessionStore[sessionKey] = { + ...entry, + modelProvider: fallbackProvider ?? entry.modelProvider, + model: modelUsed ?? entry.model, + contextTokens: contextTokensUsed ?? entry.contextTokens, + }; + if (storePath) { + await saveSessionStore(storePath, sessionStore); + } } } } - } - await sendFollowupPayloads(replyTaggedPayloads); + await sendFollowupPayloads(replyTaggedPayloads); + } finally { + typing.markRunComplete(); + } }; } diff --git a/src/auto-reply/reply/reply-dispatcher.test.ts b/src/auto-reply/reply/reply-dispatcher.test.ts index d97822fe3..dee7795d2 100644 --- a/src/auto-reply/reply/reply-dispatcher.test.ts +++ b/src/auto-reply/reply/reply-dispatcher.test.ts @@ -79,4 +79,18 @@ describe("createReplyDispatcher", () => { await dispatcher.waitForIdle(); expect(delivered).toEqual(["tool", "block", "final"]); }); + + it("fires onIdle when the queue drains", async () => { + const deliver = vi.fn( + async () => await new Promise((resolve) => setTimeout(resolve, 5)), + ); + const onIdle = vi.fn(); + const dispatcher = createReplyDispatcher({ deliver, onIdle }); + + dispatcher.sendToolResult({ text: "one" }); + dispatcher.sendFinalReply({ text: "two" }); + + await dispatcher.waitForIdle(); + expect(onIdle).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/auto-reply/reply/reply-dispatcher.ts b/src/auto-reply/reply/reply-dispatcher.ts index 070cc7a65..26d4f14d3 100644 --- a/src/auto-reply/reply/reply-dispatcher.ts +++ b/src/auto-reply/reply/reply-dispatcher.ts @@ -18,6 +18,7 @@ export type ReplyDispatcherOptions = { deliver: ReplyDispatchDeliverer; responsePrefix?: string; onHeartbeatStrip?: () => void; + onIdle?: () => void; onError?: ReplyDispatchErrorHandler; }; @@ -70,6 +71,8 @@ export function createReplyDispatcher( options: ReplyDispatcherOptions, ): ReplyDispatcher { let sendChain: Promise = Promise.resolve(); + // Track in-flight deliveries so we can emit a reliable "idle" signal. + let pending = 0; // Serialize outbound replies to preserve tool/block/final order. const queuedCounts: Record = { tool: 0, @@ -81,10 +84,17 @@ export function createReplyDispatcher( const normalized = normalizeReplyPayload(payload, options); if (!normalized) return false; queuedCounts[kind] += 1; + pending += 1; sendChain = sendChain .then(() => options.deliver(normalized, { kind })) .catch((err) => { options.onError?.(err, { kind }); + }) + .finally(() => { + pending -= 1; + if (pending === 0) { + options.onIdle?.(); + } }); return true; }; diff --git a/src/auto-reply/reply/typing.ts b/src/auto-reply/reply/typing.ts index 6c2004e67..4478ccd0e 100644 --- a/src/auto-reply/reply/typing.ts +++ b/src/auto-reply/reply/typing.ts @@ -3,6 +3,8 @@ export type TypingController = { startTypingLoop: () => Promise; startTypingOnText: (text?: string) => Promise; refreshTypingTtl: () => void; + markRunComplete: () => void; + markDispatchIdle: () => void; cleanup: () => void; }; @@ -21,6 +23,9 @@ export function createTypingController(params: { log, } = params; let started = false; + let active = false; + let runComplete = false; + let dispatchIdle = false; let typingTimer: NodeJS.Timeout | undefined; let typingTtlTimer: NodeJS.Timeout | undefined; const typingIntervalMs = typingIntervalSeconds * 1000; @@ -30,6 +35,13 @@ export function createTypingController(params: { return `${Math.round(ms / 1000)}s`; }; + const resetCycle = () => { + started = false; + active = false; + runComplete = false; + dispatchIdle = false; + }; + const cleanup = () => { if (typingTtlTimer) { clearTimeout(typingTtlTimer); @@ -39,6 +51,7 @@ export function createTypingController(params: { clearInterval(typingTimer); typingTimer = undefined; } + resetCycle(); }; const refreshTypingTtl = () => { @@ -61,11 +74,22 @@ export function createTypingController(params: { }; const ensureStart = async () => { + if (!active) { + active = true; + runComplete = false; + dispatchIdle = false; + } if (started) return; started = true; await triggerTyping(); }; + const maybeStopOnIdle = () => { + if (!active) return; + // Stop only when the model run is done and the dispatcher queue is empty. + if (runComplete && dispatchIdle) cleanup(); + }; + const startTypingLoop = async () => { if (!onReplyStart) return; if (typingIntervalMs <= 0) return; @@ -85,11 +109,23 @@ export function createTypingController(params: { await startTypingLoop(); }; + const markRunComplete = () => { + runComplete = true; + maybeStopOnIdle(); + }; + + const markDispatchIdle = () => { + dispatchIdle = true; + maybeStopOnIdle(); + }; + return { onReplyStart: ensureStart, startTypingLoop, startTypingOnText, refreshTypingTtl, + markRunComplete, + markDispatchIdle, cleanup, }; } diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 3ab927358..62b6d75bb 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -1,5 +1,8 @@ +import type { TypingController } from "./reply/typing.js"; + export type GetReplyOptions = { onReplyStart?: () => Promise | void; + onTypingController?: (typing: TypingController) => void; isHeartbeat?: boolean; onPartialReply?: (payload: ReplyPayload) => Promise | void; onBlockReply?: (payload: ReplyPayload) => Promise | void; diff --git a/src/discord/monitor.ts b/src/discord/monitor.ts index 4a8ecebf6..c4dfd5546 100644 --- a/src/discord/monitor.ts +++ b/src/discord/monitor.ts @@ -24,6 +24,7 @@ import { } from "../auto-reply/reply/mentions.js"; import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; +import type { TypingController } from "../auto-reply/reply/typing.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { DiscordSlashCommandConfig, @@ -541,6 +542,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { } let didSendReply = false; + let typingController: TypingController | undefined; const dispatcher = createReplyDispatcher({ responsePrefix: cfg.messages?.responsePrefix, deliver: async (payload) => { @@ -554,6 +556,9 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { }); didSendReply = true; }, + onIdle: () => { + typingController?.markDispatchIdle(); + }, onError: (err, info) => { runtime.error?.( danger(`discord ${info.kind} reply failed: ${String(err)}`), @@ -565,6 +570,9 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { ctxPayload, { onReplyStart: () => sendTyping(message), + onTypingController: (typing) => { + typingController = typing; + }, onToolResult: (payload) => { dispatcher.sendToolResult(payload); }, @@ -584,6 +592,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; } await dispatcher.waitForIdle(); + typingController?.markDispatchIdle(); if (!queuedFinal) { if ( isGuildMessage && diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 952550148..671f89ffb 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -13,6 +13,7 @@ import { } from "../auto-reply/reply/mentions.js"; import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; +import type { TypingController } from "../auto-reply/reply/typing.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import type { ReplyToMode } from "../config/config.js"; import { loadConfig } from "../config/config.js"; @@ -235,6 +236,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { ); } + let typingController: TypingController | undefined; const dispatcher = createReplyDispatcher({ responsePrefix: cfg.messages?.responsePrefix, deliver: async (payload) => { @@ -248,6 +250,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { textLimit, }); }, + onIdle: () => { + typingController?.markDispatchIdle(); + }, onError: (err, info) => { runtime.error?.( danger(`telegram ${info.kind} reply failed: ${String(err)}`), @@ -259,6 +264,9 @@ export function createTelegramBot(opts: TelegramBotOptions) { ctxPayload, { onReplyStart: sendTyping, + onTypingController: (typing) => { + typingController = typing; + }, onToolResult: dispatcher.sendToolResult, onBlockReply: dispatcher.sendBlockReply, }, @@ -274,6 +282,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { queuedFinal = dispatcher.sendFinalReply(reply) || queuedFinal; } await dispatcher.waitForIdle(); + typingController?.markDispatchIdle(); if (!queuedFinal) return; } catch (err) { runtime.error?.(danger(`handler failed: ${String(err)}`)); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index c46afcf66..a5763e399 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -15,6 +15,7 @@ import { } from "../auto-reply/reply/mentions.js"; import { createReplyDispatcher } from "../auto-reply/reply/reply-dispatcher.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; +import type { TypingController } from "../auto-reply/reply/typing.js"; import { HEARTBEAT_TOKEN, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { waitForever } from "../cli/wait.js"; @@ -1113,6 +1114,7 @@ export async function monitorWebProvider( const textLimit = resolveTextChunkLimit(cfg, "whatsapp"); let didLogHeartbeatStrip = false; let didSendReply = false; + let typingController: TypingController | undefined; const dispatcher = createReplyDispatcher({ responsePrefix: cfg.messages?.responsePrefix, onHeartbeatStrip: () => { @@ -1163,6 +1165,9 @@ export async function monitorWebProvider( } } }, + onIdle: () => { + typingController?.markDispatchIdle(); + }, onError: (err, info) => { const label = info.kind === "tool" @@ -1202,6 +1207,9 @@ export async function monitorWebProvider( }, { onReplyStart: msg.sendComposing, + onTypingController: (typing) => { + typingController = typing; + }, onToolResult: (payload) => { dispatcher.sendToolResult(payload); }, @@ -1222,6 +1230,7 @@ export async function monitorWebProvider( queuedFinal = dispatcher.sendFinalReply(replyPayload) || queuedFinal; } await dispatcher.waitForIdle(); + typingController?.markDispatchIdle(); if (!queuedFinal) { if (shouldClearGroupHistory && didSendReply) { groupHistories.set(conversationId, []);