diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index 2e2e4661b..06b803ff9 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -54,15 +54,15 @@ export class CallManager { /** * Initialize the call manager with a provider. */ - initialize(provider: VoiceCallProvider, webhookUrl: string): void { + async initialize(provider: VoiceCallProvider, webhookUrl: string): Promise { this.provider = provider; this.webhookUrl = webhookUrl; // Ensure store directory exists fs.mkdirSync(this.storePath, { recursive: true }); - // Load any persisted active calls - this.loadActiveCalls(); + // Load any persisted active calls (verifying with provider) + await this.loadActiveCalls(); } /** @@ -824,9 +824,9 @@ export class CallManager { /** * Load active calls from persistence (for crash recovery). - * Uses streaming to handle large log files efficiently. + * Verifies with provider that calls are still active before loading. */ - private loadActiveCalls(): void { + private async loadActiveCalls(): Promise { const logPath = path.join(this.storePath, "calls.jsonl"); if (!fs.existsSync(logPath)) return; @@ -847,19 +847,54 @@ export class CallManager { } } - // Only keep non-terminal calls + // Calls older than maxDurationSeconds are definitely stale (fallback check) + const maxAgeMs = this.config.maxDurationSeconds * 1000; + const now = Date.now(); + + // Only keep non-terminal calls that are verified active for (const [callId, call] of callMap) { - if (!TerminalStates.has(call.state)) { - this.activeCalls.set(callId, call); - // Populate providerCallId mapping for lookups - if (call.providerCallId) { - this.providerCallIdMap.set(call.providerCallId, callId); - } - // Populate processed event IDs - for (const eventId of call.processedEventIds) { - this.processedEventIds.add(eventId); + // Skip terminal states + if (TerminalStates.has(call.state)) continue; + + // Skip calls older than max duration (definitely stale) + const callAge = now - call.startedAt; + if (callAge > maxAgeMs) { + console.log( + `[voice-call] Skipping stale call ${callId} (age: ${Math.round(callAge / 1000)}s exceeds max: ${this.config.maxDurationSeconds}s)`, + ); + continue; + } + + // Verify with provider if call is still active + if (call.providerCallId && this.provider) { + try { + const status = await this.provider.getCallStatus({ + providerCallId: call.providerCallId, + }); + if (status.isTerminal) { + console.log( + `[voice-call] Skipping ended call ${callId} (provider status: ${status.status})`, + ); + continue; + } + } catch (err) { + // If we can't verify, skip the call to be safe + console.log( + `[voice-call] Skipping unverifiable call ${callId}: ${err instanceof Error ? err.message : String(err)}`, + ); + continue; } } + + this.activeCalls.set(callId, call); + // Populate providerCallId mapping for lookups + if (call.providerCallId) { + this.providerCallIdMap.set(call.providerCallId, callId); + } + // Populate processed event IDs + for (const eventId of call.processedEventIds) { + this.processedEventIds.add(eventId); + } } } diff --git a/extensions/voice-call/src/providers/base.ts b/extensions/voice-call/src/providers/base.ts index 63a9a0471..5714c4da7 100644 --- a/extensions/voice-call/src/providers/base.ts +++ b/extensions/voice-call/src/providers/base.ts @@ -1,4 +1,6 @@ import type { + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -64,4 +66,10 @@ export interface VoiceCallProvider { * Stop listening for user speech (deactivate STT). */ stopListening(input: StopListeningInput): Promise; + + /** + * Get current status of a call from the provider. + * Used to verify if persisted calls are still active. + */ + getCallStatus(input: GetCallStatusInput): Promise; } diff --git a/extensions/voice-call/src/providers/mock.ts b/extensions/voice-call/src/providers/mock.ts index 85a532fcf..f5bdb7b2c 100644 --- a/extensions/voice-call/src/providers/mock.ts +++ b/extensions/voice-call/src/providers/mock.ts @@ -2,6 +2,8 @@ import crypto from "node:crypto"; import type { EndReason, + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -165,4 +167,12 @@ export class MockProvider implements VoiceCallProvider { async stopListening(_input: StopListeningInput): Promise { // No-op for mock } + + async getCallStatus(_input: GetCallStatusInput): Promise { + // Mock always returns completed (stale) to test cleanup logic + return { + status: "completed", + isTerminal: true, + }; + } } diff --git a/extensions/voice-call/src/providers/plivo.ts b/extensions/voice-call/src/providers/plivo.ts index df110bfd6..811afbe3b 100644 --- a/extensions/voice-call/src/providers/plivo.ts +++ b/extensions/voice-call/src/providers/plivo.ts @@ -2,6 +2,8 @@ import crypto from "node:crypto"; import type { PlivoConfig } from "../config.js"; import type { + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -401,6 +403,48 @@ export class PlivoProvider implements VoiceCallProvider { // GetInput ends automatically when speech ends. } + async getCallStatus(input: GetCallStatusInput): Promise { + const terminalStatuses = new Set([ + "completed", + "failed", + "busy", + "no-answer", + "cancel", + ]); + + try { + const response = await fetch( + `https://api.plivo.com/v1/Account/${this.authId}/Call/${input.providerCallId}/`, + { + headers: { + Authorization: `Basic ${Buffer.from(`${this.authId}:${this.authToken}`).toString("base64")}`, + }, + }, + ); + + if (!response.ok) { + return { + status: "unknown", + isTerminal: true, + error: `Plivo API error: ${response.status}`, + }; + } + + const data = (await response.json()) as { call_status?: string }; + const status = data.call_status || "unknown"; + return { + status, + isTerminal: terminalStatuses.has(status), + }; + } catch (err) { + return { + status: "unknown", + isTerminal: true, + error: err instanceof Error ? err.message : String(err), + }; + } + } + private static normalizeNumber(numberOrSip: string): string { const trimmed = numberOrSip.trim(); if (trimmed.toLowerCase().startsWith("sip:")) return trimmed; diff --git a/extensions/voice-call/src/providers/telnyx.ts b/extensions/voice-call/src/providers/telnyx.ts index 56db19247..bbbd064cc 100644 --- a/extensions/voice-call/src/providers/telnyx.ts +++ b/extensions/voice-call/src/providers/telnyx.ts @@ -3,6 +3,8 @@ import crypto from "node:crypto"; import type { TelnyxConfig } from "../config.js"; import type { EndReason, + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -331,6 +333,41 @@ export class TelnyxProvider implements VoiceCallProvider { { allowNotFound: true }, ); } + + /** + * Get call status from Telnyx API. + */ + async getCallStatus(input: GetCallStatusInput): Promise { + const terminalStatuses = new Set([ + "hangup", + "completed", + "failed", + "busy", + "no_answer", + "cancel", + ]); + + try { + const result = await this.apiRequest<{ data?: { is_alive?: boolean; state?: string } }>( + `/calls/${input.providerCallId}`, + {}, + { allowNotFound: true }, + ); + + const state = result?.data?.state || "unknown"; + const isAlive = result?.data?.is_alive ?? false; + return { + status: state, + isTerminal: !isAlive || terminalStatuses.has(state), + }; + } catch (err) { + return { + status: "unknown", + isTerminal: true, + error: err instanceof Error ? err.message : String(err), + }; + } + } } // ----------------------------------------------------------------------------- diff --git a/extensions/voice-call/src/providers/twilio.ts b/extensions/voice-call/src/providers/twilio.ts index 87c0f244d..4483602aa 100644 --- a/extensions/voice-call/src/providers/twilio.ts +++ b/extensions/voice-call/src/providers/twilio.ts @@ -3,6 +3,8 @@ import crypto from "node:crypto"; import type { TwilioConfig } from "../config.js"; import type { MediaStreamHandler } from "../media-stream.js"; import type { + GetCallStatusInput, + GetCallStatusResult, HangupCallInput, InitiateCallInput, InitiateCallResult, @@ -579,6 +581,41 @@ export class TwilioProvider implements VoiceCallProvider { // Twilio's automatically stops on speech end // No explicit action needed } + + /** + * Get call status from Twilio API. + * Used to verify if persisted calls are still active on gateway restart. + */ + async getCallStatus(input: GetCallStatusInput): Promise { + const terminalStatuses = new Set([ + "completed", + "failed", + "busy", + "no-answer", + "canceled", + ]); + + try { + const result = await this.apiRequest( + `/Calls/${input.providerCallId}.json`, + {}, + { allowNotFound: true }, + ); + + const status = result?.status || "unknown"; + return { + status, + isTerminal: terminalStatuses.has(status), + }; + } catch (err) { + // If we can't reach Twilio, assume call is stale + return { + status: "unknown", + isTerminal: true, + error: err instanceof Error ? err.message : String(err), + }; + } + } } // ----------------------------------------------------------------------------- diff --git a/extensions/voice-call/src/runtime.ts b/extensions/voice-call/src/runtime.ts index 6f638ab5b..a634f0206 100644 --- a/extensions/voice-call/src/runtime.ts +++ b/extensions/voice-call/src/runtime.ts @@ -189,7 +189,7 @@ export async function createVoiceCallRuntime(params: { } } - manager.initialize(provider, webhookUrl); + await manager.initialize(provider, webhookUrl); const stop = async () => { if (tunnelResult) { diff --git a/extensions/voice-call/src/types.ts b/extensions/voice-call/src/types.ts index 68cca11e6..e5975eec1 100644 --- a/extensions/voice-call/src/types.ts +++ b/extensions/voice-call/src/types.ts @@ -271,3 +271,18 @@ export type EndCallToolResult = { success: boolean; error?: string; }; + +/** Input for getting call status from provider */ +export type GetCallStatusInput = { + providerCallId: string; +}; + +/** Result of getting call status from provider */ +export type GetCallStatusResult = { + /** Provider's call status (e.g., 'completed', 'in-progress', 'failed') */ + status: string; + /** Whether the call has ended */ + isTerminal: boolean; + /** Error message if status check failed */ + error?: string; +};