Compare commits
3 Commits
main
...
fix/voice-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0cbc051019 | ||
|
|
1005934964 | ||
|
|
76014685eb |
@ -51,6 +51,7 @@ Docs: https://docs.clawd.bot
|
|||||||
- Gateway: store lock files in the temp directory to avoid stale locks on persistent volumes. (#1676)
|
- Gateway: store lock files in the temp directory to avoid stale locks on persistent volumes. (#1676)
|
||||||
- macOS: default direct-transport `ws://` URLs to port 18789; document `gateway.remote.transport`. (#1603) Thanks @ngutman.
|
- macOS: default direct-transport `ws://` URLs to port 18789; document `gateway.remote.transport`. (#1603) Thanks @ngutman.
|
||||||
- Voice Call: return stream TwiML for outbound conversation calls on initial Twilio webhook. (#1634)
|
- Voice Call: return stream TwiML for outbound conversation calls on initial Twilio webhook. (#1634)
|
||||||
|
- Voice Call: serialize Twilio TTS playback and cancel on barge-in to prevent overlap. (#1713) Thanks @dguido.
|
||||||
- Google Chat: tighten email allowlist matching, typing cleanup, media caps, and onboarding/docs/tests. (#1635) Thanks @iHildy.
|
- Google Chat: tighten email allowlist matching, typing cleanup, media caps, and onboarding/docs/tests. (#1635) Thanks @iHildy.
|
||||||
- Google Chat: normalize space targets without double `spaces/` prefix.
|
- Google Chat: normalize space targets without double `spaces/` prefix.
|
||||||
- Messaging: keep newline chunking safe for fenced markdown blocks across channels.
|
- Messaging: keep newline chunking safe for fenced markdown blocks across channels.
|
||||||
|
|||||||
97
extensions/voice-call/src/media-stream.test.ts
Normal file
97
extensions/voice-call/src/media-stream.test.ts
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
|
||||||
|
import type {
|
||||||
|
OpenAIRealtimeSTTProvider,
|
||||||
|
RealtimeSTTSession,
|
||||||
|
} from "./providers/stt-openai-realtime.js";
|
||||||
|
import { MediaStreamHandler } from "./media-stream.js";
|
||||||
|
|
||||||
|
const createStubSession = (): RealtimeSTTSession => ({
|
||||||
|
connect: async () => {},
|
||||||
|
sendAudio: () => {},
|
||||||
|
waitForTranscript: async () => "",
|
||||||
|
onPartial: () => {},
|
||||||
|
onTranscript: () => {},
|
||||||
|
onSpeechStart: () => {},
|
||||||
|
close: () => {},
|
||||||
|
isConnected: () => true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const createStubSttProvider = (): OpenAIRealtimeSTTProvider =>
|
||||||
|
({
|
||||||
|
createSession: () => createStubSession(),
|
||||||
|
}) as unknown as OpenAIRealtimeSTTProvider;
|
||||||
|
|
||||||
|
const flush = async (): Promise<void> => {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||||
|
};
|
||||||
|
|
||||||
|
const waitForAbort = (signal: AbortSignal): Promise<void> =>
|
||||||
|
new Promise((resolve) => {
|
||||||
|
if (signal.aborted) {
|
||||||
|
resolve();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
signal.addEventListener("abort", () => resolve(), { once: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("MediaStreamHandler TTS queue", () => {
|
||||||
|
it("serializes TTS playback and resolves in order", async () => {
|
||||||
|
const handler = new MediaStreamHandler({
|
||||||
|
sttProvider: createStubSttProvider(),
|
||||||
|
});
|
||||||
|
const started: number[] = [];
|
||||||
|
const finished: number[] = [];
|
||||||
|
|
||||||
|
let resolveFirst!: () => void;
|
||||||
|
const firstGate = new Promise<void>((resolve) => {
|
||||||
|
resolveFirst = resolve;
|
||||||
|
});
|
||||||
|
|
||||||
|
const first = handler.queueTts("stream-1", async () => {
|
||||||
|
started.push(1);
|
||||||
|
await firstGate;
|
||||||
|
finished.push(1);
|
||||||
|
});
|
||||||
|
const second = handler.queueTts("stream-1", async () => {
|
||||||
|
started.push(2);
|
||||||
|
finished.push(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
await flush();
|
||||||
|
expect(started).toEqual([1]);
|
||||||
|
|
||||||
|
resolveFirst();
|
||||||
|
await first;
|
||||||
|
await second;
|
||||||
|
|
||||||
|
expect(started).toEqual([1, 2]);
|
||||||
|
expect(finished).toEqual([1, 2]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("cancels active playback and clears queued items", async () => {
|
||||||
|
const handler = new MediaStreamHandler({
|
||||||
|
sttProvider: createStubSttProvider(),
|
||||||
|
});
|
||||||
|
|
||||||
|
let queuedRan = false;
|
||||||
|
const started: string[] = [];
|
||||||
|
|
||||||
|
const active = handler.queueTts("stream-1", async (signal) => {
|
||||||
|
started.push("active");
|
||||||
|
await waitForAbort(signal);
|
||||||
|
});
|
||||||
|
void handler.queueTts("stream-1", async () => {
|
||||||
|
queuedRan = true;
|
||||||
|
});
|
||||||
|
|
||||||
|
await flush();
|
||||||
|
expect(started).toEqual(["active"]);
|
||||||
|
|
||||||
|
handler.clearTtsQueue("stream-1");
|
||||||
|
await active;
|
||||||
|
await flush();
|
||||||
|
|
||||||
|
expect(queuedRan).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -29,6 +29,8 @@ export interface MediaStreamConfig {
|
|||||||
onPartialTranscript?: (callId: string, partial: string) => void;
|
onPartialTranscript?: (callId: string, partial: string) => void;
|
||||||
/** Callback when stream connects */
|
/** Callback when stream connects */
|
||||||
onConnect?: (callId: string, streamSid: string) => void;
|
onConnect?: (callId: string, streamSid: string) => void;
|
||||||
|
/** Callback when speech starts (barge-in) */
|
||||||
|
onSpeechStart?: (callId: string) => void;
|
||||||
/** Callback when stream disconnects */
|
/** Callback when stream disconnects */
|
||||||
onDisconnect?: (callId: string) => void;
|
onDisconnect?: (callId: string) => void;
|
||||||
}
|
}
|
||||||
@ -43,6 +45,13 @@ interface StreamSession {
|
|||||||
sttSession: RealtimeSTTSession;
|
sttSession: RealtimeSTTSession;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TtsQueueEntry = {
|
||||||
|
playFn: (signal: AbortSignal) => Promise<void>;
|
||||||
|
controller: AbortController;
|
||||||
|
resolve: () => void;
|
||||||
|
reject: (error: unknown) => void;
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages WebSocket connections for Twilio media streams.
|
* Manages WebSocket connections for Twilio media streams.
|
||||||
*/
|
*/
|
||||||
@ -50,6 +59,12 @@ export class MediaStreamHandler {
|
|||||||
private wss: WebSocketServer | null = null;
|
private wss: WebSocketServer | null = null;
|
||||||
private sessions = new Map<string, StreamSession>();
|
private sessions = new Map<string, StreamSession>();
|
||||||
private config: MediaStreamConfig;
|
private config: MediaStreamConfig;
|
||||||
|
/** TTS playback queues per stream (serialize audio to prevent overlap) */
|
||||||
|
private ttsQueues = new Map<string, TtsQueueEntry[]>();
|
||||||
|
/** Whether TTS is currently playing per stream */
|
||||||
|
private ttsPlaying = new Map<string, boolean>();
|
||||||
|
/** Active TTS playback controllers per stream */
|
||||||
|
private ttsActiveControllers = new Map<string, AbortController>();
|
||||||
|
|
||||||
constructor(config: MediaStreamConfig) {
|
constructor(config: MediaStreamConfig) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
@ -148,6 +163,10 @@ export class MediaStreamHandler {
|
|||||||
this.config.onTranscript?.(callSid, transcript);
|
this.config.onTranscript?.(callSid, transcript);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
sttSession.onSpeechStart(() => {
|
||||||
|
this.config.onSpeechStart?.(callSid);
|
||||||
|
});
|
||||||
|
|
||||||
const session: StreamSession = {
|
const session: StreamSession = {
|
||||||
callId: callSid,
|
callId: callSid,
|
||||||
streamSid,
|
streamSid,
|
||||||
@ -177,6 +196,7 @@ export class MediaStreamHandler {
|
|||||||
private handleStop(session: StreamSession): void {
|
private handleStop(session: StreamSession): void {
|
||||||
console.log(`[MediaStream] Stream stopped: ${session.streamSid}`);
|
console.log(`[MediaStream] Stream stopped: ${session.streamSid}`);
|
||||||
|
|
||||||
|
this.clearTtsState(session.streamSid);
|
||||||
session.sttSession.close();
|
session.sttSession.close();
|
||||||
this.sessions.delete(session.streamSid);
|
this.sessions.delete(session.streamSid);
|
||||||
this.config.onDisconnect?.(session.callId);
|
this.config.onDisconnect?.(session.callId);
|
||||||
@ -228,6 +248,46 @@ export class MediaStreamHandler {
|
|||||||
this.sendToStream(streamSid, { event: "clear", streamSid });
|
this.sendToStream(streamSid, { event: "clear", streamSid });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue a TTS operation for sequential playback.
|
||||||
|
* Only one TTS operation plays at a time per stream to prevent overlap.
|
||||||
|
*/
|
||||||
|
async queueTts(
|
||||||
|
streamSid: string,
|
||||||
|
playFn: (signal: AbortSignal) => Promise<void>,
|
||||||
|
): Promise<void> {
|
||||||
|
const queue = this.getTtsQueue(streamSid);
|
||||||
|
let resolveEntry: () => void;
|
||||||
|
let rejectEntry: (error: unknown) => void;
|
||||||
|
const promise = new Promise<void>((resolve, reject) => {
|
||||||
|
resolveEntry = resolve;
|
||||||
|
rejectEntry = reject;
|
||||||
|
});
|
||||||
|
|
||||||
|
queue.push({
|
||||||
|
playFn,
|
||||||
|
controller: new AbortController(),
|
||||||
|
resolve: resolveEntry!,
|
||||||
|
reject: rejectEntry!,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!this.ttsPlaying.get(streamSid)) {
|
||||||
|
void this.processQueue(streamSid);
|
||||||
|
}
|
||||||
|
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear TTS queue and interrupt current playback (barge-in).
|
||||||
|
*/
|
||||||
|
clearTtsQueue(streamSid: string): void {
|
||||||
|
const queue = this.getTtsQueue(streamSid);
|
||||||
|
queue.length = 0;
|
||||||
|
this.ttsActiveControllers.get(streamSid)?.abort();
|
||||||
|
this.clearAudio(streamSid);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get active session by call ID.
|
* Get active session by call ID.
|
||||||
*/
|
*/
|
||||||
@ -242,11 +302,65 @@ export class MediaStreamHandler {
|
|||||||
*/
|
*/
|
||||||
closeAll(): void {
|
closeAll(): void {
|
||||||
for (const session of this.sessions.values()) {
|
for (const session of this.sessions.values()) {
|
||||||
|
this.clearTtsState(session.streamSid);
|
||||||
session.sttSession.close();
|
session.sttSession.close();
|
||||||
session.ws.close();
|
session.ws.close();
|
||||||
}
|
}
|
||||||
this.sessions.clear();
|
this.sessions.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private getTtsQueue(streamSid: string): TtsQueueEntry[] {
|
||||||
|
const existing = this.ttsQueues.get(streamSid);
|
||||||
|
if (existing) return existing;
|
||||||
|
const queue: TtsQueueEntry[] = [];
|
||||||
|
this.ttsQueues.set(streamSid, queue);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process the TTS queue for a stream.
|
||||||
|
* Uses iterative approach to avoid stack accumulation from recursion.
|
||||||
|
*/
|
||||||
|
private async processQueue(streamSid: string): Promise<void> {
|
||||||
|
this.ttsPlaying.set(streamSid, true);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const queue = this.ttsQueues.get(streamSid);
|
||||||
|
if (!queue || queue.length === 0) {
|
||||||
|
this.ttsPlaying.set(streamSid, false);
|
||||||
|
this.ttsActiveControllers.delete(streamSid);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const entry = queue.shift()!;
|
||||||
|
this.ttsActiveControllers.set(streamSid, entry.controller);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await entry.playFn(entry.controller.signal);
|
||||||
|
entry.resolve();
|
||||||
|
} catch (error) {
|
||||||
|
if (entry.controller.signal.aborted) {
|
||||||
|
entry.resolve();
|
||||||
|
} else {
|
||||||
|
console.error("[MediaStream] TTS playback error:", error);
|
||||||
|
entry.reject(error);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (this.ttsActiveControllers.get(streamSid) === entry.controller) {
|
||||||
|
this.ttsActiveControllers.delete(streamSid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private clearTtsState(streamSid: string): void {
|
||||||
|
const queue = this.ttsQueues.get(streamSid);
|
||||||
|
if (queue) queue.length = 0;
|
||||||
|
this.ttsActiveControllers.get(streamSid)?.abort();
|
||||||
|
this.ttsActiveControllers.delete(streamSid);
|
||||||
|
this.ttsPlaying.delete(streamSid);
|
||||||
|
this.ttsQueues.delete(streamSid);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@ -38,6 +38,8 @@ export interface RealtimeSTTSession {
|
|||||||
onPartial(callback: (partial: string) => void): void;
|
onPartial(callback: (partial: string) => void): void;
|
||||||
/** Set callback for final transcripts */
|
/** Set callback for final transcripts */
|
||||||
onTranscript(callback: (transcript: string) => void): void;
|
onTranscript(callback: (transcript: string) => void): void;
|
||||||
|
/** Set callback when speech starts (VAD) */
|
||||||
|
onSpeechStart(callback: () => void): void;
|
||||||
/** Close the session */
|
/** Close the session */
|
||||||
close(): void;
|
close(): void;
|
||||||
/** Check if session is connected */
|
/** Check if session is connected */
|
||||||
@ -91,6 +93,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
|
|||||||
private pendingTranscript = "";
|
private pendingTranscript = "";
|
||||||
private onTranscriptCallback: ((transcript: string) => void) | null = null;
|
private onTranscriptCallback: ((transcript: string) => void) | null = null;
|
||||||
private onPartialCallback: ((partial: string) => void) | null = null;
|
private onPartialCallback: ((partial: string) => void) | null = null;
|
||||||
|
private onSpeechStartCallback: (() => void) | null = null;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly apiKey: string,
|
private readonly apiKey: string,
|
||||||
@ -243,6 +246,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
|
|||||||
case "input_audio_buffer.speech_started":
|
case "input_audio_buffer.speech_started":
|
||||||
console.log("[RealtimeSTT] Speech started");
|
console.log("[RealtimeSTT] Speech started");
|
||||||
this.pendingTranscript = "";
|
this.pendingTranscript = "";
|
||||||
|
this.onSpeechStartCallback?.();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case "error":
|
case "error":
|
||||||
@ -273,6 +277,10 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
|
|||||||
this.onTranscriptCallback = callback;
|
this.onTranscriptCallback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
onSpeechStart(callback: () => void): void {
|
||||||
|
this.onSpeechStartCallback = callback;
|
||||||
|
}
|
||||||
|
|
||||||
async waitForTranscript(timeoutMs = 30000): Promise<string> {
|
async waitForTranscript(timeoutMs = 30000): Promise<string> {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
const timeout = setTimeout(() => {
|
const timeout = setTimeout(() => {
|
||||||
|
|||||||
@ -135,6 +135,17 @@ export class TwilioProvider implements VoiceCallProvider {
|
|||||||
this.callStreamMap.delete(callSid);
|
this.callStreamMap.delete(callSid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear TTS queue for a call (barge-in).
|
||||||
|
* Used when user starts speaking to interrupt current TTS playback.
|
||||||
|
*/
|
||||||
|
clearTtsQueue(callSid: string): void {
|
||||||
|
const streamSid = this.callStreamMap.get(callSid);
|
||||||
|
if (streamSid && this.mediaStreamHandler) {
|
||||||
|
this.mediaStreamHandler.clearTtsQueue(streamSid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make an authenticated request to the Twilio API.
|
* Make an authenticated request to the Twilio API.
|
||||||
*/
|
*/
|
||||||
@ -504,7 +515,7 @@ export class TwilioProvider implements VoiceCallProvider {
|
|||||||
/**
|
/**
|
||||||
* Play TTS via core TTS and Twilio Media Streams.
|
* Play TTS via core TTS and Twilio Media Streams.
|
||||||
* Generates audio with core TTS, converts to mu-law, and streams via WebSocket.
|
* Generates audio with core TTS, converts to mu-law, and streams via WebSocket.
|
||||||
* Uses a jitter buffer to smooth out timing variations.
|
* Uses a queue to serialize playback and prevent overlapping audio.
|
||||||
*/
|
*/
|
||||||
private async playTtsViaStream(
|
private async playTtsViaStream(
|
||||||
text: string,
|
text: string,
|
||||||
@ -514,22 +525,29 @@ export class TwilioProvider implements VoiceCallProvider {
|
|||||||
throw new Error("TTS provider and media stream handler required");
|
throw new Error("TTS provider and media stream handler required");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Generate audio with core TTS (returns mu-law at 8kHz)
|
|
||||||
const muLawAudio = await this.ttsProvider.synthesizeForTelephony(text);
|
|
||||||
|
|
||||||
// Stream audio in 20ms chunks (160 bytes at 8kHz mu-law)
|
// Stream audio in 20ms chunks (160 bytes at 8kHz mu-law)
|
||||||
const CHUNK_SIZE = 160;
|
const CHUNK_SIZE = 160;
|
||||||
const CHUNK_DELAY_MS = 20;
|
const CHUNK_DELAY_MS = 20;
|
||||||
|
|
||||||
for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
|
const handler = this.mediaStreamHandler;
|
||||||
this.mediaStreamHandler.sendAudio(streamSid, chunk);
|
const ttsProvider = this.ttsProvider;
|
||||||
|
await handler.queueTts(streamSid, async (signal) => {
|
||||||
|
// Generate audio with core TTS (returns mu-law at 8kHz)
|
||||||
|
const muLawAudio = await ttsProvider.synthesizeForTelephony(text);
|
||||||
|
for (const chunk of chunkAudio(muLawAudio, CHUNK_SIZE)) {
|
||||||
|
if (signal.aborted) break;
|
||||||
|
handler.sendAudio(streamSid, chunk);
|
||||||
|
|
||||||
// Pace the audio to match real-time playback
|
// Pace the audio to match real-time playback
|
||||||
await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS));
|
await new Promise((resolve) => setTimeout(resolve, CHUNK_DELAY_MS));
|
||||||
}
|
if (signal.aborted) break;
|
||||||
|
}
|
||||||
|
|
||||||
// Send a mark to track when audio finishes
|
if (!signal.aborted) {
|
||||||
this.mediaStreamHandler.sendMark(streamSid, `tts-${Date.now()}`);
|
// Send a mark to track when audio finishes
|
||||||
|
handler.sendMark(streamSid, `tts-${Date.now()}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@ -78,6 +78,11 @@ export class VoiceCallWebhookServer {
|
|||||||
`[voice-call] Transcript for ${providerCallId}: ${transcript}`,
|
`[voice-call] Transcript for ${providerCallId}: ${transcript}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Clear TTS queue on barge-in (user started speaking, interrupt current playback)
|
||||||
|
if (this.provider.name === "twilio") {
|
||||||
|
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
|
||||||
|
}
|
||||||
|
|
||||||
// Look up our internal call ID from the provider call ID
|
// Look up our internal call ID from the provider call ID
|
||||||
const call = this.manager.getCallByProviderCallId(providerCallId);
|
const call = this.manager.getCallByProviderCallId(providerCallId);
|
||||||
if (!call) {
|
if (!call) {
|
||||||
@ -109,6 +114,11 @@ export class VoiceCallWebhookServer {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
onSpeechStart: (providerCallId) => {
|
||||||
|
if (this.provider.name === "twilio") {
|
||||||
|
(this.provider as TwilioProvider).clearTtsQueue(providerCallId);
|
||||||
|
}
|
||||||
|
},
|
||||||
onPartialTranscript: (callId, partial) => {
|
onPartialTranscript: (callId, partial) => {
|
||||||
console.log(`[voice-call] Partial for ${callId}: ${partial}`);
|
console.log(`[voice-call] Partial for ${callId}: ${partial}`);
|
||||||
},
|
},
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user