From 0cbc0510198d6c3cd7d1bc04e5604ddadbe66d60 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 25 Jan 2026 11:37:11 +0000 Subject: [PATCH] fix: prevent voice-call TTS overlap (#1713) (thanks @dguido) --- CHANGELOG.md | 1 + .../voice-call/src/media-stream.test.ts | 97 +++++++++++++ extensions/voice-call/src/media-stream.ts | 127 +++++++++++++----- .../src/providers/stt-openai-realtime.ts | 8 ++ extensions/voice-call/src/providers/twilio.ts | 1 - extensions/voice-call/src/webhook.ts | 5 + 6 files changed, 205 insertions(+), 34 deletions(-) create mode 100644 extensions/voice-call/src/media-stream.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ef37aef9..656abc4cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ Docs: https://docs.clawd.bot - Gateway: store lock files in the temp directory to avoid stale locks on persistent volumes. (#1676) - macOS: default direct-transport `ws://` URLs to port 18789; document `gateway.remote.transport`. (#1603) Thanks @ngutman. - Voice Call: return stream TwiML for outbound conversation calls on initial Twilio webhook. (#1634) +- Voice Call: serialize Twilio TTS playback and cancel on barge-in to prevent overlap. (#1713) Thanks @dguido. - Google Chat: tighten email allowlist matching, typing cleanup, media caps, and onboarding/docs/tests. (#1635) Thanks @iHildy. - Google Chat: normalize space targets without double `spaces/` prefix. - Messaging: keep newline chunking safe for fenced markdown blocks across channels. diff --git a/extensions/voice-call/src/media-stream.test.ts b/extensions/voice-call/src/media-stream.test.ts new file mode 100644 index 000000000..773445121 --- /dev/null +++ b/extensions/voice-call/src/media-stream.test.ts @@ -0,0 +1,97 @@ +import { describe, expect, it } from "vitest"; + +import type { + OpenAIRealtimeSTTProvider, + RealtimeSTTSession, +} from "./providers/stt-openai-realtime.js"; +import { MediaStreamHandler } from "./media-stream.js"; + +const createStubSession = (): RealtimeSTTSession => ({ + connect: async () => {}, + sendAudio: () => {}, + waitForTranscript: async () => "", + onPartial: () => {}, + onTranscript: () => {}, + onSpeechStart: () => {}, + close: () => {}, + isConnected: () => true, +}); + +const createStubSttProvider = (): OpenAIRealtimeSTTProvider => + ({ + createSession: () => createStubSession(), + }) as unknown as OpenAIRealtimeSTTProvider; + +const flush = async (): Promise => { + await new Promise((resolve) => setTimeout(resolve, 0)); +}; + +const waitForAbort = (signal: AbortSignal): Promise => + new Promise((resolve) => { + if (signal.aborted) { + resolve(); + return; + } + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + +describe("MediaStreamHandler TTS queue", () => { + it("serializes TTS playback and resolves in order", async () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + }); + const started: number[] = []; + const finished: number[] = []; + + let resolveFirst!: () => void; + const firstGate = new Promise((resolve) => { + resolveFirst = resolve; + }); + + const first = handler.queueTts("stream-1", async () => { + started.push(1); + await firstGate; + finished.push(1); + }); + const second = handler.queueTts("stream-1", async () => { + started.push(2); + finished.push(2); + }); + + await flush(); + expect(started).toEqual([1]); + + resolveFirst(); + await first; + await second; + + expect(started).toEqual([1, 2]); + expect(finished).toEqual([1, 2]); + }); + + it("cancels active playback and clears queued items", async () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + }); + + let queuedRan = false; + const started: string[] = []; + + const active = handler.queueTts("stream-1", async (signal) => { + started.push("active"); + await waitForAbort(signal); + }); + void handler.queueTts("stream-1", async () => { + queuedRan = true; + }); + + await flush(); + expect(started).toEqual(["active"]); + + handler.clearTtsQueue("stream-1"); + await active; + await flush(); + + expect(queuedRan).toBe(false); + }); +}); diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index cae7279b6..e14dc9137 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -29,6 +29,8 @@ export interface MediaStreamConfig { onPartialTranscript?: (callId: string, partial: string) => void; /** Callback when stream connects */ onConnect?: (callId: string, streamSid: string) => void; + /** Callback when speech starts (barge-in) */ + onSpeechStart?: (callId: string) => void; /** Callback when stream disconnects */ onDisconnect?: (callId: string) => void; } @@ -43,6 +45,13 @@ interface StreamSession { sttSession: RealtimeSTTSession; } +type TtsQueueEntry = { + playFn: (signal: AbortSignal) => Promise; + controller: AbortController; + resolve: () => void; + reject: (error: unknown) => void; +}; + /** * Manages WebSocket connections for Twilio media streams. */ @@ -50,11 +59,12 @@ export class MediaStreamHandler { private wss: WebSocketServer | null = null; private sessions = new Map(); private config: MediaStreamConfig; - /** TTS playback queues per stream (serialize audio to prevent overlap) */ - private ttsQueues = new Map Promise>>(); + private ttsQueues = new Map(); /** Whether TTS is currently playing per stream */ private ttsPlaying = new Map(); + /** Active TTS playback controllers per stream */ + private ttsActiveControllers = new Map(); constructor(config: MediaStreamConfig) { this.config = config; @@ -153,6 +163,10 @@ export class MediaStreamHandler { this.config.onTranscript?.(callSid, transcript); }); + sttSession.onSpeechStart(() => { + this.config.onSpeechStart?.(callSid); + }); + const session: StreamSession = { callId: callSid, streamSid, @@ -182,6 +196,7 @@ export class MediaStreamHandler { private handleStop(session: StreamSession): void { console.log(`[MediaStream] Stream stopped: ${session.streamSid}`); + this.clearTtsState(session.streamSid); session.sttSession.close(); this.sessions.delete(session.streamSid); this.config.onDisconnect?.(session.callId); @@ -237,47 +252,39 @@ export class MediaStreamHandler { * Queue a TTS operation for sequential playback. * Only one TTS operation plays at a time per stream to prevent overlap. */ - async queueTts(streamSid: string, playFn: () => Promise): Promise { - if (!this.ttsQueues.has(streamSid)) { - this.ttsQueues.set(streamSid, []); - } - const queue = this.ttsQueues.get(streamSid)!; - queue.push(playFn); + async queueTts( + streamSid: string, + playFn: (signal: AbortSignal) => Promise, + ): Promise { + const queue = this.getTtsQueue(streamSid); + let resolveEntry: () => void; + let rejectEntry: (error: unknown) => void; + const promise = new Promise((resolve, reject) => { + resolveEntry = resolve; + rejectEntry = reject; + }); + + queue.push({ + playFn, + controller: new AbortController(), + resolve: resolveEntry!, + reject: rejectEntry!, + }); - // Process queue if not already playing if (!this.ttsPlaying.get(streamSid)) { - await this.processQueue(streamSid); + void this.processQueue(streamSid); } - } - /** - * Process the TTS queue for a stream. - * Uses iterative approach to avoid stack accumulation from recursion. - */ - private async processQueue(streamSid: string): Promise { - this.ttsPlaying.set(streamSid, true); - - while (true) { - const queue = this.ttsQueues.get(streamSid); - if (!queue || queue.length === 0) { - this.ttsPlaying.set(streamSid, false); - return; - } - - const playFn = queue.shift()!; - try { - await playFn(); - } catch (error) { - console.error("[MediaStream] TTS playback error:", error); - } - } + return promise; } /** * Clear TTS queue and interrupt current playback (barge-in). */ clearTtsQueue(streamSid: string): void { - this.ttsQueues.set(streamSid, []); + const queue = this.getTtsQueue(streamSid); + queue.length = 0; + this.ttsActiveControllers.get(streamSid)?.abort(); this.clearAudio(streamSid); } @@ -295,11 +302,65 @@ export class MediaStreamHandler { */ closeAll(): void { for (const session of this.sessions.values()) { + this.clearTtsState(session.streamSid); session.sttSession.close(); session.ws.close(); } this.sessions.clear(); } + + private getTtsQueue(streamSid: string): TtsQueueEntry[] { + const existing = this.ttsQueues.get(streamSid); + if (existing) return existing; + const queue: TtsQueueEntry[] = []; + this.ttsQueues.set(streamSid, queue); + return queue; + } + + /** + * Process the TTS queue for a stream. + * Uses iterative approach to avoid stack accumulation from recursion. + */ + private async processQueue(streamSid: string): Promise { + this.ttsPlaying.set(streamSid, true); + + while (true) { + const queue = this.ttsQueues.get(streamSid); + if (!queue || queue.length === 0) { + this.ttsPlaying.set(streamSid, false); + this.ttsActiveControllers.delete(streamSid); + return; + } + + const entry = queue.shift()!; + this.ttsActiveControllers.set(streamSid, entry.controller); + + try { + await entry.playFn(entry.controller.signal); + entry.resolve(); + } catch (error) { + if (entry.controller.signal.aborted) { + entry.resolve(); + } else { + console.error("[MediaStream] TTS playback error:", error); + entry.reject(error); + } + } finally { + if (this.ttsActiveControllers.get(streamSid) === entry.controller) { + this.ttsActiveControllers.delete(streamSid); + } + } + } + } + + private clearTtsState(streamSid: string): void { + const queue = this.ttsQueues.get(streamSid); + if (queue) queue.length = 0; + this.ttsActiveControllers.get(streamSid)?.abort(); + this.ttsActiveControllers.delete(streamSid); + this.ttsPlaying.delete(streamSid); + this.ttsQueues.delete(streamSid); + } } /** diff --git a/extensions/voice-call/src/providers/stt-openai-realtime.ts b/extensions/voice-call/src/providers/stt-openai-realtime.ts index 01c698f21..5cd52658d 100644 --- a/extensions/voice-call/src/providers/stt-openai-realtime.ts +++ b/extensions/voice-call/src/providers/stt-openai-realtime.ts @@ -38,6 +38,8 @@ export interface RealtimeSTTSession { onPartial(callback: (partial: string) => void): void; /** Set callback for final transcripts */ onTranscript(callback: (transcript: string) => void): void; + /** Set callback when speech starts (VAD) */ + onSpeechStart(callback: () => void): void; /** Close the session */ close(): void; /** Check if session is connected */ @@ -91,6 +93,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession { private pendingTranscript = ""; private onTranscriptCallback: ((transcript: string) => void) | null = null; private onPartialCallback: ((partial: string) => void) | null = null; + private onSpeechStartCallback: (() => void) | null = null; constructor( private readonly apiKey: string, @@ -243,6 +246,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession { case "input_audio_buffer.speech_started": console.log("[RealtimeSTT] Speech started"); this.pendingTranscript = ""; + this.onSpeechStartCallback?.(); break; case "error": @@ -273,6 +277,10 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession { this.onTranscriptCallback = callback; } + onSpeechStart(callback: () => void): void { + this.onSpeechStartCallback = callback; + } + async waitForTranscript(timeoutMs = 30000): Promise { return new Promise((resolve, reject) => { const timeout = setTimeout(() => { diff --git a/extensions/voice-call/src/providers/twilio.ts b/extensions/voice-call/src/providers/twilio.ts index d77b29f6c..be9dd6eda 100644 --- a/extensions/voice-call/src/providers/twilio.ts +++ b/extensions/voice-call/src/providers/twilio.ts @@ -534,7 +534,6 @@ export class TwilioProvider implements VoiceCallProvider { await handler.queueTts(streamSid, async (signal) => { // Generate audio with core TTS (returns mu-law at 8kHz) const muLawAudio = await ttsProvider.synthesizeForTelephony(text); - for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) { if (signal.aborted) break; handler.sendAudio(streamSid, chunk); diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index 56e104ff8..6ab4d0eed 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -114,6 +114,11 @@ export class VoiceCallWebhookServer { }); } }, + onSpeechStart: (providerCallId) => { + if (this.provider.name === "twilio") { + (this.provider as TwilioProvider).clearTtsQueue(providerCallId); + } + }, onPartialTranscript: (callId, partial) => { console.log(`[voice-call] Partial for ${callId}: ${partial}`); },