From c5ab442f46f336daeb1949fe8405f072494925e3 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 04:29:17 +0000 Subject: [PATCH 1/5] Fix empty result JSON dump and missing heartbeat prefix Bug fixes: - Empty result field handling: Changed truthy check to explicit type check (`typeof parsed?.text === "string"`) in command-reply.ts. Previously, Claude CLI returning `result: ""` would cause raw JSON to be sent to WhatsApp. - Response prefix on heartbeat: Apply `responsePrefix` to heartbeat alert messages in runReplyHeartbeat, matching behavior of regular message handler. --- CHANGELOG.md | 4 ++++ src/auto-reply/command-reply.ts | 2 +- src/web/auto-reply.ts | 9 ++++++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba6829ada..c4a655c12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## 1.2.3 — Unreleased +### Bug Fixes +- **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`. +- **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`. + ### Changes - **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives. - **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter. diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 914f25d96..4d8b7141f 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -230,7 +230,7 @@ export async function runCommandReply( `Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`, ); } - if (parsed?.text) { + if (typeof parsed?.text === "string") { logVerbose( `Claude JSON parsed -> ${parsed.text.slice(0, 120)}${parsed.text.length > 120 ? "…" : ""}`, ); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 2b1c358ff..858a3f7a8 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -862,9 +862,16 @@ export async function monitorWebProvider( return; } + // Apply response prefix if configured (same as regular messages) + let finalText = stripped.text; + const responsePrefix = cfg.inbound?.responsePrefix; + if (responsePrefix && finalText && !finalText.startsWith(responsePrefix)) { + finalText = `${responsePrefix} ${finalText}`; + } + const cleanedReply: ReplyPayload = { ...replyResult, - text: stripped.text, + text: finalText, }; await deliverWebReply({ From d107b79c6319944628f9fd56d797342ffff68005 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 05:54:31 +0000 Subject: [PATCH 2/5] Fix test corrupting production sessions.json The test 'falls back to most recent session when no to is provided' was using resolveStorePath() which returns the real ~/.warelay/sessions.json. This overwrote production session data with test values, causing session fragmentation issues. Changed to use a temp directory like other tests. --- src/auto-reply/command-reply.test.ts | 75 ++++++++++++ src/auto-reply/command-reply.ts | 13 +- src/web/auto-reply.test.ts | 171 ++++++++++++++++++++++++--- src/web/monitor-inbox.test.ts | 160 +++++++++++++++++++++++-- src/web/test-helpers.ts | 31 +++-- 5 files changed, 412 insertions(+), 38 deletions(-) diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index fb47c0414..db1e64767 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -292,4 +292,79 @@ describe("runCommandReply", () => { expect(meta.queuedMs).toBe(25); expect(meta.queuedAhead).toBe(2); }); + + it("handles empty result string without dumping raw JSON", async () => { + // Bug fix: Claude CLI returning {"result": ""} should not send raw JSON to WhatsApp + // The fix changed from truthy check to explicit typeof check + const runner = makeRunner({ + stdout: '{"result":"","duration_ms":50,"total_cost_usd":0.001}', + }); + const { payload } = await runCommandReply({ + reply: { + mode: "command", + command: ["claude", "{{Body}}"], + claudeOutputFormat: "json", + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + commandRunner: runner, + enqueue: enqueueImmediate, + }); + // Should NOT contain raw JSON - empty result should produce fallback message + expect(payload?.text).not.toContain('{"result"'); + expect(payload?.text).toContain("command produced no output"); + }); + + it("handles empty text string in Claude JSON", async () => { + const runner = makeRunner({ + stdout: '{"text":"","duration_ms":50}', + }); + const { payload } = await runCommandReply({ + reply: { + mode: "command", + command: ["claude", "{{Body}}"], + claudeOutputFormat: "json", + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + commandRunner: runner, + enqueue: enqueueImmediate, + }); + // Empty text should produce fallback message, not raw JSON + expect(payload?.text).not.toContain('{"text"'); + expect(payload?.text).toContain("command produced no output"); + }); + + it("returns actual text when result is non-empty", async () => { + const runner = makeRunner({ + stdout: '{"result":"hello world","duration_ms":50}', + }); + const { payload } = await runCommandReply({ + reply: { + mode: "command", + command: ["claude", "{{Body}}"], + claudeOutputFormat: "json", + }, + templatingCtx: noopTemplateCtx, + sendSystemOnce: false, + isNewSession: true, + isFirstTurnInSession: true, + systemSent: false, + timeoutMs: 1000, + timeoutSeconds: 1, + commandRunner: runner, + enqueue: enqueueImmediate, + }); + expect(payload?.text).toBe("hello world"); + }); }); diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 4d8b7141f..71e564948 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -259,8 +259,11 @@ export async function runCommandReply( console.error( `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, ); + // Include any partial output or stderr in error message + const partialOut = trimmed ? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}` : ""; + const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`; return { - payload: undefined, + payload: { text: errorText }, meta: { durationMs: Date.now() - started, queuedMs, @@ -278,8 +281,9 @@ export async function runCommandReply( console.error( `Command auto-reply process killed before completion (exit code ${code ?? "unknown"})`, ); + const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`; return { - payload: undefined, + payload: { text: errorText }, meta: { durationMs: Date.now() - started, queuedMs, @@ -379,8 +383,11 @@ export async function runCommandReply( }; } logError(`Command auto-reply failed after ${elapsed}ms: ${String(err)}`); + // Send error message to user so they know the command failed + const errMsg = err instanceof Error ? err.message : String(err); + const errorText = `⚠️ Command failed: ${errMsg}`; return { - payload: undefined, + payload: { text: errorText }, meta: { durationMs: elapsed, queuedMs, diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 1918b5619..ff59e05d9 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -1,3 +1,10 @@ +// Import test-helpers FIRST to set up mocks before other imports +import { + resetBaileysMocks, + resetLoadConfigMock, + setLoadConfigMock, +} from "./test-helpers.js"; + import crypto from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; @@ -18,11 +25,6 @@ import { stripHeartbeatToken, } from "./auto-reply.js"; import type { sendMessageWeb } from "./outbound.js"; -import { - resetBaileysMocks, - resetLoadConfigMock, - setLoadConfigMock, -} from "./test-helpers.js"; describe("heartbeat helpers", () => { it("strips heartbeat token and skips when only token", () => { @@ -186,6 +188,11 @@ describe("runWebHeartbeatOnce", () => { }); it("falls back to most recent session when no to is provided", async () => { + // Use temp directory to avoid corrupting production sessions.json + const tmpDir = await fs.mkdtemp( + path.join(os.tmpdir(), "warelay-fallback-session-"), + ); + const storePath = path.join(tmpDir, "sessions.json"); const sender: typeof sendMessageWeb = vi .fn() .mockResolvedValue({ messageId: "m1", toJid: "jid" }); @@ -196,15 +203,11 @@ describe("runWebHeartbeatOnce", () => { "+1222": { sessionId: "s1", updatedAt: now - 1000 }, "+1333": { sessionId: "s2", updatedAt: now }, }; - const storePath = resolveStorePath(); - await fs.mkdir(resolveStorePath().replace("sessions.json", ""), { - recursive: true, - }); await fs.writeFile(storePath, JSON.stringify(store)); setLoadConfigMock({ inbound: { allowFrom: ["+1999"], - reply: { mode: "command", session: {} }, + reply: { mode: "command", session: { store: storePath } }, }, }); await runWebHeartbeatOnce({ @@ -947,6 +950,16 @@ describe("web auto-reply", () => { }); it("prefixes body with same-phone marker when from === to", async () => { + // Enable messagePrefix for same-phone mode testing + setLoadConfigMock(() => ({ + inbound: { + allowFrom: ["*"], + messagePrefix: "[same-phone]", + responsePrefix: undefined, + timestampPrefix: false, + }, + })); + let capturedOnMessage: | ((msg: import("./inbound.js").WebInboundMessage) => Promise) | undefined; @@ -974,12 +987,11 @@ describe("web auto-reply", () => { sendMedia: vi.fn(), }); - // The resolver should receive a prefixed body (the exact marker depends on config) - // Key test: body should start with some marker and end with original message + // The resolver should receive a prefixed body with the configured marker const callArg = resolver.mock.calls[0]?.[0] as { Body?: string }; expect(callArg?.Body).toBeDefined(); - expect(callArg?.Body).toMatch(/^\[.*\] hello$/); - expect(callArg?.Body).not.toBe("hello"); // Should be prefixed + expect(callArg?.Body).toBe("[same-phone] hello"); + resetLoadConfigMock(); }); it("does not prefix body when from !== to", async () => { @@ -1014,4 +1026,135 @@ describe("web auto-reply", () => { const callArg = resolver.mock.calls[0]?.[0] as { Body?: string }; expect(callArg?.Body).toBe("hello"); }); + + it("applies responsePrefix to regular replies", async () => { + setLoadConfigMock(() => ({ + inbound: { + allowFrom: ["*"], + messagePrefix: undefined, + responsePrefix: "🦞", + timestampPrefix: false, + }, + })); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const reply = vi.fn(); + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + const resolver = vi.fn().mockResolvedValue({ text: "hello there" }); + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + await capturedOnMessage?.({ + body: "hi", + from: "+1555", + to: "+2666", + id: "msg1", + sendComposing: vi.fn(), + reply, + sendMedia: vi.fn(), + }); + + // Reply should have responsePrefix prepended + expect(reply).toHaveBeenCalledWith("🦞 hello there"); + resetLoadConfigMock(); + }); + + it("skips responsePrefix for HEARTBEAT_OK responses", async () => { + setLoadConfigMock(() => ({ + inbound: { + allowFrom: ["*"], + messagePrefix: undefined, + responsePrefix: "🦞", + timestampPrefix: false, + }, + })); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const reply = vi.fn(); + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + // Resolver returns exact HEARTBEAT_OK + const resolver = vi.fn().mockResolvedValue({ text: HEARTBEAT_TOKEN }); + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + await capturedOnMessage?.({ + body: "test", + from: "+1555", + to: "+2666", + id: "msg1", + sendComposing: vi.fn(), + reply, + sendMedia: vi.fn(), + }); + + // HEARTBEAT_OK should NOT have prefix - warelay needs exact match + expect(reply).toHaveBeenCalledWith(HEARTBEAT_TOKEN); + resetLoadConfigMock(); + }); + + it("does not double-prefix if responsePrefix already present", async () => { + setLoadConfigMock(() => ({ + inbound: { + allowFrom: ["*"], + messagePrefix: undefined, + responsePrefix: "🦞", + timestampPrefix: false, + }, + })); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const reply = vi.fn(); + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + // Resolver returns text that already has prefix + const resolver = vi.fn().mockResolvedValue({ text: "🦞 already prefixed" }); + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + await capturedOnMessage?.({ + body: "test", + from: "+1555", + to: "+2666", + id: "msg1", + sendComposing: vi.fn(), + reply, + sendMedia: vi.fn(), + }); + + // Should not double-prefix + expect(reply).toHaveBeenCalledWith("🦞 already prefixed"); + resetLoadConfigMock(); + }); }); diff --git a/src/web/monitor-inbox.test.ts b/src/web/monitor-inbox.test.ts index 10e312691..b696d2513 100644 --- a/src/web/monitor-inbox.test.ts +++ b/src/web/monitor-inbox.test.ts @@ -9,15 +9,17 @@ vi.mock("../media/store.js", () => ({ }), })); +const mockLoadConfig = vi.fn().mockReturnValue({ + inbound: { + allowFrom: ["*"], // Allow all in tests + messagePrefix: undefined, + responsePrefix: undefined, + timestampPrefix: false, + }, +}); + vi.mock("../config/config.js", () => ({ - loadConfig: vi.fn().mockReturnValue({ - inbound: { - allowFrom: ["*"], // Allow all in tests - messagePrefix: undefined, - responsePrefix: undefined, - timestampPrefix: false, - }, - }), + loadConfig: () => mockLoadConfig(), })); vi.mock("./session.js", () => { @@ -227,4 +229,146 @@ describe("web monitor inbox", () => { ]); await listener.close(); }); + + it("blocks messages from unauthorized senders not in allowFrom", async () => { + // Test for auto-recovery fix: early allowFrom filtering prevents Bad MAC errors + // from unauthorized senders corrupting sessions + mockLoadConfig.mockReturnValue({ + inbound: { + allowFrom: ["+111"], // Only allow +111 + messagePrefix: undefined, + responsePrefix: undefined, + timestampPrefix: false, + }, + }); + + const onMessage = vi.fn(); + const listener = await monitorWebInbox({ verbose: false, onMessage }); + const sock = await createWaSocket(); + + // Message from unauthorized sender +999 (not in allowFrom) + const upsert = { + type: "notify", + messages: [ + { + key: { id: "unauth1", fromMe: false, remoteJid: "999@s.whatsapp.net" }, + message: { conversation: "unauthorized message" }, + messageTimestamp: 1_700_000_000, + }, + ], + }; + + sock.ev.emit("messages.upsert", upsert); + await new Promise((resolve) => setImmediate(resolve)); + + // Should NOT call onMessage for unauthorized senders + expect(onMessage).not.toHaveBeenCalled(); + + // Reset mock for other tests + mockLoadConfig.mockReturnValue({ + inbound: { + allowFrom: ["*"], + messagePrefix: undefined, + responsePrefix: undefined, + timestampPrefix: false, + }, + }); + + await listener.close(); + }); + + it("allows messages from senders in allowFrom list", async () => { + mockLoadConfig.mockReturnValue({ + inbound: { + allowFrom: ["+111", "+999"], // Allow +999 + messagePrefix: undefined, + responsePrefix: undefined, + timestampPrefix: false, + }, + }); + + const onMessage = vi.fn(); + const listener = await monitorWebInbox({ verbose: false, onMessage }); + const sock = await createWaSocket(); + + const upsert = { + type: "notify", + messages: [ + { + key: { id: "auth1", fromMe: false, remoteJid: "999@s.whatsapp.net" }, + message: { conversation: "authorized message" }, + messageTimestamp: 1_700_000_000, + }, + ], + }; + + sock.ev.emit("messages.upsert", upsert); + await new Promise((resolve) => setImmediate(resolve)); + + // Should call onMessage for authorized senders + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ body: "authorized message", from: "+999" }), + ); + + // Reset mock for other tests + mockLoadConfig.mockReturnValue({ + inbound: { + allowFrom: ["*"], + messagePrefix: undefined, + responsePrefix: undefined, + timestampPrefix: false, + }, + }); + + await listener.close(); + }); + + it("allows same-phone messages even if not in allowFrom", async () => { + // Same-phone mode: when from === selfJid, should always be allowed + // This allows users to message themselves even with restrictive allowFrom + mockLoadConfig.mockReturnValue({ + inbound: { + allowFrom: ["+111"], // Only allow +111, but self is +123 + messagePrefix: undefined, + responsePrefix: undefined, + timestampPrefix: false, + }, + }); + + const onMessage = vi.fn(); + const listener = await monitorWebInbox({ verbose: false, onMessage }); + const sock = await createWaSocket(); + + // Message from self (sock.user.id is "123@s.whatsapp.net" in mock) + const upsert = { + type: "notify", + messages: [ + { + key: { id: "self1", fromMe: false, remoteJid: "123@s.whatsapp.net" }, + message: { conversation: "self message" }, + messageTimestamp: 1_700_000_000, + }, + ], + }; + + sock.ev.emit("messages.upsert", upsert); + await new Promise((resolve) => setImmediate(resolve)); + + // Should allow self-messages even if not in allowFrom + expect(onMessage).toHaveBeenCalledWith( + expect.objectContaining({ body: "self message", from: "+123" }), + ); + + // Reset mock for other tests + mockLoadConfig.mockReturnValue({ + inbound: { + allowFrom: ["*"], + messagePrefix: undefined, + responsePrefix: undefined, + timestampPrefix: false, + }, + }); + + await listener.close(); + }); }); diff --git a/src/web/test-helpers.ts b/src/web/test-helpers.ts index 8a0b02195..923fa19f3 100644 --- a/src/web/test-helpers.ts +++ b/src/web/test-helpers.ts @@ -3,32 +3,37 @@ import { vi } from "vitest"; import type { MockBaileysSocket } from "../../test/mocks/baileys.js"; import { createMockBaileys } from "../../test/mocks/baileys.js"; -let loadConfigMock: () => unknown = () => ({ +// Use globalThis to store the mock config so it survives vi.mock hoisting +const CONFIG_KEY = Symbol.for("warelay:testConfigMock"); +const DEFAULT_CONFIG = { inbound: { allowFrom: ["*"], // Allow all in tests by default messagePrefix: undefined, // No message prefix in tests responsePrefix: undefined, // No response prefix in tests timestampPrefix: false, // No timestamp in tests }, -}); +}; -export function setLoadConfigMock(fn: () => unknown) { - loadConfigMock = fn; +// Initialize default if not set +if (!(globalThis as Record)[CONFIG_KEY]) { + (globalThis as Record)[CONFIG_KEY] = () => DEFAULT_CONFIG; +} + +export function setLoadConfigMock(fn: (() => unknown) | unknown) { + (globalThis as Record)[CONFIG_KEY] = + typeof fn === "function" ? fn : () => fn; } export function resetLoadConfigMock() { - loadConfigMock = () => ({ - inbound: { - allowFrom: ["*"], - messagePrefix: undefined, - responsePrefix: undefined, - timestampPrefix: false, - }, - }); + (globalThis as Record)[CONFIG_KEY] = () => DEFAULT_CONFIG; } vi.mock("../config/config.js", () => ({ - loadConfig: () => loadConfigMock(), + loadConfig: () => { + const getter = (globalThis as Record)[CONFIG_KEY]; + if (typeof getter === "function") return getter(); + return DEFAULT_CONFIG; + }, })); vi.mock("../media/store.js", () => ({ From 1b0e1edb084cdeaef5852581cc7356122d0b2ade Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 05:59:31 +0000 Subject: [PATCH 3/5] Update changelog with error message and test isolation fixes --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4a655c12..696203b9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ ### Bug Fixes - **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`. - **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`. +- **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses. +- **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files. ### Changes - **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives. From 2fc3a822c83be14779d3903441d4a661328580e4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 06:15:20 +0000 Subject: [PATCH 4/5] web: isolate session fixtures and skip heartbeat when busy --- CHANGELOG.md | 6 ++ src/web/auto-reply.test.ts | 148 +++++++++++++++++++++++++++---------- src/web/auto-reply.ts | 10 +++ 3 files changed, 123 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 696203b9f..c66c5f794 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,12 @@ ### Changes - **Manual heartbeat sends:** `warelay heartbeat` accepts `--message/--body` with `--provider web|twilio` to push real outbound messages through the same plumbing; `--dry-run` previews payloads without sending. +## Unreleased + +### Changes +- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests. +- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs. + ## 1.2.1 — 2025-11-28 ### Changes diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index ff59e05d9..059c6e570 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -13,7 +13,7 @@ import sharp from "sharp"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { WarelayConfig } from "../config/config.js"; -import { resolveStorePath } from "../config/sessions.js"; +import * as commandQueue from "../process/command-queue.js"; import { resetLogger, setLoggerOverride } from "../logging.js"; import { HEARTBEAT_PROMPT, @@ -26,6 +26,18 @@ import { } from "./auto-reply.js"; import type { sendMessageWeb } from "./outbound.js"; +const makeSessionStore = async ( + entries: Record = {}, +): Promise<{ storePath: string; cleanup: () => Promise }> => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-session-")); + const storePath = path.join(dir, "sessions.json"); + await fs.writeFile(storePath, JSON.stringify(entries)); + return { + storePath, + cleanup: () => fs.rm(dir, { recursive: true, force: true }), + }; +}; + describe("heartbeat helpers", () => { it("strips heartbeat token and skips when only token", () => { expect(stripHeartbeatToken(undefined)).toEqual({ @@ -80,19 +92,9 @@ describe("heartbeat helpers", () => { }); describe("resolveHeartbeatRecipients", () => { - const makeStore = async (entries: Record) => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-heartbeat-")); - const storePath = path.join(dir, "sessions.json"); - await fs.writeFile(storePath, JSON.stringify(entries)); - return { - storePath, - cleanup: async () => fs.rm(dir, { recursive: true, force: true }), - }; - }; - it("returns the sole session recipient", async () => { const now = Date.now(); - const store = await makeStore({ "+1000": { updatedAt: now } }); + const store = await makeSessionStore({ "+1000": { updatedAt: now } }); const cfg: WarelayConfig = { inbound: { allowFrom: ["+1999"], @@ -107,7 +109,7 @@ describe("resolveHeartbeatRecipients", () => { it("surfaces ambiguity when multiple sessions exist", async () => { const now = Date.now(); - const store = await makeStore({ + const store = await makeSessionStore({ "+1000": { updatedAt: now }, "+2000": { updatedAt: now - 10 }, }); @@ -124,7 +126,7 @@ describe("resolveHeartbeatRecipients", () => { }); it("filters wildcard allowFrom when no sessions exist", async () => { - const store = await makeStore({}); + const store = await makeSessionStore({}); const cfg: WarelayConfig = { inbound: { allowFrom: ["*"], @@ -139,7 +141,7 @@ describe("resolveHeartbeatRecipients", () => { it("merges sessions and allowFrom when --all is set", async () => { const now = Date.now(); - const store = await makeStore({ "+1000": { updatedAt: now } }); + const store = await makeSessionStore({ "+1000": { updatedAt: now } }); const cfg: WarelayConfig = { inbound: { allowFrom: ["+1999"], @@ -155,12 +157,16 @@ describe("resolveHeartbeatRecipients", () => { describe("runWebHeartbeatOnce", () => { it("skips when heartbeat token returned", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi.fn(); const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN })); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, @@ -168,55 +174,58 @@ describe("runWebHeartbeatOnce", () => { }); expect(resolver).toHaveBeenCalled(); expect(sender).not.toHaveBeenCalled(); + await store.cleanup(); }); it("sends when alert text present", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi .fn() .mockResolvedValue({ messageId: "m1", toJid: "jid" }); const resolver = vi.fn(async () => ({ text: "ALERT" })); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, replyResolver: resolver, }); expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false }); + await store.cleanup(); }); it("falls back to most recent session when no to is provided", async () => { - // Use temp directory to avoid corrupting production sessions.json - const tmpDir = await fs.mkdtemp( - path.join(os.tmpdir(), "warelay-fallback-session-"), - ); - const storePath = path.join(tmpDir, "sessions.json"); + const store = await makeSessionStore(); + const storePath = store.storePath; const sender: typeof sendMessageWeb = vi .fn() .mockResolvedValue({ messageId: "m1", toJid: "jid" }); const resolver = vi.fn(async () => ({ text: "ALERT" })); - // Seed session store const now = Date.now(); - const store = { + const sessionEntries = { "+1222": { sessionId: "s1", updatedAt: now - 1000 }, "+1333": { sessionId: "s2", updatedAt: now }, }; - await fs.writeFile(storePath, JSON.stringify(store)); - setLoadConfigMock({ - inbound: { - allowFrom: ["+1999"], - reply: { mode: "command", session: { store: storePath } }, - }, - }); + await fs.writeFile(storePath, JSON.stringify(sessionEntries)); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1999"], + reply: { mode: "command", session: { store: storePath } }, + }, + }, to: "+1999", verbose: false, sender, replyResolver: resolver, }); expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false }); + await store.cleanup(); }); it("does not refresh updatedAt when heartbeat is skipped", async () => { @@ -356,14 +365,18 @@ describe("runWebHeartbeatOnce", () => { }); it("sends overrideBody directly and skips resolver", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi .fn() .mockResolvedValue({ messageId: "m1", toJid: "jid" }); const resolver = vi.fn(); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, @@ -374,15 +387,20 @@ describe("runWebHeartbeatOnce", () => { verbose: false, }); expect(resolver).not.toHaveBeenCalled(); + await store.cleanup(); }); it("dry-run overrideBody prints and skips send", async () => { + const store = await makeSessionStore(); const sender: typeof sendMessageWeb = vi.fn(); const resolver = vi.fn(); - setLoadConfigMock({ - inbound: { allowFrom: ["+1555"], reply: { mode: "command" } }, - }); await runWebHeartbeatOnce({ + cfg: { + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: store.storePath } }, + }, + }, to: "+1555", verbose: false, sender, @@ -392,6 +410,7 @@ describe("runWebHeartbeatOnce", () => { }); expect(sender).not.toHaveBeenCalled(); expect(resolver).not.toHaveBeenCalled(); + await store.cleanup(); }); }); @@ -507,6 +526,53 @@ describe("web auto-reply", () => { ); }); + it("skips reply heartbeat when requests are running", async () => { + const tmpDir = await fs.mkdtemp( + path.join(os.tmpdir(), "warelay-heartbeat-queue-"), + ); + const storePath = path.join(tmpDir, "sessions.json"); + await fs.writeFile(storePath, JSON.stringify({})); + + const queueSpy = vi + .spyOn(commandQueue, "getQueueSize") + .mockReturnValue(2); + const replyResolver = vi.fn(); + const listenerFactory = vi.fn(async () => { + const onClose = new Promise(() => { + // stay open until aborted + }); + return { close: vi.fn(), onClose }; + }); + const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never; + + setLoadConfigMock(() => ({ + inbound: { + allowFrom: ["+1555"], + reply: { mode: "command", session: { store: storePath } }, + }, + })); + + const controller = new AbortController(); + const run = monitorWebProvider( + false, + listenerFactory, + true, + replyResolver, + runtime, + controller.signal, + { replyHeartbeatMinutes: 1, replyHeartbeatNow: true }, + ); + + try { + await Promise.resolve(); + controller.abort(); + await run; + expect(replyResolver).not.toHaveBeenCalled(); + } finally { + queueSpy.mockRestore(); + } + }); + it("falls back to text when media send fails", async () => { const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const reply = vi.fn().mockResolvedValue(undefined); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 858a3f7a8..5fdcbbe10 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -17,6 +17,7 @@ import { normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWeb } from "./outbound.js"; +import { getQueueSize } from "../process/command-queue.js"; import { computeBackoff, newConnectionId, @@ -739,6 +740,15 @@ export async function monitorWebProvider( } const runReplyHeartbeat = async () => { + const queued = getQueueSize(); + if (queued > 0) { + heartbeatLogger.info( + { connectionId, reason: "requests-in-flight", queued }, + "reply heartbeat skipped", + ); + console.log(success("heartbeat: skipped (requests in flight)")); + return; + } if (!replyHeartbeatMinutes) return; const tickStart = Date.now(); if (!lastInboundMsg) { From e86b507da75d29be235aa99e59f7405ec0509dd0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 06:31:01 +0000 Subject: [PATCH 5/5] Add IPC to prevent Signal session corruption from concurrent connections When the relay is running, `warelay send` and `warelay heartbeat` now communicate via Unix socket IPC (~/.warelay/relay.sock) to send messages through the relay's existing WhatsApp connection. Previously, these commands created new Baileys sockets that wrote to the same auth state files, corrupting the Signal session ratchet and causing the relay's subsequent sends to fail silently. Changes: - Add src/web/ipc.ts with Unix socket server/client - Relay starts IPC server after connecting - send command tries IPC first, falls back to direct - heartbeat uses sendWithIpcFallback helper - inbound.ts exposes sendMessage on listener object - Messages sent via IPC are added to echo detection set --- CHANGELOG.md | 2 + src/commands/send.ts | 32 +++++- src/web/auto-reply.ts | 53 +++++++++- src/web/inbound.ts | 45 +++++++++ src/web/ipc.ts | 225 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 354 insertions(+), 3 deletions(-) create mode 100644 src/web/ipc.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c66c5f794..73f6caaa1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,10 @@ - **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`. - **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses. - **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files. +- **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently. ### Changes +- **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise. - **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives. - **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter. - **Test isolation improvements:** Mock `loadConfig()` in all test files to prevent loading real user config (with emojis/prefixes) during tests. Default test config now has no prefixes/timestamps for cleaner assertions. diff --git a/src/commands/send.ts b/src/commands/send.ts index 193c407cf..45b770f74 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -1,7 +1,8 @@ import type { CliDeps } from "../cli/deps.js"; -import { info } from "../globals.js"; +import { info, success } from "../globals.js"; import type { RuntimeEnv } from "../runtime.js"; import type { Provider } from "../utils.js"; +import { sendViaIpc } from "../web/ipc.js"; export async function sendCommand( opts: { @@ -39,6 +40,34 @@ export async function sendCommand( if (waitSeconds !== 0) { runtime.log(info("Wait/poll are Twilio-only; ignored for provider=web.")); } + + // Try to send via IPC to running relay first (avoids Signal session corruption) + const ipcResult = await sendViaIpc(opts.to, opts.message, opts.media); + if (ipcResult) { + if (ipcResult.success) { + runtime.log(success(`✅ Sent via relay IPC. Message ID: ${ipcResult.messageId}`)); + if (opts.json) { + runtime.log( + JSON.stringify( + { + provider: "web", + via: "ipc", + to: opts.to, + messageId: ipcResult.messageId, + mediaUrl: opts.media ?? null, + }, + null, + 2, + ), + ); + } + return; + } + // IPC failed but relay is running - warn and fall back + runtime.log(info(`IPC send failed (${ipcResult.error}), falling back to direct connection`)); + } + + // Fall back to direct connection (creates new Baileys socket) const res = await deps .sendMessageWeb(opts.to, opts.message, { verbose: false, @@ -53,6 +82,7 @@ export async function sendCommand( JSON.stringify( { provider: "web", + via: "direct", to: opts.to, messageId: res.messageId, mediaUrl: opts.media ?? null, diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 5fdcbbe10..9020a21fd 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -9,12 +9,13 @@ import { resolveStorePath, saveSessionStore, } from "../config/sessions.js"; -import { danger, isVerbose, logVerbose, success } from "../globals.js"; +import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; +import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWeb } from "./outbound.js"; import { getQueueSize } from "../process/command-queue.js"; @@ -28,6 +29,26 @@ import { } from "./reconnect.js"; import { getWebAuthAgeMs } from "./session.js"; +/** + * Send a message via IPC if relay is running, otherwise fall back to direct. + * This avoids Signal session corruption from multiple Baileys connections. + */ +async function sendWithIpcFallback( + to: string, + message: string, + opts: { verbose: boolean; mediaUrl?: string }, +): Promise<{ messageId: string; toJid: string }> { + const ipcResult = await sendViaIpc(to, message, opts.mediaUrl); + if (ipcResult?.success && ipcResult.messageId) { + if (opts.verbose) { + console.log(info(`Sent via relay IPC (avoiding session corruption)`)); + } + return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` }; + } + // Fall back to direct send + return sendMessageWeb(to, message, opts); +} + const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024; type WebInboundMsg = Parameters< typeof monitorWebInbox @@ -95,7 +116,7 @@ export async function runWebHeartbeatOnce(opts: { } = opts; const _runtime = opts.runtime ?? defaultRuntime; const replyResolver = opts.replyResolver ?? getReplyFromConfig; - const sender = opts.sender ?? sendMessageWeb; + const sender = opts.sender ?? sendWithIpcFallback; const runId = newConnectionId(); const heartbeatLogger = getChildLogger({ module: "web-heartbeat", @@ -526,6 +547,8 @@ export async function monitorWebProvider( const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, onMessage: async (msg) => { + // Also add IPC-sent messages to echo detection + // (this is handled below in the IPC sendHandler) handledMessages += 1; lastMessageAt = Date.now(); const ts = msg.timestamp @@ -677,7 +700,33 @@ export async function monitorWebProvider( }, }); + // Start IPC server so `warelay send` can use this connection + // instead of creating a new one (which would corrupt Signal session) + if ("sendMessage" in listener) { + startIpcServer(async (to, message, mediaUrl) => { + let mediaBuffer: Buffer | undefined; + let mediaType: string | undefined; + if (mediaUrl) { + const media = await loadWebMedia(mediaUrl); + mediaBuffer = media.buffer; + mediaType = media.contentType; + } + const result = await listener.sendMessage(to, message, mediaBuffer, mediaType); + // Add to echo detection so we don't process our own message + if (message) { + recentlySent.add(message); + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } + } + logInfo(`📤 IPC send to ${to}: ${message.substring(0, 50)}...`, runtime); + return result; + }); + } + const closeListener = async () => { + stopIpcServer(); if (heartbeat) clearInterval(heartbeat); if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer); if (watchdogTimer) clearInterval(watchdogTimer); diff --git a/src/web/inbound.ts b/src/web/inbound.ts index 209824229..bafcac10d 100644 --- a/src/web/inbound.ts +++ b/src/web/inbound.ts @@ -212,6 +212,51 @@ export async function monitorWebInbox(options: { } }, onClose, + /** + * Send a message through this connection's socket. + * Used by IPC to avoid creating new connections. + */ + sendMessage: async ( + to: string, + text: string, + mediaBuffer?: Buffer, + mediaType?: string, + ): Promise<{ messageId: string }> => { + const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`; + let payload: AnyMessageContent; + if (mediaBuffer && mediaType) { + if (mediaType.startsWith("image/")) { + payload = { + image: mediaBuffer, + caption: text || undefined, + mimetype: mediaType, + }; + } else if (mediaType.startsWith("audio/")) { + payload = { + audio: mediaBuffer, + ptt: true, + mimetype: mediaType, + }; + } else if (mediaType.startsWith("video/")) { + payload = { + video: mediaBuffer, + caption: text || undefined, + mimetype: mediaType, + }; + } else { + payload = { + document: mediaBuffer, + fileName: "file", + caption: text || undefined, + mimetype: mediaType, + }; + } + } else { + payload = { text }; + } + const result = await sock.sendMessage(jid, payload); + return { messageId: result?.key?.id ?? "unknown" }; + }, } as const; } diff --git a/src/web/ipc.ts b/src/web/ipc.ts new file mode 100644 index 000000000..99e09e0f5 --- /dev/null +++ b/src/web/ipc.ts @@ -0,0 +1,225 @@ +/** + * IPC server for warelay relay. + * + * When the relay is running, it starts a Unix socket server that allows + * `warelay send` and `warelay heartbeat` to send messages through the + * existing WhatsApp connection instead of creating new ones. + * + * This prevents Signal session ratchet corruption from multiple connections. + */ + +import fs from "node:fs"; +import net from "node:net"; +import os from "node:os"; +import path from "node:path"; + +import { getChildLogger } from "../logging.js"; + +const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock"); + +export interface IpcSendRequest { + type: "send"; + to: string; + message: string; + mediaUrl?: string; +} + +export interface IpcSendResponse { + success: boolean; + messageId?: string; + error?: string; +} + +type SendHandler = ( + to: string, + message: string, + mediaUrl?: string, +) => Promise<{ messageId: string }>; + +let server: net.Server | null = null; + +/** + * Start the IPC server. Called by the relay when it starts. + */ +export function startIpcServer(sendHandler: SendHandler): void { + const logger = getChildLogger({ module: "ipc-server" }); + + // Clean up stale socket file + try { + fs.unlinkSync(SOCKET_PATH); + } catch { + // Ignore if doesn't exist + } + + server = net.createServer((conn) => { + let buffer = ""; + + conn.on("data", async (data) => { + buffer += data.toString(); + + // Try to parse complete JSON messages (newline-delimited) + const lines = buffer.split("\n"); + buffer = lines.pop() ?? ""; // Keep incomplete line in buffer + + for (const line of lines) { + if (!line.trim()) continue; + + try { + const request = JSON.parse(line) as IpcSendRequest; + + if (request.type === "send") { + try { + const result = await sendHandler( + request.to, + request.message, + request.mediaUrl, + ); + const response: IpcSendResponse = { + success: true, + messageId: result.messageId, + }; + conn.write(JSON.stringify(response) + "\n"); + } catch (err) { + const response: IpcSendResponse = { + success: false, + error: String(err), + }; + conn.write(JSON.stringify(response) + "\n"); + } + } + } catch (err) { + logger.warn({ error: String(err) }, "failed to parse IPC request"); + const response: IpcSendResponse = { + success: false, + error: "Invalid request format", + }; + conn.write(JSON.stringify(response) + "\n"); + } + } + }); + + conn.on("error", (err) => { + logger.debug({ error: String(err) }, "IPC connection error"); + }); + }); + + server.listen(SOCKET_PATH, () => { + logger.info({ socketPath: SOCKET_PATH }, "IPC server started"); + // Make socket accessible + fs.chmodSync(SOCKET_PATH, 0o600); + }); + + server.on("error", (err) => { + logger.error({ error: String(err) }, "IPC server error"); + }); +} + +/** + * Stop the IPC server. Called when relay shuts down. + */ +export function stopIpcServer(): void { + if (server) { + server.close(); + server = null; + } + try { + fs.unlinkSync(SOCKET_PATH); + } catch { + // Ignore + } +} + +/** + * Check if the relay IPC server is running. + */ +export function isRelayRunning(): boolean { + try { + fs.accessSync(SOCKET_PATH); + return true; + } catch { + return false; + } +} + +/** + * Send a message through the running relay's IPC. + * Returns null if relay is not running. + */ +export async function sendViaIpc( + to: string, + message: string, + mediaUrl?: string, +): Promise { + if (!isRelayRunning()) { + return null; + } + + return new Promise((resolve) => { + const client = net.createConnection(SOCKET_PATH); + let buffer = ""; + let resolved = false; + + const timeout = setTimeout(() => { + if (!resolved) { + resolved = true; + client.destroy(); + resolve({ success: false, error: "IPC timeout" }); + } + }, 30000); // 30 second timeout + + client.on("connect", () => { + const request: IpcSendRequest = { + type: "send", + to, + message, + mediaUrl, + }; + client.write(JSON.stringify(request) + "\n"); + }); + + client.on("data", (data) => { + buffer += data.toString(); + const lines = buffer.split("\n"); + + for (const line of lines) { + if (!line.trim()) continue; + try { + const response = JSON.parse(line) as IpcSendResponse; + if (!resolved) { + resolved = true; + clearTimeout(timeout); + client.end(); + resolve(response); + } + return; + } catch { + // Keep reading + } + } + }); + + client.on("error", (err) => { + if (!resolved) { + resolved = true; + clearTimeout(timeout); + // Socket exists but can't connect - relay might have crashed + resolve(null); + } + }); + + client.on("close", () => { + if (!resolved) { + resolved = true; + clearTimeout(timeout); + resolve({ success: false, error: "Connection closed" }); + } + }); + }); +} + +/** + * Get the IPC socket path for debugging/status. + */ +export function getSocketPath(): string { + return SOCKET_PATH; +}