From c063b066ad6bb232864112bda97ddff52cad47ac Mon Sep 17 00:00:00 2001 From: mbp-2013 Date: Thu, 29 Jan 2026 18:23:07 -0800 Subject: [PATCH] fix(voice-call): verify call status with provider before loading stale calls Problem: When the gateway restarts, loadActiveCalls() reloads non-terminal calls from calls.jsonl. However, these calls may have already ended (e.g., Twilio timed them out, or webhook couldn't reach local URL) and are now stale. This causes the concurrent call limit to be reached with phantom calls. Solution: - Add getCallStatus() method to VoiceCallProvider interface - Implement getCallStatus() for all providers (Twilio, Plivo, Telnyx, Mock) - On load, verify each non-terminal call with the provider before adding to activeCalls - Skip calls that the provider reports as terminal (completed, failed, etc.) - Also skip calls older than maxDurationSeconds as a fallback This is an improvement over PR #2810 which only uses time-based cleanup. By querying the provider, we can accurately determine if a call is still active. --- extensions/voice-call/src/manager.ts | 65 ++++++++++++++----- extensions/voice-call/src/providers/base.ts | 8 +++ extensions/voice-call/src/providers/mock.ts | 10 +++ extensions/voice-call/src/providers/plivo.ts | 44 +++++++++++++ extensions/voice-call/src/providers/telnyx.ts | 37 +++++++++++ extensions/voice-call/src/providers/twilio.ts | 37 +++++++++++ extensions/voice-call/src/runtime.ts | 2 +- extensions/voice-call/src/types.ts | 15 +++++ 8 files changed, 202 insertions(+), 16 deletions(-) diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index 2e2e4661b..06b803ff9 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -54,15 +54,15 @@ export class CallManager { /** * Initialize the call manager with a provider. */ - initialize(provider: VoiceCallProvider, webhookUrl: string): void { + async initialize(provider: VoiceCallProvider, webhookUrl: string): Promise { this.provider = provider; this.webhookUrl = webhookUrl; // Ensure store directory exists fs.mkdirSync(this.storePath, { recursive: true }); - // Load any persisted active calls - this.loadActiveCalls(); + // Load any persisted active calls (verifying with provider) + await this.loadActiveCalls(); } /** @@ -824,9 +824,9 @@ export class CallManager { /** * Load active calls from persistence (for crash recovery). - * Uses streaming to handle large log files efficiently. + * Verifies with provider that calls are still active before loading. */ - private loadActiveCalls(): void { + private async loadActiveCalls(): Promise { const logPath = path.join(this.storePath, "calls.jsonl"); if (!fs.existsSync(logPath)) return; @@ -847,19 +847,54 @@ export class CallManager { } } - // Only keep non-terminal calls + // Calls older than maxDurationSeconds are definitely stale (fallback check) + const maxAgeMs = this.config.maxDurationSeconds * 1000; + const now = Date.now(); + + // Only keep non-terminal calls that are verified active for (const [callId, call] of callMap) { - if (!TerminalStates.has(call.state)) { - this.activeCalls.set(callId, call); - // Populate providerCallId mapping for lookups - if (call.providerCallId) { - this.providerCallIdMap.set(call.providerCallId, callId); - } - // Populate processed event IDs - for (const eventId of call.processedEventIds) { - this.processedEventIds.add(eventId); + // Skip terminal states + if (TerminalStates.has(call.state)) continue; + + // Skip calls older than max duration (definitely stale) + const callAge = now - call.startedAt; + if (callAge > maxAgeMs) { + console.log( + `[voice-call] Skipping stale call ${callId} (age: ${Math.round(callAge / 1000)}s exceeds max: ${this.config.maxDurationSeconds}s)`, + ); + continue; + } + + // Verify with provider if call is still active + if (call.providerCallId && this.provider) { + try { + const status = await this.provider.getCallStatus({ + providerCallId: call.providerCallId, + }); + if (status.isTerminal) { + console.log( + `[voice-call] Skipping ended call ${callId} (provider status: ${status.status})`, + ); + continue; + } + } catch (err) { + // If we can't verify, skip the call to be safe + console.log( + `[voice-call] Skipping unverifiable call ${callId}: ${err instanceof Error ? err.message : String(err)}`, + ); + continue; } } + + this.activeCalls.set(callId, call); + // Populate providerCallId mapping for lookups + if (call.providerCallId) { + this.providerCallIdMap.set(call.providerCallId, callId); + } + // Populate processed event IDs + for (const eventId of call.processedEventIds) { + this.processedEventIds.add(eventId); + } } } diff --git a/extensions/voice-call/src/providers/base.ts b/extensions/voice-call/src/providers/base.ts index 63a9a0471..5714c4da7 100644 --- a/extensions/voice-call/src/providers/base.ts +++ b/extensions/voice-call/src/providers/base.ts @@ -1,4 +1,6 @@ import type { + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -64,4 +66,10 @@ export interface VoiceCallProvider { * Stop listening for user speech (deactivate STT). */ stopListening(input: StopListeningInput): Promise; + + /** + * Get current status of a call from the provider. + * Used to verify if persisted calls are still active. + */ + getCallStatus(input: GetCallStatusInput): Promise; } diff --git a/extensions/voice-call/src/providers/mock.ts b/extensions/voice-call/src/providers/mock.ts index 85a532fcf..f5bdb7b2c 100644 --- a/extensions/voice-call/src/providers/mock.ts +++ b/extensions/voice-call/src/providers/mock.ts @@ -2,6 +2,8 @@ import crypto from "node:crypto"; import type { EndReason, + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -165,4 +167,12 @@ export class MockProvider implements VoiceCallProvider { async stopListening(_input: StopListeningInput): Promise { // No-op for mock } + + async getCallStatus(_input: GetCallStatusInput): Promise { + // Mock always returns completed (stale) to test cleanup logic + return { + status: "completed", + isTerminal: true, + }; + } } diff --git a/extensions/voice-call/src/providers/plivo.ts b/extensions/voice-call/src/providers/plivo.ts index df110bfd6..811afbe3b 100644 --- a/extensions/voice-call/src/providers/plivo.ts +++ b/extensions/voice-call/src/providers/plivo.ts @@ -2,6 +2,8 @@ import crypto from "node:crypto"; import type { PlivoConfig } from "../config.js"; import type { + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -401,6 +403,48 @@ export class PlivoProvider implements VoiceCallProvider { // GetInput ends automatically when speech ends. } + async getCallStatus(input: GetCallStatusInput): Promise { + const terminalStatuses = new Set([ + "completed", + "failed", + "busy", + "no-answer", + "cancel", + ]); + + try { + const response = await fetch( + `https://api.plivo.com/v1/Account/${this.authId}/Call/${input.providerCallId}/`, + { + headers: { + Authorization: `Basic ${Buffer.from(`${this.authId}:${this.authToken}`).toString("base64")}`, + }, + }, + ); + + if (!response.ok) { + return { + status: "unknown", + isTerminal: true, + error: `Plivo API error: ${response.status}`, + }; + } + + const data = (await response.json()) as { call_status?: string }; + const status = data.call_status || "unknown"; + return { + status, + isTerminal: terminalStatuses.has(status), + }; + } catch (err) { + return { + status: "unknown", + isTerminal: true, + error: err instanceof Error ? err.message : String(err), + }; + } + } + private static normalizeNumber(numberOrSip: string): string { const trimmed = numberOrSip.trim(); if (trimmed.toLowerCase().startsWith("sip:")) return trimmed; diff --git a/extensions/voice-call/src/providers/telnyx.ts b/extensions/voice-call/src/providers/telnyx.ts index 56db19247..bbbd064cc 100644 --- a/extensions/voice-call/src/providers/telnyx.ts +++ b/extensions/voice-call/src/providers/telnyx.ts @@ -3,6 +3,8 @@ import crypto from "node:crypto"; import type { TelnyxConfig } from "../config.js"; import type { EndReason, + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -331,6 +333,41 @@ export class TelnyxProvider implements VoiceCallProvider { { allowNotFound: true }, ); } + + /** + * Get call status from Telnyx API. + */ + async getCallStatus(input: GetCallStatusInput): Promise { + const terminalStatuses = new Set([ + "hangup", + "completed", + "failed", + "busy", + "no_answer", + "cancel", + ]); + + try { + const result = await this.apiRequest<{ data?: { is_alive?: boolean; state?: string } }>( + `/calls/${input.providerCallId}`, + {}, + { allowNotFound: true }, + ); + + const state = result?.data?.state || "unknown"; + const isAlive = result?.data?.is_alive ?? false; + return { + status: state, + isTerminal: !isAlive || terminalStatuses.has(state), + }; + } catch (err) { + return { + status: "unknown", + isTerminal: true, + error: err instanceof Error ? err.message : String(err), + }; + } + } } // ----------------------------------------------------------------------------- diff --git a/extensions/voice-call/src/providers/twilio.ts b/extensions/voice-call/src/providers/twilio.ts index 87c0f244d..4483602aa 100644 --- a/extensions/voice-call/src/providers/twilio.ts +++ b/extensions/voice-call/src/providers/twilio.ts @@ -3,6 +3,8 @@ import crypto from "node:crypto"; import type { TwilioConfig } from "../config.js"; import type { MediaStreamHandler } from "../media-stream.js"; import type { + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -579,6 +581,41 @@ export class TwilioProvider implements VoiceCallProvider { // Twilio's automatically stops on speech end // No explicit action needed } + + /** + * Get call status from Twilio API. + * Used to verify if persisted calls are still active on gateway restart. + */ + async getCallStatus(input: GetCallStatusInput): Promise { + const terminalStatuses = new Set([ + "completed", + "failed", + "busy", + "no-answer", + "canceled", + ]); + + try { + const result = await this.apiRequest( + `/Calls/${input.providerCallId}.json`, + {}, + { allowNotFound: true }, + ); + + const status = result?.status || "unknown"; + return { + status, + isTerminal: terminalStatuses.has(status), + }; + } catch (err) { + // If we can't reach Twilio, assume call is stale + return { + status: "unknown", + isTerminal: true, + error: err instanceof Error ? err.message : String(err), + }; + } + } } // ----------------------------------------------------------------------------- diff --git a/extensions/voice-call/src/runtime.ts b/extensions/voice-call/src/runtime.ts index 6f638ab5b..a634f0206 100644 --- a/extensions/voice-call/src/runtime.ts +++ b/extensions/voice-call/src/runtime.ts @@ -189,7 +189,7 @@ export async function createVoiceCallRuntime(params: { } } - manager.initialize(provider, webhookUrl); + await manager.initialize(provider, webhookUrl); const stop = async () => { if (tunnelResult) { diff --git a/extensions/voice-call/src/types.ts b/extensions/voice-call/src/types.ts index 68cca11e6..e5975eec1 100644 --- a/extensions/voice-call/src/types.ts +++ b/extensions/voice-call/src/types.ts @@ -271,3 +271,18 @@ export type EndCallToolResult = { success: boolean; error?: string; }; + +/** Input for getting call status from provider */ +export type GetCallStatusInput = { + providerCallId: string; +}; + +/** Result of getting call status from provider */ +export type GetCallStatusResult = { + /** Provider's call status (e.g., 'completed', 'in-progress', 'failed') */ + status: string; + /** Whether the call has ended */ + isTerminal: boolean; + /** Error message if status check failed */ + error?: string; +};