This commit is contained in:
Garnet Liu 2026-01-30 11:55:33 +00:00 committed by GitHub
commit 3f14b19091
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 202 additions and 16 deletions

View File

@ -66,15 +66,15 @@ export class CallManager {
/**
* Initialize the call manager with a provider.
*/
initialize(provider: VoiceCallProvider, webhookUrl: string): void {
async initialize(provider: VoiceCallProvider, webhookUrl: string): Promise<void> {
this.provider = provider;
this.webhookUrl = webhookUrl;
// Ensure store directory exists
fs.mkdirSync(this.storePath, { recursive: true });
// Load any persisted active calls
this.loadActiveCalls();
// Load any persisted active calls (verifying with provider)
await this.loadActiveCalls();
}
/**
@ -836,9 +836,9 @@ export class CallManager {
/**
* Load active calls from persistence (for crash recovery).
* Uses streaming to handle large log files efficiently.
* Verifies with provider that calls are still active before loading.
*/
private loadActiveCalls(): void {
private async loadActiveCalls(): Promise<void> {
const logPath = path.join(this.storePath, "calls.jsonl");
if (!fs.existsSync(logPath)) return;
@ -859,19 +859,54 @@ export class CallManager {
}
}
// Only keep non-terminal calls
// Calls older than maxDurationSeconds are definitely stale (fallback check)
const maxAgeMs = this.config.maxDurationSeconds * 1000;
const now = Date.now();
// Only keep non-terminal calls that are verified active
for (const [callId, call] of callMap) {
if (!TerminalStates.has(call.state)) {
this.activeCalls.set(callId, call);
// Populate providerCallId mapping for lookups
if (call.providerCallId) {
this.providerCallIdMap.set(call.providerCallId, callId);
}
// Populate processed event IDs
for (const eventId of call.processedEventIds) {
this.processedEventIds.add(eventId);
// Skip terminal states
if (TerminalStates.has(call.state)) continue;
// Skip calls older than max duration (definitely stale)
const callAge = now - call.startedAt;
if (callAge > maxAgeMs) {
console.log(
`[voice-call] Skipping stale call ${callId} (age: ${Math.round(callAge / 1000)}s exceeds max: ${this.config.maxDurationSeconds}s)`,
);
continue;
}
// Verify with provider if call is still active
if (call.providerCallId && this.provider) {
try {
const status = await this.provider.getCallStatus({
providerCallId: call.providerCallId,
});
if (status.isTerminal) {
console.log(
`[voice-call] Skipping ended call ${callId} (provider status: ${status.status})`,
);
continue;
}
} catch (err) {
// If we can't verify, skip the call to be safe
console.log(
`[voice-call] Skipping unverifiable call ${callId}: ${err instanceof Error ? err.message : String(err)}`,
);
continue;
}
}
this.activeCalls.set(callId, call);
// Populate providerCallId mapping for lookups
if (call.providerCallId) {
this.providerCallIdMap.set(call.providerCallId, callId);
}
// Populate processed event IDs
for (const eventId of call.processedEventIds) {
this.processedEventIds.add(eventId);
}
}
}

View File

@ -1,4 +1,6 @@
import type {
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@ -64,4 +66,10 @@ export interface VoiceCallProvider {
* Stop listening for user speech (deactivate STT).
*/
stopListening(input: StopListeningInput): Promise<void>;
/**
* Get current status of a call from the provider.
* Used to verify if persisted calls are still active.
*/
getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult>;
}

View File

@ -2,6 +2,8 @@ import crypto from "node:crypto";
import type {
EndReason,
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@ -165,4 +167,12 @@ export class MockProvider implements VoiceCallProvider {
async stopListening(_input: StopListeningInput): Promise<void> {
// No-op for mock
}
async getCallStatus(_input: GetCallStatusInput): Promise<GetCallStatusResult> {
// Mock always returns completed (stale) to test cleanup logic
return {
status: "completed",
isTerminal: true,
};
}
}

View File

@ -2,6 +2,8 @@ import crypto from "node:crypto";
import type { PlivoConfig } from "../config.js";
import type {
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@ -401,6 +403,48 @@ export class PlivoProvider implements VoiceCallProvider {
// GetInput ends automatically when speech ends.
}
async getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult> {
const terminalStatuses = new Set([
"completed",
"failed",
"busy",
"no-answer",
"cancel",
]);
try {
const response = await fetch(
`https://api.plivo.com/v1/Account/${this.authId}/Call/${input.providerCallId}/`,
{
headers: {
Authorization: `Basic ${Buffer.from(`${this.authId}:${this.authToken}`).toString("base64")}`,
},
},
);
if (!response.ok) {
return {
status: "unknown",
isTerminal: true,
error: `Plivo API error: ${response.status}`,
};
}
const data = (await response.json()) as { call_status?: string };
const status = data.call_status || "unknown";
return {
status,
isTerminal: terminalStatuses.has(status),
};
} catch (err) {
return {
status: "unknown",
isTerminal: true,
error: err instanceof Error ? err.message : String(err),
};
}
}
private static normalizeNumber(numberOrSip: string): string {
const trimmed = numberOrSip.trim();
if (trimmed.toLowerCase().startsWith("sip:")) return trimmed;

View File

@ -3,6 +3,8 @@ import crypto from "node:crypto";
import type { TelnyxConfig } from "../config.js";
import type {
EndReason,
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@ -331,6 +333,41 @@ export class TelnyxProvider implements VoiceCallProvider {
{ allowNotFound: true },
);
}
/**
* Get call status from Telnyx API.
*/
async getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult> {
const terminalStatuses = new Set([
"hangup",
"completed",
"failed",
"busy",
"no_answer",
"cancel",
]);
try {
const result = await this.apiRequest<{ data?: { is_alive?: boolean; state?: string } }>(
`/calls/${input.providerCallId}`,
{},
{ allowNotFound: true },
);
const state = result?.data?.state || "unknown";
const isAlive = result?.data?.is_alive ?? false;
return {
status: state,
isTerminal: !isAlive || terminalStatuses.has(state),
};
} catch (err) {
return {
status: "unknown",
isTerminal: true,
error: err instanceof Error ? err.message : String(err),
};
}
}
}
// -----------------------------------------------------------------------------

View File

@ -3,6 +3,8 @@ import crypto from "node:crypto";
import type { TwilioConfig } from "../config.js";
import type { MediaStreamHandler } from "../media-stream.js";
import type {
GetCallStatusInput,
GetCallStatusResult,
HangupCallInput,
InitiateCallInput,
InitiateCallResult,
@ -579,6 +581,41 @@ export class TwilioProvider implements VoiceCallProvider {
// Twilio's <Gather> automatically stops on speech end
// No explicit action needed
}
/**
* Get call status from Twilio API.
* Used to verify if persisted calls are still active on gateway restart.
*/
async getCallStatus(input: GetCallStatusInput): Promise<GetCallStatusResult> {
const terminalStatuses = new Set([
"completed",
"failed",
"busy",
"no-answer",
"canceled",
]);
try {
const result = await this.apiRequest<TwilioCallResponse>(
`/Calls/${input.providerCallId}.json`,
{},
{ allowNotFound: true },
);
const status = result?.status || "unknown";
return {
status,
isTerminal: terminalStatuses.has(status),
};
} catch (err) {
// If we can't reach Twilio, assume call is stale
return {
status: "unknown",
isTerminal: true,
error: err instanceof Error ? err.message : String(err),
};
}
}
}
// -----------------------------------------------------------------------------

View File

@ -189,7 +189,7 @@ export async function createVoiceCallRuntime(params: {
}
}
manager.initialize(provider, webhookUrl);
await manager.initialize(provider, webhookUrl);
const stop = async () => {
if (tunnelResult) {

View File

@ -271,3 +271,18 @@ export type EndCallToolResult = {
success: boolean;
error?: string;
};
/** Input for getting call status from provider */
export type GetCallStatusInput = {
providerCallId: string;
};
/** Result of getting call status from provider */
export type GetCallStatusResult = {
/** Provider's call status (e.g., 'completed', 'in-progress', 'failed') */
status: string;
/** Whether the call has ended */
isTerminal: boolean;
/** Error message if status check failed */
error?: string;
};