Compare commits
3 Commits
main
...
fix/voice-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0cbc051019 | ||
|
|
1005934964 | ||
|
|
76014685eb |
@ -51,6 +51,7 @@ Docs: https://docs.clawd.bot
|
||||
- Gateway: store lock files in the temp directory to avoid stale locks on persistent volumes. (#1676)
|
||||
- macOS: default direct-transport `ws://` URLs to port 18789; document `gateway.remote.transport`. (#1603) Thanks @ngutman.
|
||||
- Voice Call: return stream TwiML for outbound conversation calls on initial Twilio webhook. (#1634)
|
||||
- Voice Call: serialize Twilio TTS playback and cancel on barge-in to prevent overlap. (#1713) Thanks @dguido.
|
||||
- Google Chat: tighten email allowlist matching, typing cleanup, media caps, and onboarding/docs/tests. (#1635) Thanks @iHildy.
|
||||
- Google Chat: normalize space targets without double `spaces/` prefix.
|
||||
- Messaging: keep newline chunking safe for fenced markdown blocks across channels.
|
||||
|
||||
97
extensions/voice-call/src/media-stream.test.ts
Normal file
97
extensions/voice-call/src/media-stream.test.ts
Normal file
@ -0,0 +1,97 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import type {
|
||||
OpenAIRealtimeSTTProvider,
|
||||
RealtimeSTTSession,
|
||||
} from "./providers/stt-openai-realtime.js";
|
||||
import { MediaStreamHandler } from "./media-stream.js";
|
||||
|
||||
/**
 * Builds a stand-in STT session for tests: every method is a no-op,
 * `waitForTranscript` yields an empty string, and `isConnected` always
 * reports `true`.
 */
function createStubSession(): RealtimeSTTSession {
  const noop = (): void => {};
  return {
    connect: async () => {},
    sendAudio: noop,
    waitForTranscript: async () => "",
    onPartial: noop,
    onTranscript: noop,
    onSpeechStart: noop,
    close: noop,
    isConnected: () => true,
  };
}
|
||||
|
||||
/**
 * Builds a minimal STT provider stand-in whose only member is
 * `createSession`, which hands out stub sessions. The double cast is needed
 * because the object implements just that one method of the provider.
 */
function createStubSttProvider(): OpenAIRealtimeSTTProvider {
  const stub = {
    createSession: () => createStubSession(),
  };
  return stub as unknown as OpenAIRealtimeSTTProvider;
}
|
||||
|
||||
/**
 * Waits for a zero-delay timer to fire, giving already-scheduled async work
 * a chance to run before the test continues.
 */
async function flush(): Promise<void> {
  await new Promise<void>((done) => {
    setTimeout(done, 0);
  });
}
|
||||
|
||||
/**
 * Returns a promise that resolves once `signal` is aborted.
 * Resolves immediately when the signal is already aborted; otherwise it
 * listens for a single "abort" event.
 */
function waitForAbort(signal: AbortSignal): Promise<void> {
  return new Promise<void>((settle) => {
    if (!signal.aborted) {
      signal.addEventListener("abort", () => settle(), { once: true });
      return;
    }
    settle();
  });
}
|
||||
|
||||
describe("MediaStreamHandler TTS queue", () => {
  it("serializes TTS playback and resolves in order", async () => {
    const handler = new MediaStreamHandler({
      sttProvider: createStubSttProvider(),
    });
    const startOrder: number[] = [];
    const finishOrder: number[] = [];

    // Gate that keeps the first playback pending until released below.
    let releaseFirst!: () => void;
    const firstHeld = new Promise<void>((resolve) => {
      releaseFirst = resolve;
    });

    const firstDone = handler.queueTts("stream-1", async () => {
      startOrder.push(1);
      await firstHeld;
      finishOrder.push(1);
    });
    const secondDone = handler.queueTts("stream-1", async () => {
      startOrder.push(2);
      finishOrder.push(2);
    });

    await flush();
    // While the first item is still gated, the second must not have started.
    expect(startOrder).toEqual([1]);

    releaseFirst();
    await firstDone;
    await secondDone;

    expect(startOrder).toEqual([1, 2]);
    expect(finishOrder).toEqual([1, 2]);
  });

  it("cancels active playback and clears queued items", async () => {
    const handler = new MediaStreamHandler({
      sttProvider: createStubSttProvider(),
    });

    let queuedRan = false;
    const startLog: string[] = [];

    // Active playback that only finishes when its abort signal fires.
    const activeDone = handler.queueTts("stream-1", async (signal) => {
      startLog.push("active");
      await waitForAbort(signal);
    });
    // Queued behind the active item; it must never run once the queue is cleared.
    void handler.queueTts("stream-1", async () => {
      queuedRan = true;
    });

    await flush();
    expect(startLog).toEqual(["active"]);

    handler.clearTtsQueue("stream-1");
    await activeDone;
    await flush();

    expect(queuedRan).toBe(false);
  });
});
|
||||
@ -29,6 +29,8 @@ export interface MediaStreamConfig {
|
||||
onPartialTranscript?: (callId: string, partial: string) => void;
|
||||
/** Callback when stream connects */
|
||||
onConnect?: (callId: string, streamSid: string) => void;
|
||||
/** Callback when speech starts (barge-in) */
|
||||
onSpeechStart?: (callId: string) => void;
|
||||
/** Callback when stream disconnects */
|
||||
onDisconnect?: (callId: string) => void;
|
||||
}
|
||||
@ -43,6 +45,13 @@ interface StreamSession {
|
||||
sttSession: RealtimeSTTSession;
|
||||
}
|
||||
|
||||
/** One pending TTS playback request held in a stream's queue. */
interface TtsQueueEntry {
  /** Performs the playback; receives this entry's abort signal. */
  playFn: (signal: AbortSignal) => Promise<void>;
  /** Controller whose signal is handed to `playFn`; aborting it cancels playback. */
  controller: AbortController;
  /** Resolves the promise returned to the caller that queued this entry. */
  resolve: () => void;
  /** Rejects the promise returned to the caller that queued this entry. */
  reject: (error: unknown) => void;
}
|
||||
|
||||
/**
|
||||
* Manages WebSocket connections for Twilio media streams.
|
||||
*/
|
||||
@ -50,6 +59,12 @@ export class MediaStreamHandler {
|
||||
private wss: WebSocketServer | null = null;
|
||||
private sessions = new Map<string, StreamSession>();
|
||||
private config: MediaStreamConfig;
|
||||
/** TTS playback queues per stream (serialize audio to prevent overlap) */
|
||||
private ttsQueues = new Map<string, TtsQueueEntry[]>();
|
||||
/** Whether TTS is currently playing per stream */
|
||||
private ttsPlaying = new Map<string, boolean>();
|
||||
/** Active TTS playback controllers per stream */
|
||||
private ttsActiveControllers = new Map<string, AbortController>();
|
||||
|
||||
constructor(config: MediaStreamConfig) {
|
||||
this.config = config;
|
||||
@ -148,6 +163,10 @@ export class MediaStreamHandler {
|
||||
this.config.onTranscript?.(callSid, transcript);
|
||||
});
|
||||
|
||||
sttSession.onSpeechStart(() => {
|
||||
this.config.onSpeechStart?.(callSid);
|
||||
});
|
||||
|
||||
const session: StreamSession = {
|
||||
callId: callSid,
|
||||
streamSid,
|
||||
@ -177,6 +196,7 @@ export class MediaStreamHandler {
|
||||
private handleStop(session: StreamSession): void {
|
||||
console.log(`[MediaStream] Stream stopped: ${session.streamSid}`);
|
||||
|
||||
this.clearTtsState(session.streamSid);
|
||||
session.sttSession.close();
|
||||
this.sessions.delete(session.streamSid);
|
||||
this.config.onDisconnect?.(session.callId);
|
||||
@ -228,6 +248,46 @@ export class MediaStreamHandler {
|
||||
this.sendToStream(streamSid, { event: "clear", streamSid });
|
||||
}
|
||||
|
||||
/**
 * Enqueue a TTS playback operation for a stream.
 *
 * Operations for the same stream run one at a time, in the order they were
 * queued, so audio from different requests never overlaps.
 *
 * @param streamSid - Stream whose queue receives the operation.
 * @param playFn - Performs the playback; it is passed an abort signal that
 *   fires when the playback is cancelled.
 * @returns A promise settled by the queue processor once `playFn` has run.
 */
async queueTts(
  streamSid: string,
  playFn: (signal: AbortSignal) => Promise<void>,
): Promise<void> {
  const pending = this.getTtsQueue(streamSid);

  // The executor runs synchronously, so the entry is already in the queue
  // when the playing flag is checked below.
  const settled = new Promise<void>((resolve, reject) => {
    pending.push({
      playFn,
      controller: new AbortController(),
      resolve,
      reject,
    });
  });

  // Start a processor only when none is currently draining this stream.
  const alreadyPlaying = this.ttsPlaying.get(streamSid);
  if (!alreadyPlaying) {
    void this.processQueue(streamSid);
  }

  return settled;
}
|
||||
|
||||
/**
 * Clear TTS queue and interrupt current playback (barge-in).
 *
 * Items that were queued but had not started are dropped without running.
 * Their promises are resolved (matching how an aborted active playback is
 * resolved) so callers awaiting `queueTts` are not left hanging. The active
 * playback, if any, is aborted and buffered audio on the stream is cleared.
 *
 * @param streamSid - Stream whose queued and active playback is cancelled.
 */
clearTtsQueue(streamSid: string): void {
  // Look the queue up directly; there is no need to allocate and register
  // an empty queue for a stream that never had one.
  const queue = this.ttsQueues.get(streamSid);
  if (queue) {
    // splice(0) empties the array in place, so the same array object stays
    // registered for later queueTts calls.
    for (const dropped of queue.splice(0)) {
      dropped.controller.abort();
      dropped.resolve();
    }
  }

  this.ttsActiveControllers.get(streamSid)?.abort();
  this.clearAudio(streamSid);
}
|
||||
|
||||
/**
|
||||
* Get active session by call ID.
|
||||
*/
|
||||
@ -242,11 +302,65 @@ export class MediaStreamHandler {
|
||||
*/
|
||||
closeAll(): void {
  this.sessions.forEach((session) => {
    // Cancel any queued/active TTS before tearing down the session's
    // STT connection and WebSocket.
    this.clearTtsState(session.streamSid);
    session.sttSession.close();
    session.ws.close();
  });
  this.sessions.clear();
}
|
||||
|
||||
/**
 * Returns the playback queue for a stream, creating and registering an
 * empty one the first time the stream is seen.
 */
private getTtsQueue(streamSid: string): TtsQueueEntry[] {
  let queue = this.ttsQueues.get(streamSid);
  if (!queue) {
    queue = [];
    this.ttsQueues.set(streamSid, queue);
  }
  return queue;
}
|
||||
|
||||
/**
 * Process the TTS queue for a stream.
 * Uses iterative approach to avoid stack accumulation from recursion.
 *
 * Entries are played one at a time in FIFO order. Each entry's promise is
 * resolved when its `playFn` completes, resolved when `playFn` throws after
 * its signal was aborted (cancellation is not an error), and rejected when
 * `playFn` throws otherwise.
 *
 * @param streamSid - Stream whose queue is drained.
 */
private async processQueue(streamSid: string): Promise<void> {
  this.ttsPlaying.set(streamSid, true);

  while (true) {
    // Re-read the queue each iteration: it may have been emptied or removed
    // from the map while the previous entry was playing.
    const entry = this.ttsQueues.get(streamSid)?.shift();
    if (!entry) {
      // Remove the flag instead of storing `false`, so a stream whose state
      // was torn down during playback does not get a stale entry re-added.
      // A missing flag reads as "not playing".
      this.ttsPlaying.delete(streamSid);
      this.ttsActiveControllers.delete(streamSid);
      return;
    }

    // Expose the controller so the active playback can be aborted.
    this.ttsActiveControllers.set(streamSid, entry.controller);

    try {
      await entry.playFn(entry.controller.signal);
      entry.resolve();
    } catch (error) {
      if (entry.controller.signal.aborted) {
        // Failure after cancellation: treat as a normal end of playback.
        entry.resolve();
      } else {
        console.error("[MediaStream] TTS playback error:", error);
        entry.reject(error);
      }
    } finally {
      // Only unregister the controller if it is still the one we set.
      if (this.ttsActiveControllers.get(streamSid) === entry.controller) {
        this.ttsActiveControllers.delete(streamSid);
      }
    }
  }
}
|
||||
|
||||
/**
 * Tear down all TTS state for a stream.
 *
 * Entries still waiting in the queue are dropped without running; their
 * promises are resolved so callers awaiting `queueTts` do not hang. The
 * active playback, if any, is aborted, and the stream's queue, playing flag
 * and active controller are removed.
 *
 * @param streamSid - Stream whose TTS state is discarded.
 */
private clearTtsState(streamSid: string): void {
  const queue = this.ttsQueues.get(streamSid);
  if (queue) {
    // Settle every dropped entry instead of silently discarding it.
    for (const dropped of queue.splice(0)) {
      dropped.controller.abort();
      dropped.resolve();
    }
  }

  this.ttsActiveControllers.get(streamSid)?.abort();
  this.ttsActiveControllers.delete(streamSid);
  this.ttsPlaying.delete(streamSid);
  this.ttsQueues.delete(streamSid);
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -38,6 +38,8 @@ export interface RealtimeSTTSession {
|
||||
onPartial(callback: (partial: string) => void): void;
|
||||
/** Set callback for final transcripts */
|
||||
onTranscript(callback: (transcript: string) => void): void;
|
||||
/** Set callback when speech starts (VAD) */
|
||||
onSpeechStart(callback: () => void): void;
|
||||
/** Close the session */
|
||||
close(): void;
|
||||
/** Check if session is connected */
|
||||
@ -91,6 +93,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
|
||||
private pendingTranscript = "";
|
||||
private onTranscriptCallback: ((transcript: string) => void) | null = null;
|
||||
private onPartialCallback: ((partial: string) => void) | null = null;
|
||||
private onSpeechStartCallback: (() => void) | null = null;
|
||||
|
||||
constructor(
|
||||
private readonly apiKey: string,
|
||||
@ -243,6 +246,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
|
||||
case "input_audio_buffer.speech_started":
|
||||
console.log("[RealtimeSTT] Speech started");
|
||||
this.pendingTranscript = "";
|
||||
this.onSpeechStartCallback?.();
|
||||
break;
|
||||
|
||||
case "error":
|
||||
@ -273,6 +277,10 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
|
||||
this.onTranscriptCallback = callback;
|
||||
}
|
||||
|
||||
/**
 * Registers the speech-start callback, replacing any previously set one.
 * It is invoked when an "input_audio_buffer.speech_started" event is handled.
 */
onSpeechStart(callback: () => void): void {
  this.onSpeechStartCallback = callback;
}
|
||||
|
||||
async waitForTranscript(timeoutMs = 30000): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
|
||||
@ -135,6 +135,17 @@ export class TwilioProvider implements VoiceCallProvider {
|
||||
this.callStreamMap.delete(callSid);
|
||||
}
|
||||
|
||||
/**
 * Clear TTS queue for a call (barge-in).
 * Used when the user starts speaking to interrupt current TTS playback.
 *
 * Does nothing when the call has no mapped stream or no media stream
 * handler is set.
 *
 * @param callSid - Call whose stream should stop playing TTS.
 */
clearTtsQueue(callSid: string): void {
  const streamSid = this.callStreamMap.get(callSid);
  const handler = this.mediaStreamHandler;
  if (!streamSid || !handler) return;

  handler.clearTtsQueue(streamSid);
}
|
||||
|
||||
/**
|
||||
* Make an authenticated request to the Twilio API.
|
||||
*/
|
||||
@ -504,7 +515,7 @@ export class TwilioProvider implements VoiceCallProvider {
|
||||
/**
|
||||
* Play TTS via core TTS and Twilio Media Streams.
|
||||
* Generates audio with core TTS, converts to mu-law, and streams via WebSocket.
|
||||
* Uses a jitter buffer to smooth out timing variations.
|
||||
* Uses a queue to serialize playback and prevent overlapping audio.
|
||||
*/
|
||||
private async playTtsViaStream(
|
||||
text: string,
|
||||
@ -514,22 +525,29 @@ export class TwilioProvider implements VoiceCallProvider {
|
||||
throw new Error("TTS provider and media stream handler required");
|
||||
}
|
||||
|
||||
// Generate audio with core TTS (returns mu-law at 8kHz)
|
||||
const muLawAudio = await this.ttsProvider.synthesizeForTelephony(text);
|
||||
|
||||
// Stream audio in 20ms chunks (160 bytes at 8kHz mu-law)
|
||||
const CHUNK_SIZE = 160;
|
||||
const CHUNK_DELAY_MS = 20;
|
||||
|
||||
for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
|
||||
this.mediaStreamHandler.sendAudio(streamSid, chunk);
|
||||
const handler = this.mediaStreamHandler;
|
||||
const ttsProvider = this.ttsProvider;
|
||||
await handler.queueTts(streamSid, async (signal) => {
|
||||
// Generate audio with core TTS (returns mu-law at 8kHz)
|
||||
const muLawAudio = await ttsProvider.synthesizeForTelephony(text);
|
||||
for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
|
||||
if (signal.aborted) break;
|
||||
handler.sendAudio(streamSid, chunk);
|
||||
|
||||
// Pace the audio to match real-time playback
|
||||
await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS));
|
||||
}
|
||||
// Pace the audio to match real-time playback
|
||||
await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS));
|
||||
if (signal.aborted) break;
|
||||
}
|
||||
|
||||
// Send a mark to track when audio finishes
|
||||
this.mediaStreamHandler.sendMark(streamSid, `tts-${Date.now()}`);
|
||||
if (!signal.aborted) {
|
||||
// Send a mark to track when audio finishes
|
||||
handler.sendMark(streamSid, `tts-${Date.now()}`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -78,6 +78,11 @@ export class VoiceCallWebhookServer {
|
||||
`[voice-call] Transcript for ${providerCallId}: ${transcript}`,
|
||||
);
|
||||
|
||||
// Clear TTS queue on barge-in (user started speaking, interrupt current playback)
|
||||
if (this.provider.name === "twilio") {
|
||||
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
|
||||
}
|
||||
|
||||
// Look up our internal call ID from the provider call ID
|
||||
const call = this.manager.getCallByProviderCallId(providerCallId);
|
||||
if (!call) {
|
||||
@ -109,6 +114,11 @@ export class VoiceCallWebhookServer {
|
||||
});
|
||||
}
|
||||
},
|
||||
onSpeechStart: (providerCallId) => {
|
||||
if (this.provider.name === "twilio") {
|
||||
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
|
||||
}
|
||||
},
|
||||
onPartialTranscript: (callId, partial) => {
|
||||
console.log(`[voice-call] Partial for ${callId}: ${partial}`);
|
||||
},
|
||||
|
||||
Loading…
Reference in New Issue
Block a user