fix(voice-call): wait for TTS completion before notify hangup

- Add call.speak.ended event type and handler for Telnyx
- Track calls pending speak completion in pendingNotifyHangup Set
- Hangup triggers on speak completion event (with fallback timer)
- Fix sandbox vs embedded mode detection for RPC
This commit is contained in:
hershey-g 2026-01-28 13:09:46 -05:00
parent 109ac1c549
commit c17523d36c
No known key found for this signature in database
4 changed files with 186 additions and 94 deletions

View File

@ -1,13 +1,14 @@
import { Type } from "@sinclair/typebox"; import { Type } from "@sinclair/typebox";
import type { CoreConfig } from "./src/core-bridge.js"; import type { CoreConfig } from "./src/core-bridge.js";
import { import {
VoiceCallConfigSchema, VoiceCallConfigSchema,
resolveVoiceCallConfig,
validateProviderConfig, validateProviderConfig,
type VoiceCallConfig, type VoiceCallConfig,
} from "./src/config.js"; } from "./src/config.js";
import { registerVoiceCallCli } from "./src/cli.js"; import { registerVoiceCallCli } from "./src/cli.js";
import { createVoiceCallRuntime, type VoiceCallRuntime } from "./src/runtime.js"; import { createVoiceCallRuntime, type VoiceCallRuntime } from "./src/runtime.js";
import { callGateway } from "../../dist/gateway/call.js";
const voiceCallConfigSchema = { const voiceCallConfigSchema = {
parse(value: unknown): VoiceCallConfig { parse(value: unknown): VoiceCallConfig {
@ -62,8 +63,8 @@ const voiceCallConfigSchema = {
advanced: true, advanced: true,
}, },
"tunnel.ngrokDomain": { label: "ngrok Domain", advanced: true }, "tunnel.ngrokDomain": { label: "ngrok Domain", advanced: true },
"tunnel.allowNgrokFreeTierLoopbackBypass": { "tunnel.allowNgrokFreeTier": {
label: "Allow ngrok Free Tier (Loopback Bypass)", label: "Allow ngrok Free Tier",
advanced: true, advanced: true,
}, },
"streaming.enabled": { label: "Enable Streaming", advanced: true }, "streaming.enabled": { label: "Enable Streaming", advanced: true },
@ -145,10 +146,8 @@ const voiceCallPlugin = {
description: "Voice-call plugin with Telnyx/Twilio/Plivo providers", description: "Voice-call plugin with Telnyx/Twilio/Plivo providers",
configSchema: voiceCallConfigSchema, configSchema: voiceCallConfigSchema,
register(api) { register(api) {
const config = resolveVoiceCallConfig( const cfg = voiceCallConfigSchema.parse(api.pluginConfig);
voiceCallConfigSchema.parse(api.pluginConfig), const validation = validateProviderConfig(cfg);
);
const validation = validateProviderConfig(config);
if (api.pluginConfig && typeof api.pluginConfig === "object") { if (api.pluginConfig && typeof api.pluginConfig === "object") {
const raw = api.pluginConfig as Record<string, unknown>; const raw = api.pluginConfig as Record<string, unknown>;
@ -169,7 +168,7 @@ const voiceCallPlugin = {
let runtime: VoiceCallRuntime | null = null; let runtime: VoiceCallRuntime | null = null;
const ensureRuntime = async () => { const ensureRuntime = async () => {
if (!config.enabled) { if (!cfg.enabled) {
throw new Error("Voice call disabled in plugin config"); throw new Error("Voice call disabled in plugin config");
} }
if (!validation.valid) { if (!validation.valid) {
@ -178,7 +177,7 @@ const voiceCallPlugin = {
if (runtime) return runtime; if (runtime) return runtime;
if (!runtimePromise) { if (!runtimePromise) {
runtimePromise = createVoiceCallRuntime({ runtimePromise = createVoiceCallRuntime({
config, config: cfg,
coreConfig: api.config as CoreConfig, coreConfig: api.config as CoreConfig,
ttsRuntime: api.runtime.tts, ttsRuntime: api.runtime.tts,
logger: api.logger, logger: api.logger,
@ -188,8 +187,14 @@ const voiceCallPlugin = {
return runtime; return runtime;
}; };
const sendError = (respond: (ok: boolean, payload?: unknown) => void, err: unknown) => { // PATCHED: Use proper error format for gateway responses
respond(false, { error: err instanceof Error ? err.message : String(err) }); // respond(false, undefined, { message: "error" }) instead of respond(false, { error: "..." })
const respondError = (respond: (ok: boolean, payload?: unknown, error?: unknown) => void, msg: string) => {
respond(false, undefined, { message: msg });
};
const sendError = (respond: (ok: boolean, payload?: unknown, error?: unknown) => void, err: unknown) => {
respond(false, undefined, { message: err instanceof Error ? err.message : String(err) });
}; };
api.registerGatewayMethod("voicecall.initiate", async ({ params, respond }) => { api.registerGatewayMethod("voicecall.initiate", async ({ params, respond }) => {
@ -197,7 +202,7 @@ const voiceCallPlugin = {
const message = const message =
typeof params?.message === "string" ? params.message.trim() : ""; typeof params?.message === "string" ? params.message.trim() : "";
if (!message) { if (!message) {
respond(false, { error: "message required" }); respondError(respond, "message required");
return; return;
} }
const rt = await ensureRuntime(); const rt = await ensureRuntime();
@ -206,7 +211,7 @@ const voiceCallPlugin = {
? params.to.trim() ? params.to.trim()
: rt.config.toNumber; : rt.config.toNumber;
if (!to) { if (!to) {
respond(false, { error: "to required" }); respondError(respond, "to required");
return; return;
} }
const mode = const mode =
@ -218,7 +223,7 @@ const voiceCallPlugin = {
mode, mode,
}); });
if (!result.success) { if (!result.success) {
respond(false, { error: result.error || "initiate failed" }); respondError(respond, result.error || "initiate failed");
return; return;
} }
respond(true, { callId: result.callId, initiated: true }); respond(true, { callId: result.callId, initiated: true });
@ -234,13 +239,13 @@ const voiceCallPlugin = {
const message = const message =
typeof params?.message === "string" ? params.message.trim() : ""; typeof params?.message === "string" ? params.message.trim() : "";
if (!callId || !message) { if (!callId || !message) {
respond(false, { error: "callId and message required" }); respondError(respond, "callId and message required");
return; return;
} }
const rt = await ensureRuntime(); const rt = await ensureRuntime();
const result = await rt.manager.continueCall(callId, message); const result = await rt.manager.continueCall(callId, message);
if (!result.success) { if (!result.success) {
respond(false, { error: result.error || "continue failed" }); respondError(respond, result.error || "continue failed");
return; return;
} }
respond(true, { success: true, transcript: result.transcript }); respond(true, { success: true, transcript: result.transcript });
@ -256,13 +261,13 @@ const voiceCallPlugin = {
const message = const message =
typeof params?.message === "string" ? params.message.trim() : ""; typeof params?.message === "string" ? params.message.trim() : "";
if (!callId || !message) { if (!callId || !message) {
respond(false, { error: "callId and message required" }); respondError(respond, "callId and message required");
return; return;
} }
const rt = await ensureRuntime(); const rt = await ensureRuntime();
const result = await rt.manager.speak(callId, message); const result = await rt.manager.speak(callId, message);
if (!result.success) { if (!result.success) {
respond(false, { error: result.error || "speak failed" }); respondError(respond, result.error || "speak failed");
return; return;
} }
respond(true, { success: true }); respond(true, { success: true });
@ -276,13 +281,13 @@ const voiceCallPlugin = {
const callId = const callId =
typeof params?.callId === "string" ? params.callId.trim() : ""; typeof params?.callId === "string" ? params.callId.trim() : "";
if (!callId) { if (!callId) {
respond(false, { error: "callId required" }); respondError(respond, "callId required");
return; return;
} }
const rt = await ensureRuntime(); const rt = await ensureRuntime();
const result = await rt.manager.endCall(callId); const result = await rt.manager.endCall(callId);
if (!result.success) { if (!result.success) {
respond(false, { error: result.error || "end failed" }); respondError(respond, result.error || "end failed");
return; return;
} }
respond(true, { success: true }); respond(true, { success: true });
@ -300,7 +305,7 @@ const voiceCallPlugin = {
? params.sid.trim() ? params.sid.trim()
: ""; : "";
if (!raw) { if (!raw) {
respond(false, { error: "callId required" }); respondError(respond, "callId required");
return; return;
} }
const rt = await ensureRuntime(); const rt = await ensureRuntime();
@ -322,7 +327,7 @@ const voiceCallPlugin = {
const message = const message =
typeof params?.message === "string" ? params.message.trim() : ""; typeof params?.message === "string" ? params.message.trim() : "";
if (!to) { if (!to) {
respond(false, { error: "to required" }); respondError(respond, "to required");
return; return;
} }
const rt = await ensureRuntime(); const rt = await ensureRuntime();
@ -330,7 +335,7 @@ const voiceCallPlugin = {
message: message || undefined, message: message || undefined,
}); });
if (!result.success) { if (!result.success) {
respond(false, { error: result.error || "initiate failed" }); respondError(respond, result.error || "initiate failed");
return; return;
} }
respond(true, { callId: result.callId, initiated: true }); respond(true, { callId: result.callId, initiated: true });
@ -339,6 +344,9 @@ const voiceCallPlugin = {
} }
}); });
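// PATCHED: Detect embedded vs sandbox mode and pick the matching call path.
// Embedded mode (the voice-call runtime is already initialized in this process): call it directly via ensureRuntime().
// Sandbox mode (the runtime is still null in this process): route the request through gateway RPC instead.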
api.registerTool({ api.registerTool({
name: "voice_call", name: "voice_call",
label: "Voice Call", label: "Voice Call",
@ -353,100 +361,155 @@ const voiceCallPlugin = {
details: payload, details: payload,
}); });
try { // Detect mode: if runtime exists, we're in gateway process (embedded)
const rt = await ensureRuntime(); // If runtime is null, we're in sandbox - use gateway RPC
const useDirectCalls = runtime !== null;
/**
 * Gateway RPC helper (for sandbox mode).
 *
 * Forwards `method` and `callParams` to `callGateway` with a 10 second timeout.
 * This helper never rejects: any failure is reported as `{ error: message }`,
 * so callers only need to inspect `result.error`.
 *
 * @param method - Gateway method name, e.g. "voicecall.initiate".
 * @param callParams - Passed through unchanged as the RPC `params`.
 * @returns The gateway's response object, or `{ error }` when the call threw
 *   or the response was not an object.
 */
const gatewayCall = async (method: string, callParams: Record<string, unknown>): Promise<Record<string, unknown>> => {
  try {
    const result: unknown = await callGateway({
      method,
      params: callParams,
      timeoutMs: 10000,
    });
    // Callers read `result.error` / `result.callId` straight off the return value.
    // A null or primitive response would make those reads throw an opaque
    // TypeError, so surface it as an explicit error instead of casting blindly.
    if (result === null || typeof result !== "object") {
      return { error: `gateway method ${method} returned no result object` };
    }
    return result as Record<string, unknown>;
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    return { error: msg };
  }
};
/**
 * Direct call helper (for embedded mode).
 *
 * Resolves the runtime through `ensureRuntime()` and dispatches `action` to the
 * matching call-manager operation. Operation failures are reported as
 * `{ error }`; a rejection from `ensureRuntime()` itself is not caught here and
 * propagates to the caller.
 *
 * @param action - One of "initiate", "continue", "speak", "end", "status".
 * @param callParams - Action arguments (`to`, `message`, `mode`, `callId`).
 * @returns The action's result payload, or `{ error }` on failure or when the
 *   action name is not recognised.
 */
const directCall = async (action: string, callParams: Record<string, unknown>): Promise<Record<string, unknown>> => {
  const voiceRuntime = await ensureRuntime();

  if (action === "initiate") {
    // Fall back to the configured default number when no destination was given.
    const destination = (callParams.to as string | undefined) ?? voiceRuntime.config.toNumber;
    if (!destination) {
      return { error: "to required" };
    }
    const outcome = await voiceRuntime.manager.initiateCall(destination, undefined, {
      message: callParams.message as string | undefined,
      mode: callParams.mode as "notify" | "conversation" | undefined,
    });
    return outcome.success
      ? { callId: outcome.callId, initiated: true }
      : { error: outcome.error || "initiate failed" };
  }

  if (action === "continue") {
    const outcome = await voiceRuntime.manager.continueCall(
      callParams.callId as string,
      callParams.message as string
    );
    return outcome.success
      ? { success: true, transcript: outcome.transcript }
      : { error: outcome.error || "continue failed" };
  }

  if (action === "speak") {
    const outcome = await voiceRuntime.manager.speak(
      callParams.callId as string,
      callParams.message as string
    );
    return outcome.success ? { success: true } : { error: outcome.error || "speak failed" };
  }

  if (action === "end") {
    const outcome = await voiceRuntime.manager.endCall(callParams.callId as string);
    return outcome.success ? { success: true } : { error: outcome.error || "end failed" };
  }

  if (action === "status") {
    // The identifier may be either the internal call id or the provider's call id.
    const lookupKey = callParams.callId as string;
    const record =
      voiceRuntime.manager.getCall(lookupKey) || voiceRuntime.manager.getCallByProviderCallId(lookupKey);
    return record ? { found: true, call: record } : { found: false };
  }

  return { error: "unknown action" };
};
/**
 * Unified entry point that routes an action to the right transport.
 *
 * When `useDirectCalls` is set, the action goes to `directCall`. Otherwise the
 * action name is translated to its gateway method name and sent through
 * `gatewayCall`. Actions missing from the table are forwarded under their own name.
 *
 * @param action - Short action name ("initiate", "continue", "speak", "end", "status").
 * @param callParams - Arguments forwarded unchanged to the selected transport.
 * @returns Whatever the selected transport resolves with.
 */
const doCall = async (action: string, callParams: Record<string, unknown>): Promise<Record<string, unknown>> => {
  if (!useDirectCalls) {
    const rpcMethodByAction: Record<string, string> = {
      initiate: "voicecall.initiate",
      continue: "voicecall.continue",
      speak: "voicecall.speak",
      end: "voicecall.end",
      status: "voicecall.status",
    };
    const rpcMethod = rpcMethodByAction[action] || action;
    return gatewayCall(rpcMethod, callParams);
  }
  return directCall(action, callParams);
};
try {
if (typeof params?.action === "string") { if (typeof params?.action === "string") {
switch (params.action) { switch (params.action) {
case "initiate_call": { case "initiate_call": {
const message = String(params.message || "").trim(); const message = String(params.message || "").trim();
if (!message) throw new Error("message required"); if (!message) throw new Error("message required");
const to = const to = typeof params.to === "string" && params.to.trim()
typeof params.to === "string" && params.to.trim() ? params.to.trim()
? params.to.trim() : undefined;
: rt.config.toNumber; const mode = params.mode === "notify" || params.mode === "conversation"
if (!to) throw new Error("to required"); ? params.mode
const result = await rt.manager.initiateCall(to, undefined, { : undefined;
message, const result = await doCall("initiate", { to, message, mode });
mode: if (result.error) throw new Error(String(result.error));
params.mode === "notify" || params.mode === "conversation"
? params.mode
: undefined,
});
if (!result.success) {
throw new Error(result.error || "initiate failed");
}
return json({ callId: result.callId, initiated: true }); return json({ callId: result.callId, initiated: true });
} }
case "continue_call": { case "continue_call": {
const callId = String(params.callId || "").trim(); const callId = String(params.callId || "").trim();
const message = String(params.message || "").trim(); const message = String(params.message || "").trim();
if (!callId || !message) { if (!callId || !message) throw new Error("callId and message required");
throw new Error("callId and message required"); const result = await doCall("continue", { callId, message });
} if (result.error) throw new Error(String(result.error));
const result = await rt.manager.continueCall(callId, message);
if (!result.success) {
throw new Error(result.error || "continue failed");
}
return json({ success: true, transcript: result.transcript }); return json({ success: true, transcript: result.transcript });
} }
case "speak_to_user": { case "speak_to_user": {
const callId = String(params.callId || "").trim(); const callId = String(params.callId || "").trim();
const message = String(params.message || "").trim(); const message = String(params.message || "").trim();
if (!callId || !message) { if (!callId || !message) throw new Error("callId and message required");
throw new Error("callId and message required"); const result = await doCall("speak", { callId, message });
} if (result.error) throw new Error(String(result.error));
const result = await rt.manager.speak(callId, message);
if (!result.success) {
throw new Error(result.error || "speak failed");
}
return json({ success: true }); return json({ success: true });
} }
case "end_call": { case "end_call": {
const callId = String(params.callId || "").trim(); const callId = String(params.callId || "").trim();
if (!callId) throw new Error("callId required"); if (!callId) throw new Error("callId required");
const result = await rt.manager.endCall(callId); const result = await doCall("end", { callId });
if (!result.success) { if (result.error) throw new Error(String(result.error));
throw new Error(result.error || "end failed");
}
return json({ success: true }); return json({ success: true });
} }
case "get_status": { case "get_status": {
const callId = String(params.callId || "").trim(); const callId = String(params.callId || "").trim();
if (!callId) throw new Error("callId required"); if (!callId) throw new Error("callId required");
const call = const result = await doCall("status", { callId });
rt.manager.getCall(callId) || return json(result);
rt.manager.getCallByProviderCallId(callId);
return json(call ? { found: true, call } : { found: false });
} }
} }
} }
// Legacy mode-based params
const mode = params?.mode ?? "call"; const mode = params?.mode ?? "call";
if (mode === "status") { if (mode === "status") {
const sid = const sid = typeof params.sid === "string" ? params.sid.trim() : "";
typeof params.sid === "string" ? params.sid.trim() : "";
if (!sid) throw new Error("sid required for status"); if (!sid) throw new Error("sid required for status");
const call = const result = await doCall("status", { callId: sid });
rt.manager.getCall(sid) || rt.manager.getCallByProviderCallId(sid); return json(result);
return json(call ? { found: true, call } : { found: false });
} }
const to = // Default: initiate call
typeof params.to === "string" && params.to.trim() const to = typeof params.to === "string" && params.to.trim()
? params.to.trim() ? params.to.trim()
: rt.config.toNumber; : undefined;
if (!to) throw new Error("to required for call"); const message = typeof params.message === "string" && params.message.trim()
const result = await rt.manager.initiateCall(to, undefined, { ? params.message.trim()
message: : undefined;
typeof params.message === "string" && params.message.trim() const result = await doCall("initiate", { to, message });
? params.message.trim() if (result.error) throw new Error(String(result.error));
: undefined,
});
if (!result.success) {
throw new Error(result.error || "initiate failed");
}
return json({ callId: result.callId, initiated: true }); return json({ callId: result.callId, initiated: true });
} catch (err) { } catch (err) {
return json({ return json({
error: err instanceof Error ? err.message : String(err), error: err instanceof Error ? err.message : String(err),
@ -459,7 +522,7 @@ const voiceCallPlugin = {
({ program }) => ({ program }) =>
registerVoiceCallCli({ registerVoiceCallCli({
program, program,
config, config: cfg,
ensureRuntime, ensureRuntime,
logger: api.logger, logger: api.logger,
}), }),
@ -469,7 +532,7 @@ const voiceCallPlugin = {
api.registerService({ api.registerService({
id: "voicecall", id: "voicecall",
start: async () => { start: async () => {
if (!config.enabled) return; if (!cfg.enabled) return;
try { try {
await ensureRuntime(); await ensureRuntime();
} catch (err) { } catch (err) {

View File

@ -40,6 +40,8 @@ export class CallManager {
>(); >();
/** Max duration timers to auto-hangup calls after configured timeout */ /** Max duration timers to auto-hangup calls after configured timeout */
private maxDurationTimers = new Map<CallId, NodeJS.Timeout>(); private maxDurationTimers = new Map<CallId, NodeJS.Timeout>();
/** Calls pending speak completion for notify auto-hangup */
private pendingNotifyHangup = new Set<CallId>();
constructor(config: VoiceCallConfig, storePath?: string) { constructor(config: VoiceCallConfig, storePath?: string) {
this.config = config; this.config = config;
@ -269,21 +271,26 @@ export class CallManager {
return; return;
} }
// In notify mode, auto-hangup after delay // In notify mode, register for speak completion hangup
if (mode === "notify") { if (mode === "notify") {
const delaySec = this.config.outbound.notifyHangupDelaySec; this.pendingNotifyHangup.add(call.callId);
console.log( console.log(
`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`, `[voice-call] Notify mode: will hangup after speak completes for call ${call.callId}`,
); );
// Fallback timeout in case speak.ended event never arrives
const fallbackSec = this.config.outbound.notifyHangupDelaySec;
setTimeout(async () => { setTimeout(async () => {
const currentCall = this.getCall(call.callId); if (this.pendingNotifyHangup.has(call.callId)) {
if (currentCall && !TerminalStates.has(currentCall.state)) { this.pendingNotifyHangup.delete(call.callId);
console.log( const currentCall = this.getCall(call.callId);
`[voice-call] Notify mode: hanging up call ${call.callId}`, if (currentCall && !TerminalStates.has(currentCall.state)) {
); console.log(
await this.endCall(call.callId); `[voice-call] Notify mode: fallback hangup (no speak.ended) for call ${call.callId}`,
);
await this.endCall(call.callId);
}
} }
}, delaySec * 1000); }, fallbackSec * 1000);
} }
} }
@ -625,6 +632,18 @@ export class CallManager {
this.transitionState(call, "speaking"); this.transitionState(call, "speaking");
break; break;
case "call.speak.ended":
this.transitionState(call, "active");
// If this call was pending notify hangup, end it now
if (this.pendingNotifyHangup.has(call.callId)) {
this.pendingNotifyHangup.delete(call.callId);
console.log(
`[voice-call] Notify mode: speak completed, hanging up call ${call.callId}`,
);
void this.endCall(call.callId);
}
break;
case "call.speech": case "call.speech":
if (event.isFinal) { if (event.isFinal) {
this.addTranscriptEntry(call, "user", event.transcript); this.addTranscriptEntry(call, "user", event.transcript);

View File

@ -200,6 +200,13 @@ export class TelnyxProvider implements VoiceCallProvider {
text: data.payload?.text || "", text: data.payload?.text || "",
}; };
case "call.speak.ended":
return {
...baseEvent,
type: "call.speak.ended",
text: data.payload?.text || "",
};
case "call.transcription": case "call.transcription":
return { return {
...baseEvent, ...baseEvent,

View File

@ -101,6 +101,10 @@ export const NormalizedEventSchema = z.discriminatedUnion("type", [
type: z.literal("call.speaking"), type: z.literal("call.speaking"),
text: z.string(), text: z.string(),
}), }),
BaseEventSchema.extend({
type: z.literal("call.speak.ended"),
text: z.string().optional(),
}),
BaseEventSchema.extend({ BaseEventSchema.extend({
type: z.literal("call.speech"), type: z.literal("call.speech"),
transcript: z.string(), transcript: z.string(),
@ -180,7 +184,6 @@ export type WebhookContext = {
url: string; url: string;
method: "GET" | "POST" | "PUT" | "DELETE" | "PATCH"; method: "GET" | "POST" | "PUT" | "DELETE" | "PATCH";
query?: Record<string, string | string[] | undefined>; query?: Record<string, string | string[] | undefined>;
remoteAddress?: string;
}; };
export type ProviderWebhookParseResult = { export type ProviderWebhookParseResult = {