feat: add multi-provider concurrent relay support

Implement --providers option to run multiple messaging providers
simultaneously in a single relay command. This enables seamless
session sharing across WhatsApp, Telegram, and Twilio.

Changes:
- Add src/cli/multi-relay.ts: Core multi-provider orchestration
  - Concurrent provider execution with Promise.allSettled
  - Graceful shutdown handling with AbortController
  - Unified startup and shutdown messaging
  - Per-provider error isolation

- Add selectProviders() to src/web/session.ts
  - Validates provider authentication before starting
  - Expands "auto" to all authenticated providers
  - User-friendly error messages for missing auth

- Update src/cli/program.ts relay command
  - Add --providers option (comma-separated list)
  - Refactor to build webTuning before multi-provider check
  - Pass suppressStartMessage=true to prevent duplicate logs

- Fix src/config/sessions.test.ts
  - Update tests to await async deriveSessionKey()

Tests:
- Add 11 new tests in src/cli/multi-relay.test.ts
  - Provider startup and shutdown
  - Concurrent execution
  - SIGINT handling
  - Error isolation
  - Console log alignment
- All 480 tests passing

Usage:
  warelay relay --providers web,telegram
  warelay relay --providers auto --verbose
This commit is contained in:
Arne Moor 2025-12-05 23:26:30 +01:00
parent a9f3527c4c
commit 15a4631a97
5 changed files with 643 additions and 60 deletions

418
src/cli/multi-relay.test.ts Normal file
View File

@ -0,0 +1,418 @@
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import type { Provider } from "../utils.js";
import type { RuntimeEnv } from "../runtime.js";
import type { WarelayConfig } from "../config/config.js";
import type { CliDeps } from "./deps.js";
import { runMultiProviderRelay } from "./multi-relay.js";
// Mock the monitor modules
vi.mock("../telegram/monitor.js", () => ({
monitorTelegramProvider: vi.fn(),
}));
vi.mock("../web/auto-reply.js", () => ({
monitorWebProvider: vi.fn(),
}));
vi.mock("../twilio/monitor.js", () => ({
monitorTwilio: vi.fn(),
}));
describe("runMultiProviderRelay", () => {
let mockRuntime: RuntimeEnv;
let mockConfig: WarelayConfig;
let mockDeps: CliDeps;
let originalProcessOn: typeof process.on;
let originalProcessOff: typeof process.off;
let sigintHandler: ((...args: unknown[]) => void) | undefined;
beforeEach(() => {
mockRuntime = {
log: vi.fn(),
error: vi.fn(),
exit: vi.fn(),
};
mockConfig = {
telegram: { allowFrom: [] },
};
mockDeps = {} as CliDeps;
// Capture SIGINT handler
originalProcessOn = process.on;
originalProcessOff = process.off;
process.on = vi.fn((event: string, handler: (...args: unknown[]) => void) => {
if (event === "SIGINT") {
sigintHandler = handler;
}
return process;
}) as typeof process.on;
process.off = vi.fn() as typeof process.off;
vi.clearAllMocks();
});
afterEach(() => {
process.on = originalProcessOn;
process.off = originalProcessOff;
sigintHandler = undefined;
});
test("starts telegram provider and logs startup message", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined);
const providers: Provider[] = ["telegram"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait for startup message
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(mockRuntime.log).toHaveBeenCalledWith(
"📡 Starting 1 provider(s): telegram",
);
expect(monitorTelegramProvider).toHaveBeenCalledWith(
true,
mockRuntime,
expect.any(AbortSignal),
true,
);
});
test("starts web provider with verbose and webTuning options", async () => {
const { monitorWebProvider } = await import("../web/auto-reply.js");
vi.mocked(monitorWebProvider).mockResolvedValue(undefined);
const providers: Provider[] = ["web"];
const webTuning = { heartbeatSeconds: 60 };
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
webTuning,
runtime: mockRuntime,
});
// Wait for startup message
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(mockRuntime.log).toHaveBeenCalledWith(
"📡 Starting 1 provider(s): web",
);
expect(monitorWebProvider).toHaveBeenCalledWith(
true,
undefined,
true,
undefined,
mockRuntime,
expect.any(AbortSignal),
webTuning,
);
});
test("starts twilio provider with custom interval and lookback", async () => {
const { monitorTwilio } = await import("../twilio/monitor.js");
vi.mocked(monitorTwilio).mockResolvedValue(undefined);
const providers: Provider[] = ["twilio"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: false,
twilioInterval: 30,
twilioLookback: 10,
runtime: mockRuntime,
});
// Wait for startup message
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(mockRuntime.log).toHaveBeenCalledWith(
"📡 Starting 1 provider(s): twilio",
);
expect(monitorTwilio).toHaveBeenCalledWith(30, 10);
});
test("starts multiple providers concurrently", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
const { monitorWebProvider } = await import("../web/auto-reply.js");
vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined);
vi.mocked(monitorWebProvider).mockResolvedValue(undefined);
const providers: Provider[] = ["telegram", "web"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait for startup message
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(mockRuntime.log).toHaveBeenCalledWith(
"📡 Starting 2 provider(s): telegram, web",
);
expect(monitorTelegramProvider).toHaveBeenCalled();
expect(monitorWebProvider).toHaveBeenCalled();
});
test("shows startup complete message after 1.5s", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
vi.mocked(monitorTelegramProvider).mockImplementation(
() => new Promise(() => {}), // Never resolves
);
const providers: Provider[] = ["telegram"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait for startup complete message (1.5s + buffer)
await new Promise((resolve) => setTimeout(resolve, 1600));
expect(mockRuntime.log).toHaveBeenCalledWith(
"✅ All 1 provider(s) active. Listening for messages... (Ctrl+C to stop)",
);
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
// Wait for abort to complete
await Promise.race([promise, new Promise((resolve) => setTimeout(resolve, 500))]);
});
test("handles SIGINT gracefully", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
let abortSignal: AbortSignal | undefined;
vi.mocked(monitorTelegramProvider).mockImplementation(
async (_verbose, _runtime, signal) => {
abortSignal = signal;
// Simulate waiting for abort
return new Promise((resolve) => {
signal?.addEventListener("abort", () => resolve(undefined));
});
},
);
const providers: Provider[] = ["telegram"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait for monitor to start
await new Promise((resolve) => setTimeout(resolve, 100));
// Trigger SIGINT
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(mockRuntime.log).toHaveBeenCalledWith(
"\n⏹ Stopping all providers...",
);
expect(mockRuntime.log).toHaveBeenCalledWith("✅ All providers stopped");
expect(abortSignal?.aborted).toBe(true);
});
test("handles provider errors without crashing other providers", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
const { monitorWebProvider } = await import("../web/auto-reply.js");
vi.mocked(monitorTelegramProvider).mockRejectedValue(
new Error("Telegram connection failed"),
);
vi.mocked(monitorWebProvider).mockResolvedValue(undefined);
const providers: Provider[] = ["telegram", "web"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait for startup and error handling
await new Promise((resolve) => setTimeout(resolve, 200));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(mockRuntime.error).toHaveBeenCalledWith(
"❌ telegram error: Error: Telegram connection failed",
);
// Web provider should still have been started
expect(monitorWebProvider).toHaveBeenCalled();
});
test("removes SIGINT handler after completion", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined);
const providers: Provider[] = ["telegram"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait for startup
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(process.off).toHaveBeenCalledWith("SIGINT", sigintHandler);
});
test("passes suppressStartMessage=true to telegram monitor", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
vi.mocked(monitorTelegramProvider).mockResolvedValue(undefined);
const providers: Provider[] = ["telegram"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait for startup
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(monitorTelegramProvider).toHaveBeenCalledWith(
true,
mockRuntime,
expect.any(AbortSignal),
true, // suppressStartMessage
);
});
test("does not log startup complete if aborted before timeout", async () => {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
vi.mocked(monitorTelegramProvider).mockImplementation(
async (_verbose, _runtime, signal) => {
return new Promise((resolve) => {
signal?.addEventListener("abort", () => resolve(undefined));
});
},
);
const providers: Provider[] = ["telegram"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: true,
runtime: mockRuntime,
});
// Wait briefly, then abort before 1.5s timeout
await new Promise((resolve) => setTimeout(resolve, 500));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
// Should not have logged startup complete message
const startupCompleteCalls = vi
.mocked(mockRuntime.log)
.mock.calls.filter((call) =>
String(call[0]).includes("All 1 provider(s) active"),
);
expect(startupCompleteCalls.length).toBe(0);
});
test("uses default values for twilio interval and lookback", async () => {
const { monitorTwilio } = await import("../twilio/monitor.js");
vi.mocked(monitorTwilio).mockResolvedValue(undefined);
const providers: Provider[] = ["twilio"];
const promise = runMultiProviderRelay(providers, mockConfig, mockDeps, {
verbose: false,
runtime: mockRuntime,
});
// Wait for startup
await new Promise((resolve) => setTimeout(resolve, 100));
// Abort the relay
if (sigintHandler) {
sigintHandler();
}
await promise;
expect(monitorTwilio).toHaveBeenCalledWith(10, 5); // defaults
});
});

95
src/cli/multi-relay.ts Normal file
View File

@ -0,0 +1,95 @@
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import type { WarelayConfig } from "../config/config.js";
import type { Provider } from "../utils.js";
import type { WebMonitorTuning } from "../web/auto-reply.js";
import type { CliDeps } from "./deps.js";
/**
* Run multiple provider monitors concurrently.
* Handles graceful shutdown and per-provider error recovery.
*/
export async function runMultiProviderRelay(
providers: Provider[],
config: WarelayConfig,
deps: CliDeps,
opts: {
verbose?: boolean;
webTuning?: WebMonitorTuning;
twilioInterval?: number;
twilioLookback?: number;
runtime?: RuntimeEnv;
},
): Promise<void> {
const runtime = opts.runtime ?? defaultRuntime;
const abortController = new AbortController();
const { signal } = abortController;
// Setup Ctrl+C handler
const sigintHandler = () => {
runtime.log("\n⏹ Stopping all providers...");
abortController.abort();
};
process.on("SIGINT", sigintHandler);
runtime.log(
`📡 Starting ${providers.length} provider(s): ${providers.join(", ")}`,
);
let startupComplete = false;
// Spawn monitors concurrently
const monitorPromises = providers.map(async (provider) => {
try {
if (provider === "telegram") {
const { monitorTelegramProvider } = await import(
"../telegram/monitor.js"
);
await monitorTelegramProvider(
Boolean(opts.verbose),
runtime,
signal,
true, // suppressStartMessage
);
} else if (provider === "web") {
const { monitorWebProvider } = await import("../web/auto-reply.js");
await monitorWebProvider(
Boolean(opts.verbose),
undefined,
true,
undefined,
runtime,
signal,
opts.webTuning,
);
} else if (provider === "twilio") {
const { monitorTwilio } = await import("../twilio/monitor.js");
const intervalSeconds = opts.twilioInterval ?? 10;
const lookbackMinutes = opts.twilioLookback ?? 5;
await monitorTwilio(intervalSeconds, lookbackMinutes);
}
} catch (err) {
if (signal.aborted) return; // Graceful shutdown
runtime.error(`${provider} error: ${String(err)}`);
// Continue - don't crash other providers
}
});
// Wait a brief moment for all providers to initialize, then show summary
setTimeout(() => {
if (!startupComplete && !signal.aborted) {
startupComplete = true;
runtime.log(
`✅ All ${providers.length} provider(s) active. Listening for messages... (Ctrl+C to stop)`,
);
}
}, 1500);
// Wait for all monitors (or abort)
await Promise.allSettled(monitorPromises);
// Remove SIGINT handler
process.off("SIGINT", sigintHandler);
runtime.log("✅ All providers stopped");
}

View File

@ -426,58 +426,7 @@ Examples:
const { file: logFile, level: logLevel } = getResolvedLoggerSettings(); const { file: logFile, level: logLevel } = getResolvedLoggerSettings();
defaultRuntime.log(info(`logs: ${logFile} (level ${logLevel})`)); defaultRuntime.log(info(`logs: ${logFile} (level ${logLevel})`));
// Handle --providers for multiple simultaneous relays // Parse parameters that are common to single/multi-provider modes
if (opts.providers) {
const providers = String(opts.providers).split(',').map(p => p.trim());
const validProviders = ['web', 'telegram', 'twilio'];
const invalid = providers.filter(p => !validProviders.includes(p));
if (invalid.length > 0) {
defaultRuntime.error(`Invalid providers: ${invalid.join(', ')}. Must be: web, telegram, twilio`);
defaultRuntime.exit(1);
}
defaultRuntime.log(info(`Starting relay for providers: ${providers.join(', ')}`));
// Start all providers concurrently
const promises = providers.map(async (provider) => {
try {
if (provider === 'telegram') {
await monitorTelegramProvider(Boolean(opts.verbose), defaultRuntime);
} else if (provider === 'web') {
const cfg = loadConfig();
const webTuning: WebMonitorTuning = {};
if (opts.webHeartbeat) webTuning.heartbeatSeconds = Number.parseInt(String(opts.webHeartbeat), 10);
if (opts.heartbeatNow) webTuning.replyHeartbeatNow = true;
const reconnect: WebMonitorTuning["reconnect"] = {};
if (opts.webRetries) reconnect.maxAttempts = Number.parseInt(String(opts.webRetries), 10);
if (opts.webRetryInitial) reconnect.initialMs = Number.parseInt(String(opts.webRetryInitial), 10);
if (opts.webRetryMax) reconnect.maxMs = Number.parseInt(String(opts.webRetryMax), 10);
if (Object.keys(reconnect).length > 0) webTuning.reconnect = reconnect;
logWebSelfId(defaultRuntime, true);
await monitorWebProvider(Boolean(opts.verbose), undefined, true, undefined, defaultRuntime, undefined, webTuning);
} else if (provider === 'twilio') {
ensureTwilioEnv();
logTwilioFrom();
const intervalSeconds = Number.parseInt(opts.interval || "5", 10);
const lookbackMinutes = Number.parseInt(opts.lookback || "5", 10);
await monitorTwilio(intervalSeconds, lookbackMinutes);
}
} catch (err) {
defaultRuntime.error(danger(`${provider} relay failed: ${String(err)}`));
}
});
await Promise.all(promises);
return;
}
// Original single-provider logic
const providerPref = String(opts.provider ?? "auto");
if (!["auto", "web", "twilio", "telegram"].includes(providerPref)) {
defaultRuntime.error("--provider must be auto, web, twilio, or telegram");
defaultRuntime.exit(1);
}
const intervalSeconds = Number.parseInt(opts.interval, 10); const intervalSeconds = Number.parseInt(opts.interval, 10);
const lookbackMinutes = Number.parseInt(opts.lookback, 10); const lookbackMinutes = Number.parseInt(opts.lookback, 10);
const webHeartbeat = const webHeartbeat =
@ -553,6 +502,46 @@ Examples:
webTuning.reconnect = reconnect; webTuning.reconnect = reconnect;
} }
// Handle --providers for multiple simultaneous relays
if (opts.providers) {
const providerList = String(opts.providers)
.split(",")
.map((p) => p.trim())
.map((p) => (p === "auto" ? "auto" : p)) as (Provider | "auto")[];
// Import selectProviders and runMultiProviderRelay
const { selectProviders } = await import("../web/session.js");
const { runMultiProviderRelay } = await import("./multi-relay.js");
const providers = await selectProviders(providerList);
if (providers.length === 0) {
defaultRuntime.error(
"No authenticated providers found. Use --provider or authenticate providers first.",
);
defaultRuntime.exit(1);
}
const cfg = loadConfig();
const deps = createDefaultDeps();
await runMultiProviderRelay(providers, cfg, deps, {
verbose: Boolean(opts.verbose),
webTuning,
twilioInterval: intervalSeconds,
twilioLookback: lookbackMinutes,
runtime: defaultRuntime,
});
return;
}
// Single-provider relay logic
const providerPref = String(opts.provider ?? "auto");
if (!["auto", "web", "twilio", "telegram"].includes(providerPref)) {
defaultRuntime.error("--provider must be auto, web, twilio, or telegram");
defaultRuntime.exit(1);
}
// Handle telegram explicitly (not in auto picker) // Handle telegram explicitly (not in auto picker)
if (providerPref === "telegram") { if (providerPref === "telegram") {
await monitorTelegramProvider(Boolean(opts.verbose), defaultRuntime); await monitorTelegramProvider(Boolean(opts.verbose), defaultRuntime);

View File

@ -3,22 +3,22 @@ import { describe, expect, it } from "vitest";
import { deriveSessionKey } from "./sessions.js"; import { deriveSessionKey } from "./sessions.js";
describe("sessions", () => { describe("sessions", () => {
it("returns normalized per-sender key", () => { it("returns normalized per-sender key", async () => {
expect(deriveSessionKey("per-sender", { From: "whatsapp:+1555" })).toBe( expect(await deriveSessionKey("per-sender", { From: "whatsapp:+1555" })).toBe(
"+1555", "+1555",
); );
}); });
it("falls back to unknown when sender missing", () => { it("falls back to unknown when sender missing", async () => {
expect(deriveSessionKey("per-sender", {})).toBe("unknown"); expect(await deriveSessionKey("per-sender", {})).toBe("unknown");
}); });
it("global scope returns global", () => { it("global scope returns global", async () => {
expect(deriveSessionKey("global", { From: "+1" })).toBe("global"); expect(await deriveSessionKey("global", { From: "+1" })).toBe("global");
}); });
it("keeps group chats distinct", () => { it("keeps group chats distinct", async () => {
expect(deriveSessionKey("per-sender", { From: "12345-678@g.us" })).toBe( expect(await deriveSessionKey("per-sender", { From: "12345-678@g.us" })).toBe(
"group:12345-678@g.us", "group:12345-678@g.us",
); );
}); });

View File

@ -227,3 +227,84 @@ export async function pickProvider(pref: Provider | "auto"): Promise<Provider> {
return "twilio"; return "twilio";
} }
/**
* Select providers for multi-provider relay.
* Validates authentication and filters out unavailable providers.
*/
export async function selectProviders(
prefs: (Provider | "auto")[],
): Promise<Provider[]> {
const skipped: string[] = [];
// If 'auto' in list, expand to all authenticated providers
if (prefs.includes("auto")) {
const available: Provider[] = [];
// Check web
if (await webAuthExists()) {
available.push("web");
} else {
skipped.push(
"web (not authenticated - run: warelay login --provider web)",
);
}
// Check telegram
if (await telegramAuthExists()) {
available.push("telegram");
} else {
skipped.push(
"telegram (not authenticated - run: warelay login --provider telegram)",
);
}
// Note: twilio not heavily used in this branch
if (available.length > 0 && skipped.length > 0) {
console.log(
` Auto-selected ${available.length} provider(s), skipped ${skipped.length}:`,
);
for (const skip of skipped) {
console.log(`${skip}`);
}
}
return available;
}
// Explicit provider list - validate each one
const available: Provider[] = [];
for (const pref of prefs) {
if (pref === "auto") continue; // Already handled above
if (pref === "web") {
if (await webAuthExists()) {
available.push("web");
} else {
skipped.push(
"web (not authenticated - run: warelay login --provider web)",
);
}
} else if (pref === "telegram") {
if (await telegramAuthExists()) {
available.push("telegram");
} else {
skipped.push(
"telegram (not authenticated - run: warelay login --provider telegram)",
);
}
} else if (pref === "twilio") {
skipped.push("twilio (not heavily used in this branch)");
}
}
if (skipped.length > 0) {
console.log(` Skipped ${skipped.length} provider(s):`);
for (const skip of skipped) {
console.log(`${skip}`);
}
}
return available;
}