From 31dbc62bdd3d95ccc35311c938ac678d1c84e672 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 6 Jan 2026 18:56:40 +0000 Subject: [PATCH] fix(telegram): prevent stuck typing after tool runs --- src/auto-reply/reply/agent-runner.ts | 8 +++++++- src/auto-reply/reply/typing.test.ts | 24 ++++++++++++++++++++++++ src/auto-reply/reply/typing.ts | 13 +++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 15ff46ab8..d4e7ba652 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -314,6 +314,9 @@ export async function runReplyAgent(params: { shouldEmitToolResult, onToolResult: opts?.onToolResult ? (payload) => { + // `subscribeEmbeddedPiSession` may invoke tool callbacks without awaiting them. + // If a tool callback starts typing after the run finalized, we can end up with + // a typing loop that never sees a matching markRunComplete(). Track and drain. const task = (async () => { let text = payload.text; if (!isHeartbeat && text?.includes("HEARTBEAT_OK")) { @@ -384,13 +387,16 @@ export async function runReplyAgent(params: { } const payloadArray = runResult.payloads ?? []; - if (payloadArray.length === 0) return finalizeWithFollowup(undefined); if (pendingBlockTasks.size > 0) { await Promise.allSettled(pendingBlockTasks); } if (pendingToolTasks.size > 0) { await Promise.allSettled(pendingToolTasks); } + // Drain any late tool/block deliveries before deciding there's "nothing to send". + // Otherwise, a late typing trigger (e.g. from a tool callback) can outlive the run and + // keep the typing indicator stuck. + if (payloadArray.length === 0) return finalizeWithFollowup(undefined); const sanitizedPayloads = isHeartbeat ? payloadArray diff --git a/src/auto-reply/reply/typing.test.ts b/src/auto-reply/reply/typing.test.ts index 4026eec13..18c3fd322 100644 --- a/src/auto-reply/reply/typing.test.ts +++ b/src/auto-reply/reply/typing.test.ts @@ -51,4 +51,28 @@ describe("typing controller", () => { vi.advanceTimersByTime(2_000); expect(onReplyStart).toHaveBeenCalledTimes(3); }); + + it("does not restart typing after it has stopped", async () => { + vi.useFakeTimers(); + const onReplyStart = vi.fn(async () => {}); + const typing = createTypingController({ + onReplyStart, + typingIntervalSeconds: 1, + typingTtlMs: 30_000, + }); + + await typing.startTypingLoop(); + expect(onReplyStart).toHaveBeenCalledTimes(1); + + typing.markRunComplete(); + typing.markDispatchIdle(); + + vi.advanceTimersByTime(5_000); + expect(onReplyStart).toHaveBeenCalledTimes(1); + + // Late callbacks should be ignored and must not restart the interval. + await typing.startTypingOnText("late tool result"); + vi.advanceTimersByTime(5_000); + expect(onReplyStart).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/auto-reply/reply/typing.ts b/src/auto-reply/reply/typing.ts index 8a2077652..7850ec132 100644 --- a/src/auto-reply/reply/typing.ts +++ b/src/auto-reply/reply/typing.ts @@ -26,6 +26,10 @@ export function createTypingController(params: { let active = false; let runComplete = false; let dispatchIdle = false; + // Important: callbacks (tool/block streaming) can fire late (after the run completed), + // especially when upstream event emitters don't await async listeners. + // Once we stop typing, we "seal" the controller so late events can't restart typing forever. + let sealed = false; let typingTimer: NodeJS.Timeout | undefined; let typingTtlTimer: NodeJS.Timeout | undefined; const typingIntervalMs = typingIntervalSeconds * 1000; @@ -43,6 +47,7 @@ export function createTypingController(params: { }; const cleanup = () => { + if (sealed) return; if (typingTtlTimer) { clearTimeout(typingTtlTimer); typingTtlTimer = undefined; @@ -52,9 +57,11 @@ export function createTypingController(params: { typingTimer = undefined; } resetCycle(); + sealed = true; }; const refreshTypingTtl = () => { + if (sealed) return; if (!typingIntervalMs || typingIntervalMs <= 0) return; if (typingTtlMs <= 0) return; if (typingTtlTimer) { @@ -70,10 +77,14 @@ export function createTypingController(params: { }; const triggerTyping = async () => { + if (sealed) return; await onReplyStart?.(); }; const ensureStart = async () => { + if (sealed) return; + // Late callbacks after a run completed should never restart typing. + if (runComplete) return; if (!active) { active = true; } @@ -89,6 +100,7 @@ export function createTypingController(params: { }; const startTypingLoop = async () => { + if (sealed) return; if (!onReplyStart) return; if (typingIntervalMs <= 0) return; if (typingTimer) return; @@ -100,6 +112,7 @@ export function createTypingController(params: { }; const startTypingOnText = async (text?: string) => { + if (sealed) return; const trimmed = text?.trim(); if (!trimmed) return; if (silentToken && trimmed === silentToken) return;