diff --git a/src/cli/multi-relay.test.ts b/src/cli/multi-relay.test.ts new file mode 100644 index 000000000..d4a9831b0 --- /dev/null +++ b/src/cli/multi-relay.test.ts @@ -0,0 +1,418 @@ +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import type { Provider } from "../utils.js"; +import type { RuntimeEnv } from "../runtime.js"; +import type { WarelayConfig } from "../config/config.js"; +import type { CliDeps } from "./deps.js"; +import { runMultiProviderRelay } from "./multi-relay.js"; + +// Mock the monitor modules +vi.mock("../telegram/monitor.js", () => ({ + monitorTelegramProvider: vi.fn(), +})); + +vi.mock("../web/auto-reply.js", () => ({ + monitorWebProvider: vi.fn(), +})); + +vi.mock("../twilio/monitor.js", () => ({ + monitorTwilio: vi.fn(), +})); + +describe("runMultiProviderRelay", () => { + let mockRuntime: RuntimeEnv; + let mockConfig: WarelayConfig; + let mockDeps: CliDeps; + let originalProcessOn: typeof process.on; + let originalProcessOff: typeof process.off; + let sigintHandler: ((...args: unknown[]) => void) | undefined; + + beforeEach(() => { + mockRuntime = { + log: vi.fn(), + error: vi.fn(), + exit: vi.fn(), + }; + + mockConfig = { + telegram: { allowFrom: [] }, + }; + + mockDeps = {} as CliDeps; + + // Capture SIGINT handler + originalProcessOn = process.on; + originalProcessOff = process.off; + process.on = vi.fn((event: string, handler: (...args: unknown[]) => void) => { + if (event === "SIGINT") { + sigintHandler = handler; + } + return process; + }) as typeof process.on; + process.off = vi.fn() as typeof process.off; + + vi.clearAllMocks(); + }); + + afterEach(() => { + process.on = originalProcessOn; + process.off = originalProcessOff; + sigintHandler = undefined; + }); + + test("starts telegram provider and logs startup message", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined); + + const providers: Provider[] = ["telegram"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait for startup message + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(mockRuntime.log).toHaveBeenCalledWith( + "šŸ“” Starting 1 provider(s): telegram", + ); + expect(monitorTelegramProvider).toHaveBeenCalledWith( + true, + mockRuntime, + expect.any(AbortSignal), + true, + ); + }); + + test("starts web provider with verbose and webTuning options", async () => { + const { monitorWebProvider } = await import("../web/auto-reply.js"); + vi.mocked(monitorWebProvider).mockResolvedValue(undefined); + + const providers: Provider[] = ["web"]; + const webTuning = { heartbeatSeconds: 60 }; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + webTuning, + runtime: mockRuntime, + }); + + // Wait for startup message + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(mockRuntime.log).toHaveBeenCalledWith( + "šŸ“” Starting 1 provider(s): web", + ); + expect(monitorWebProvider).toHaveBeenCalledWith( + true, + undefined, + true, + undefined, + mockRuntime, + expect.any(AbortSignal), + webTuning, + ); + }); + + test("starts twilio provider with custom interval and lookback", async () => { + const { monitorTwilio } = await import("../twilio/monitor.js"); + vi.mocked(monitorTwilio).mockResolvedValue(undefined); + + const providers: Provider[] = ["twilio"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: false, + twilioInterval: 30, + twilioLookback: 10, + runtime: mockRuntime, + }); + + // Wait for startup message + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(mockRuntime.log).toHaveBeenCalledWith( + "šŸ“” Starting 1 provider(s): twilio", + ); + expect(monitorTwilio).toHaveBeenCalledWith(30, 10); + }); + + test("starts multiple providers concurrently", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + const { monitorWebProvider } = await import("../web/auto-reply.js"); + + vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined); + vi.mocked(monitorWebProvider).mockResolvedValue(undefined); + + const providers: Provider[] = ["telegram", "web"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait for startup message + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(mockRuntime.log).toHaveBeenCalledWith( + "šŸ“” Starting 2 provider(s): telegram, web", + ); + expect(monitorTelegramProvider).toHaveBeenCalled(); + expect(monitorWebProvider).toHaveBeenCalled(); + }); + + test("shows startup complete message after 1.5s", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + vi.mocked(monitorTelegramProvider).mockImplementation( + () => new Promise(() => {}), // Never resolves + ); + + const providers: Provider[] = ["telegram"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait for startup complete message (1.5s + buffer) + await new Promise((resolve) => setTimeout(resolve, 1600)); + + expect(mockRuntime.log).toHaveBeenCalledWith( + "āœ… All 1 provider(s) active. Listening for messages... (Ctrl+C to stop)", + ); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + // Wait for abort to complete + await Promise.race([promise, new Promise((resolve) => setTimeout(resolve, 500))]); + }); + + test("handles SIGINT gracefully", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + let abortSignal: AbortSignal | undefined; + vi.mocked(monitorTelegramProvider).mockImplementation( + async (_verbose, _runtime, signal) => { + abortSignal = signal; + // Simulate waiting for abort + return new Promise((resolve) => { + signal?.addEventListener("abort", () => resolve(undefined)); + }); + }, + ); + + const providers: Provider[] = ["telegram"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait for monitor to start + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Trigger SIGINT + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(mockRuntime.log).toHaveBeenCalledWith( + "\nā¹ Stopping all providers...", + ); + expect(mockRuntime.log).toHaveBeenCalledWith("āœ… All providers stopped"); + expect(abortSignal?.aborted).toBe(true); + }); + + test("handles provider errors without crashing other providers", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + const { monitorWebProvider } = await import("../web/auto-reply.js"); + + vi.mocked(monitorTelegramProvider).mockRejectedValue( + new Error("Telegram connection failed"), + ); + vi.mocked(monitorWebProvider).mockResolvedValue(undefined); + + const providers: Provider[] = ["telegram", "web"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait for startup and error handling + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(mockRuntime.error).toHaveBeenCalledWith( + "āŒ telegram error: Error: Telegram connection failed", + ); + // Web provider should still have been started + expect(monitorWebProvider).toHaveBeenCalled(); + }); + + test("removes SIGINT handler after completion", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined); + + const providers: Provider[] = ["telegram"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait for startup + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(process.off).toHaveBeenCalledWith("SIGINT", sigintHandler); + }); + + test("passes suppressStartMessage=true to telegram monitor", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined); + + const providers: Provider[] = ["telegram"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait for startup + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(monitorTelegramProvider).toHaveBeenCalledWith( + true, + mockRuntime, + expect.any(AbortSignal), + true, // suppressStartMessage + ); + }); + + test("does not log startup complete if aborted before timeout", async () => { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + vi.mocked(monitorTelegramProvider).mockImplementation( + async (_verbose, _runtime, signal) => { + return new Promise((resolve) => { + signal?.addEventListener("abort", () => resolve(undefined)); + }); + }, + ); + + const providers: Provider[] = ["telegram"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: true, + runtime: mockRuntime, + }); + + // Wait briefly, then abort before 1.5s timeout + await new Promise((resolve) => setTimeout(resolve, 500)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + // Should not have logged startup complete message + const startupCompleteCalls = vi + .mocked(mockRuntime.log) + .mock.calls.filter((call) => + String(call[0]).includes("All 1 provider(s) active"), + ); + expect(startupCompleteCalls.length).toBe(0); + }); + + test("uses default values for twilio interval and lookback", async () => { + const { monitorTwilio } = await import("../twilio/monitor.js"); + vi.mocked(monitorTwilio).mockResolvedValue(undefined); + + const providers: Provider[] = ["twilio"]; + + const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, { + verbose: false, + runtime: mockRuntime, + }); + + // Wait for startup + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Abort the relay + if (sigintHandler) { + sigintHandler(); + } + + await promise; + + expect(monitorTwilio).toHaveBeenCalledWith(10, 5); // defaults + }); +}); diff --git a/src/cli/multi-relay.ts b/src/cli/multi-relay.ts new file mode 100644 index 000000000..aad7a9c02 --- /dev/null +++ b/src/cli/multi-relay.ts @@ -0,0 +1,95 @@ +import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import type { WarelayConfig } from "../config/config.js"; +import type { Provider } from "../utils.js"; +import type { WebMonitorTuning } from "../web/auto-reply.js"; +import type { CliDeps } from "./deps.js"; + +/** + * Run multiple provider monitors concurrently. + * Handles graceful shutdown and per-provider error recovery. + */ +export async function runMultiProviderRelay( + providers: Provider[], + config: WarelayConfig, + deps: CliDeps, + opts: { + verbose?: boolean; + webTuning?: WebMonitorTuning; + twilioInterval?: number; + twilioLookback?: number; + runtime?: RuntimeEnv; + }, +): Promise { + const runtime = opts.runtime ?? defaultRuntime; + const abortController = new AbortController(); + const { signal } = abortController; + + // Setup Ctrl+C handler + const sigintHandler = () => { + runtime.log("\nā¹ Stopping all providers..."); + abortController.abort(); + }; + process.on("SIGINT", sigintHandler); + + runtime.log( + `šŸ“” Starting ${providers.length} provider(s): ${providers.join(", ")}`, + ); + + let startupComplete = false; + + // Spawn monitors concurrently + const monitorPromises = providers.map(async (provider) => { + try { + if (provider === "telegram") { + const { monitorTelegramProvider } = await import( + "../telegram/monitor.js" + ); + await monitorTelegramProvider( + Boolean(opts.verbose), + runtime, + signal, + true, // suppressStartMessage + ); + } else if (provider === "web") { + const { monitorWebProvider } = await import("../web/auto-reply.js"); + await monitorWebProvider( + Boolean(opts.verbose), + undefined, + true, + undefined, + runtime, + signal, + opts.webTuning, + ); + } else if (provider === "twilio") { + const { monitorTwilio } = await import("../twilio/monitor.js"); + const intervalSeconds = opts.twilioInterval ?? 10; + const lookbackMinutes = opts.twilioLookback ?? 5; + + await monitorTwilio(intervalSeconds, lookbackMinutes); + } + } catch (err) { + if (signal.aborted) return; // Graceful shutdown + runtime.error(`āŒ ${provider} error: ${String(err)}`); + // Continue - don't crash other providers + } + }); + + // Wait a brief moment for all providers to initialize, then show summary + setTimeout(() => { + if (!startupComplete && !signal.aborted) { + startupComplete = true; + runtime.log( + `āœ… All ${providers.length} provider(s) active. Listening for messages... (Ctrl+C to stop)`, + ); + } + }, 1500); + + // Wait for all monitors (or abort) + await Promise.allSettled(monitorPromises); + + // Remove SIGINT handler + process.off("SIGINT", sigintHandler); + + runtime.log("āœ… All providers stopped"); +} diff --git a/src/cli/program.ts b/src/cli/program.ts index 7d71c046e..05b185942 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -426,58 +426,7 @@ Examples: const { file: logFile, level: logLevel } = getResolvedLoggerSettings(); defaultRuntime.log(info(`logs: ${logFile} (level ${logLevel})`)); - // Handle --providers for multiple simultaneous relays - if (opts.providers) { - const providers = String(opts.providers).split(',').map(p => p.trim()); - const validProviders = ['web', 'telegram', 'twilio']; - const invalid = providers.filter(p => !validProviders.includes(p)); - if (invalid.length > 0) { - defaultRuntime.error(`Invalid providers: ${invalid.join(', ')}. Must be: web, telegram, twilio`); - defaultRuntime.exit(1); - } - - defaultRuntime.log(info(`Starting relay for providers: ${providers.join(', ')}`)); - - // Start all providers concurrently - const promises = providers.map(async (provider) => { - try { - if (provider === 'telegram') { - await monitorTelegramProvider(Boolean(opts.verbose), defaultRuntime); - } else if (provider === 'web') { - const cfg = loadConfig(); - const webTuning: WebMonitorTuning = {}; - if (opts.webHeartbeat) webTuning.heartbeatSeconds = Number.parseInt(String(opts.webHeartbeat), 10); - if (opts.heartbeatNow) webTuning.replyHeartbeatNow = true; - const reconnect: WebMonitorTuning["reconnect"] = {}; - if (opts.webRetries) reconnect.maxAttempts = Number.parseInt(String(opts.webRetries), 10); - if (opts.webRetryInitial) reconnect.initialMs = Number.parseInt(String(opts.webRetryInitial), 10); - if (opts.webRetryMax) reconnect.maxMs = Number.parseInt(String(opts.webRetryMax), 10); - if (Object.keys(reconnect).length > 0) webTuning.reconnect = reconnect; - - logWebSelfId(defaultRuntime, true); - await monitorWebProvider(Boolean(opts.verbose), undefined, true, undefined, defaultRuntime, undefined, webTuning); - } else if (provider === 'twilio') { - ensureTwilioEnv(); - logTwilioFrom(); - const intervalSeconds = Number.parseInt(opts.interval || "5", 10); - const lookbackMinutes = Number.parseInt(opts.lookback || "5", 10); - await monitorTwilio(intervalSeconds, lookbackMinutes); - } - } catch (err) { - defaultRuntime.error(danger(`${provider} relay failed: ${String(err)}`)); - } - }); - - await Promise.all(promises); - return; - } - - // Original single-provider logic - const providerPref = String(opts.provider ?? "auto"); - if (!["auto", "web", "twilio", "telegram"].includes(providerPref)) { - defaultRuntime.error("--provider must be auto, web, twilio, or telegram"); - defaultRuntime.exit(1); - } + // Parse parameters that are common to single/multi-provider modes const intervalSeconds = Number.parseInt(opts.interval, 10); const lookbackMinutes = Number.parseInt(opts.lookback, 10); const webHeartbeat = @@ -553,6 +502,46 @@ Examples: webTuning.reconnect = reconnect; } + // Handle --providers for multiple simultaneous relays + if (opts.providers) { + const providerList = String(opts.providers) + .split(",") + .map((p) => p.trim()) + .map((p) => (p === "auto" ? "auto" : p)) as (Provider | "auto")[]; + + // Import selectProviders and runMultiProviderRelay + const { selectProviders } = await import("../web/session.js"); + const { runMultiProviderRelay } = await import("./multi-relay.js"); + + const providers = await selectProviders(providerList); + + if (providers.length === 0) { + defaultRuntime.error( + "No authenticated providers found. Use --provider or authenticate providers first.", + ); + defaultRuntime.exit(1); + } + + const cfg = loadConfig(); + const deps = createDefaultDeps(); + + await runMultiProviderRelay(providers, cfg, deps, { + verbose: Boolean(opts.verbose), + webTuning, + twilioInterval: intervalSeconds, + twilioLookback: lookbackMinutes, + runtime: defaultRuntime, + }); + return; + } + + // Single-provider relay logic + const providerPref = String(opts.provider ?? "auto"); + if (!["auto", "web", "twilio", "telegram"].includes(providerPref)) { + defaultRuntime.error("--provider must be auto, web, twilio, or telegram"); + defaultRuntime.exit(1); + } + // Handle telegram explicitly (not in auto picker) if (providerPref === "telegram") { await monitorTelegramProvider(Boolean(opts.verbose), defaultRuntime); diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index 0743d1503..bdf6d10b9 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -3,22 +3,22 @@ import { describe, expect, it } from "vitest"; import { deriveSessionKey } from "./sessions.js"; describe("sessions", () => { - it("returns normalized per-sender key", () => { - expect(deriveSessionKey("per-sender", { From: "whatsapp:+1555" })).toBe( + it("returns normalized per-sender key", async () => { + expect(await deriveSessionKey("per-sender", { From: "whatsapp:+1555" })).toBe( "+1555", ); }); - it("falls back to unknown when sender missing", () => { - expect(deriveSessionKey("per-sender", {})).toBe("unknown"); + it("falls back to unknown when sender missing", async () => { + expect(await deriveSessionKey("per-sender", {})).toBe("unknown"); }); - it("global scope returns global", () => { - expect(deriveSessionKey("global", { From: "+1" })).toBe("global"); + it("global scope returns global", async () => { + expect(await deriveSessionKey("global", { From: "+1" })).toBe("global"); }); - it("keeps group chats distinct", () => { - expect(deriveSessionKey("per-sender", { From: "12345-678@g.us" })).toBe( + it("keeps group chats distinct", async () => { + expect(await deriveSessionKey("per-sender", { From: "12345-678@g.us" })).toBe( "group:12345-678@g.us", ); }); diff --git a/src/web/session.ts b/src/web/session.ts index 45ecb482f..a1d5d486f 100644 --- a/src/web/session.ts +++ b/src/web/session.ts @@ -227,3 +227,84 @@ export async function pickProvider(pref: Provider | "auto"): Promise { return "twilio"; } + +/** + * Select providers for multi-provider relay. + * Validates authentication and filters out unavailable providers. + */ +export async function selectProviders( + prefs: (Provider | "auto")[], +): Promise { + const skipped: string[] = []; + + // If 'auto' in list, expand to all authenticated providers + if (prefs.includes("auto")) { + const available: Provider[] = []; + + // Check web + if (await webAuthExists()) { + available.push("web"); + } else { + skipped.push( + "web (not authenticated - run: warelay login --provider web)", + ); + } + + // Check telegram + if (await telegramAuthExists()) { + available.push("telegram"); + } else { + skipped.push( + "telegram (not authenticated - run: warelay login --provider telegram)", + ); + } + + // Note: twilio not heavily used in this branch + + if (available.length > 0 && skipped.length > 0) { + console.log( + `ā„¹ļø Auto-selected ${available.length} provider(s), skipped ${skipped.length}:`, + ); + for (const skip of skipped) { + console.log(` • ${skip}`); + } + } + + return available; + } + + // Explicit provider list - validate each one + const available: Provider[] = []; + for (const pref of prefs) { + if (pref === "auto") continue; // Already handled above + + if (pref === "web") { + if (await webAuthExists()) { + available.push("web"); + } else { + skipped.push( + "web (not authenticated - run: warelay login --provider web)", + ); + } + } else if (pref === "telegram") { + if (await telegramAuthExists()) { + available.push("telegram"); + } else { + skipped.push( + "telegram (not authenticated - run: warelay login --provider telegram)", + ); + } + } else if (pref === "twilio") { + skipped.push("twilio (not heavily used in this branch)"); + } + } + + if (skipped.length > 0) { + console.log(`ā„¹ļø Skipped ${skipped.length} provider(s):`); + for (const skip of skipped) { + console.log(` • ${skip}`); + } + } + + return available; +}