Merge branch 'steipete:main' into main

This commit is contained in:
shuv 2025-12-01 22:38:48 -08:00 committed by GitHub
commit bd9601905d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 898 additions and 78 deletions

View File

@ -2,7 +2,15 @@
## 1.2.3 — Unreleased
### Bug Fixes
- **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`.
- **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`.
- **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses.
- **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files.
- **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently.
### Changes
- **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise.
- **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives.
- **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter.
- **Test isolation improvements:** Mock `loadConfig()` in all test files to prevent loading real user config (with emojis/prefixes) during tests. Default test config now has no prefixes/timestamps for cleaner assertions.
@ -16,6 +24,12 @@
### Changes
- **Manual heartbeat sends:** `warelay heartbeat` accepts `--message/--body` with `--provider web|twilio` to push real outbound messages through the same plumbing; `--dry-run` previews payloads without sending.
## Unreleased
### Changes
- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests.
- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs.
## 1.2.1 — 2025-11-28
### Changes

View File

@ -292,4 +292,79 @@ describe("runCommandReply", () => {
expect(meta.queuedMs).toBe(25);
expect(meta.queuedAhead).toBe(2);
});
it("handles empty result string without dumping raw JSON", async () => {
// Bug fix: Claude CLI returning {"result": ""} should not send raw JSON to WhatsApp
// The fix changed from truthy check to explicit typeof check
const runner = makeRunner({
stdout: '{"result":"","duration_ms":50,"total_cost_usd":0.001}',
});
const { payload } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: runner,
enqueue: enqueueImmediate,
});
// Should NOT contain raw JSON - empty result should produce fallback message
expect(payload?.text).not.toContain('{"result"');
expect(payload?.text).toContain("command produced no output");
});
it("handles empty text string in Claude JSON", async () => {
const runner = makeRunner({
stdout: '{"text":"","duration_ms":50}',
});
const { payload } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: runner,
enqueue: enqueueImmediate,
});
// Empty text should produce fallback message, not raw JSON
expect(payload?.text).not.toContain('{"text"');
expect(payload?.text).toContain("command produced no output");
});
it("returns actual text when result is non-empty", async () => {
const runner = makeRunner({
stdout: '{"result":"hello world","duration_ms":50}',
});
const { payload } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
claudeOutputFormat: "json",
},
templatingCtx: noopTemplateCtx,
sendSystemOnce: false,
isNewSession: true,
isFirstTurnInSession: true,
systemSent: false,
timeoutMs: 1000,
timeoutSeconds: 1,
commandRunner: runner,
enqueue: enqueueImmediate,
});
expect(payload?.text).toBe("hello world");
});
});

View File

@ -230,7 +230,7 @@ export async function runCommandReply(
`Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`,
);
}
if (parsed?.text) {
if (typeof parsed?.text === "string") {
logVerbose(
`Claude JSON parsed -> ${parsed.text.slice(0, 120)}${parsed.text.length > 120 ? "…" : ""}`,
);
@ -259,8 +259,11 @@ export async function runCommandReply(
console.error(
`Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`,
);
// Include any partial output or stderr in error message
const partialOut = trimmed ? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}` : "";
const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`;
return {
payload: undefined,
payload: { text: errorText },
meta: {
durationMs: Date.now() - started,
queuedMs,
@ -278,8 +281,9 @@ export async function runCommandReply(
console.error(
`Command auto-reply process killed before completion (exit code ${code ?? "unknown"})`,
);
const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`;
return {
payload: undefined,
payload: { text: errorText },
meta: {
durationMs: Date.now() - started,
queuedMs,
@ -379,8 +383,11 @@ export async function runCommandReply(
};
}
logError(`Command auto-reply failed after ${elapsed}ms: ${String(err)}`);
// Send error message to user so they know the command failed
const errMsg = err instanceof Error ? err.message : String(err);
const errorText = `⚠️ Command failed: ${errMsg}`;
return {
payload: undefined,
payload: { text: errorText },
meta: {
durationMs: elapsed,
queuedMs,

View File

@ -1,7 +1,8 @@
import type { CliDeps } from "../cli/deps.js";
import { info } from "../globals.js";
import { info, success } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import type { Provider } from "../utils.js";
import { sendViaIpc } from "../web/ipc.js";
export async function sendCommand(
opts: {
@ -39,6 +40,34 @@ export async function sendCommand(
if (waitSeconds !== 0) {
runtime.log(info("Wait/poll are Twilio-only; ignored for provider=web."));
}
// Try to send via IPC to running relay first (avoids Signal session corruption)
const ipcResult = await sendViaIpc(opts.to, opts.message, opts.media);
if (ipcResult) {
if (ipcResult.success) {
runtime.log(success(`✅ Sent via relay IPC. Message ID: ${ipcResult.messageId}`));
if (opts.json) {
runtime.log(
JSON.stringify(
{
provider: "web",
via: "ipc",
to: opts.to,
messageId: ipcResult.messageId,
mediaUrl: opts.media ?? null,
},
null,
2,
),
);
}
return;
}
// IPC failed but relay is running - warn and fall back
runtime.log(info(`IPC send failed (${ipcResult.error}), falling back to direct connection`));
}
// Fall back to direct connection (creates new Baileys socket)
const res = await deps
.sendMessageWeb(opts.to, opts.message, {
verbose: false,
@ -53,6 +82,7 @@ export async function sendCommand(
JSON.stringify(
{
provider: "web",
via: "direct",
to: opts.to,
messageId: res.messageId,
mediaUrl: opts.media ?? null,

View File

@ -1,3 +1,10 @@
// Import test-helpers FIRST to set up mocks before other imports
import {
resetBaileysMocks,
resetLoadConfigMock,
setLoadConfigMock,
} from "./test-helpers.js";
import crypto from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
@ -6,7 +13,7 @@ import sharp from "sharp";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { WarelayConfig } from "../config/config.js";
import { resolveStorePath } from "../config/sessions.js";
import * as commandQueue from "../process/command-queue.js";
import { resetLogger, setLoggerOverride } from "../logging.js";
import {
HEARTBEAT_PROMPT,
@ -18,11 +25,18 @@ import {
stripHeartbeatToken,
} from "./auto-reply.js";
import type { sendMessageWeb } from "./outbound.js";
import {
resetBaileysMocks,
resetLoadConfigMock,
setLoadConfigMock,
} from "./test-helpers.js";
const makeSessionStore = async (
entries: Record<string, unknown> = {},
): Promise<{ storePath: string; cleanup: () => Promise<void> }> => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-session-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify(entries));
return {
storePath,
cleanup: () => fs.rm(dir, { recursive: true, force: true }),
};
};
describe("heartbeat helpers", () => {
it("strips heartbeat token and skips when only token", () => {
@ -78,19 +92,9 @@ describe("heartbeat helpers", () => {
});
describe("resolveHeartbeatRecipients", () => {
const makeStore = async (entries: Record<string, { updatedAt: number }>) => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "warelay-heartbeat-"));
const storePath = path.join(dir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify(entries));
return {
storePath,
cleanup: async () => fs.rm(dir, { recursive: true, force: true }),
};
};
it("returns the sole session recipient", async () => {
const now = Date.now();
const store = await makeStore({ "+1000": { updatedAt: now } });
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["+1999"],
@ -105,7 +109,7 @@ describe("resolveHeartbeatRecipients", () => {
it("surfaces ambiguity when multiple sessions exist", async () => {
const now = Date.now();
const store = await makeStore({
const store = await makeSessionStore({
"+1000": { updatedAt: now },
"+2000": { updatedAt: now - 10 },
});
@ -122,7 +126,7 @@ describe("resolveHeartbeatRecipients", () => {
});
it("filters wildcard allowFrom when no sessions exist", async () => {
const store = await makeStore({});
const store = await makeSessionStore({});
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["*"],
@ -137,7 +141,7 @@ describe("resolveHeartbeatRecipients", () => {
it("merges sessions and allowFrom when --all is set", async () => {
const now = Date.now();
const store = await makeStore({ "+1000": { updatedAt: now } });
const store = await makeSessionStore({ "+1000": { updatedAt: now } });
const cfg: WarelayConfig = {
inbound: {
allowFrom: ["+1999"],
@ -153,12 +157,16 @@ describe("resolveHeartbeatRecipients", () => {
describe("runWebHeartbeatOnce", () => {
it("skips when heartbeat token returned", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi.fn();
const resolver = vi.fn(async () => ({ text: HEARTBEAT_TOKEN }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@ -166,54 +174,58 @@ describe("runWebHeartbeatOnce", () => {
});
expect(resolver).toHaveBeenCalled();
expect(sender).not.toHaveBeenCalled();
await store.cleanup();
});
it("sends when alert text present", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1555", "ALERT", { verbose: false });
await store.cleanup();
});
it("falls back to most recent session when no to is provided", async () => {
const store = await makeSessionStore();
const storePath = store.storePath;
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn(async () => ({ text: "ALERT" }));
// Seed session store
const now = Date.now();
const store = {
const sessionEntries = {
"+1222": { sessionId: "s1", updatedAt: now - 1000 },
"+1333": { sessionId: "s2", updatedAt: now },
};
const storePath = resolveStorePath();
await fs.mkdir(resolveStorePath().replace("sessions.json", ""), {
recursive: true,
});
await fs.writeFile(storePath, JSON.stringify(store));
setLoadConfigMock({
inbound: {
allowFrom: ["+1999"],
reply: { mode: "command", session: {} },
},
});
await fs.writeFile(storePath, JSON.stringify(sessionEntries));
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1999"],
reply: { mode: "command", session: { store: storePath } },
},
},
to: "+1999",
verbose: false,
sender,
replyResolver: resolver,
});
expect(sender).toHaveBeenCalledWith("+1999", "ALERT", { verbose: false });
await store.cleanup();
});
it("does not refresh updatedAt when heartbeat is skipped", async () => {
@ -353,14 +365,18 @@ describe("runWebHeartbeatOnce", () => {
});
it("sends overrideBody directly and skips resolver", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi
.fn()
.mockResolvedValue({ messageId: "m1", toJid: "jid" });
const resolver = vi.fn();
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@ -371,15 +387,20 @@ describe("runWebHeartbeatOnce", () => {
verbose: false,
});
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
it("dry-run overrideBody prints and skips send", async () => {
const store = await makeSessionStore();
const sender: typeof sendMessageWeb = vi.fn();
const resolver = vi.fn();
setLoadConfigMock({
inbound: { allowFrom: ["+1555"], reply: { mode: "command" } },
});
await runWebHeartbeatOnce({
cfg: {
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: store.storePath } },
},
},
to: "+1555",
verbose: false,
sender,
@ -389,6 +410,7 @@ describe("runWebHeartbeatOnce", () => {
});
expect(sender).not.toHaveBeenCalled();
expect(resolver).not.toHaveBeenCalled();
await store.cleanup();
});
});
@ -504,6 +526,53 @@ describe("web auto-reply", () => {
);
});
it("skips reply heartbeat when requests are running", async () => {
const tmpDir = await fs.mkdtemp(
path.join(os.tmpdir(), "warelay-heartbeat-queue-"),
);
const storePath = path.join(tmpDir, "sessions.json");
await fs.writeFile(storePath, JSON.stringify({}));
const queueSpy = vi
.spyOn(commandQueue, "getQueueSize")
.mockReturnValue(2);
const replyResolver = vi.fn();
const listenerFactory = vi.fn(async () => {
const onClose = new Promise<void>(() => {
// stay open until aborted
});
return { close: vi.fn(), onClose };
});
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() } as never;
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["+1555"],
reply: { mode: "command", session: { store: storePath } },
},
}));
const controller = new AbortController();
const run = monitorWebProvider(
false,
listenerFactory,
true,
replyResolver,
runtime,
controller.signal,
{ replyHeartbeatMinutes: 1, replyHeartbeatNow: true },
);
try {
await Promise.resolve();
controller.abort();
await run;
expect(replyResolver).not.toHaveBeenCalled();
} finally {
queueSpy.mockRestore();
}
});
it("falls back to text when media send fails", async () => {
const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
const reply = vi.fn().mockResolvedValue(undefined);
@ -947,6 +1016,16 @@ describe("web auto-reply", () => {
});
it("prefixes body with same-phone marker when from === to", async () => {
// Enable messagePrefix for same-phone mode testing
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: "[same-phone]",
responsePrefix: undefined,
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
@ -974,12 +1053,11 @@ describe("web auto-reply", () => {
sendMedia: vi.fn(),
});
// The resolver should receive a prefixed body (the exact marker depends on config)
// Key test: body should start with some marker and end with original message
// The resolver should receive a prefixed body with the configured marker
const callArg = resolver.mock.calls[0]?.[0] as { Body?: string };
expect(callArg?.Body).toBeDefined();
expect(callArg?.Body).toMatch(/^\[.*\] hello$/);
expect(callArg?.Body).not.toBe("hello"); // Should be prefixed
expect(callArg?.Body).toBe("[same-phone] hello");
resetLoadConfigMock();
});
it("does not prefix body when from !== to", async () => {
@ -1019,4 +1097,135 @@ describe("web auto-reply", () => {
expect(callArg?.Body).toBeDefined();
expect(callArg?.Body).toMatch(/hello$/);
});
it("applies responsePrefix to regular replies", async () => {
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: "🦞",
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const reply = vi.fn();
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
const resolver = vi.fn().mockResolvedValue({ text: "hello there" });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "hi",
from: "+1555",
to: "+2666",
id: "msg1",
sendComposing: vi.fn(),
reply,
sendMedia: vi.fn(),
});
// Reply should have responsePrefix prepended
expect(reply).toHaveBeenCalledWith("🦞 hello there");
resetLoadConfigMock();
});
it("skips responsePrefix for HEARTBEAT_OK responses", async () => {
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: "🦞",
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const reply = vi.fn();
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
// Resolver returns exact HEARTBEAT_OK
const resolver = vi.fn().mockResolvedValue({ text: HEARTBEAT_TOKEN });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "test",
from: "+1555",
to: "+2666",
id: "msg1",
sendComposing: vi.fn(),
reply,
sendMedia: vi.fn(),
});
// HEARTBEAT_OK should NOT have prefix - warelay needs exact match
expect(reply).toHaveBeenCalledWith(HEARTBEAT_TOKEN);
resetLoadConfigMock();
});
it("does not double-prefix if responsePrefix already present", async () => {
setLoadConfigMock(() => ({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: "🦞",
timestampPrefix: false,
},
}));
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const reply = vi.fn();
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
// Resolver returns text that already has prefix
const resolver = vi.fn().mockResolvedValue({ text: "🦞 already prefixed" });
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
await capturedOnMessage?.({
body: "test",
from: "+1555",
to: "+2666",
id: "msg1",
sendComposing: vi.fn(),
reply,
sendMedia: vi.fn(),
});
// Should not double-prefix
expect(reply).toHaveBeenCalledWith("🦞 already prefixed");
resetLoadConfigMock();
});
});

View File

@ -9,14 +9,16 @@ import {
resolveStorePath,
saveSessionStore,
} from "../config/sessions.js";
import { danger, isVerbose, logVerbose, success } from "../globals.js";
import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js";
import { loadWebMedia } from "./media.js";
import { sendMessageWeb } from "./outbound.js";
import { getQueueSize } from "../process/command-queue.js";
import {
computeBackoff,
newConnectionId,
@ -27,6 +29,26 @@ import {
} from "./reconnect.js";
import { getWebAuthAgeMs } from "./session.js";
/**
* Send a message via IPC if relay is running, otherwise fall back to direct.
* This avoids Signal session corruption from multiple Baileys connections.
*/
async function sendWithIpcFallback(
to: string,
message: string,
opts: { verbose: boolean; mediaUrl?: string },
): Promise<{ messageId: string; toJid: string }> {
const ipcResult = await sendViaIpc(to, message, opts.mediaUrl);
if (ipcResult?.success && ipcResult.messageId) {
if (opts.verbose) {
console.log(info(`Sent via relay IPC (avoiding session corruption)`));
}
return { messageId: ipcResult.messageId, toJid: `${to}@s.whatsapp.net` };
}
// Fall back to direct send
return sendMessageWeb(to, message, opts);
}
const DEFAULT_WEB_MEDIA_BYTES = 5 * 1024 * 1024;
type WebInboundMsg = Parameters<
typeof monitorWebInbox
@ -94,7 +116,7 @@ export async function runWebHeartbeatOnce(opts: {
} = opts;
const _runtime = opts.runtime ?? defaultRuntime;
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
const sender = opts.sender ?? sendMessageWeb;
const sender = opts.sender ?? sendWithIpcFallback;
const runId = newConnectionId();
const heartbeatLogger = getChildLogger({
module: "web-heartbeat",
@ -525,6 +547,8 @@ export async function monitorWebProvider(
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
onMessage: async (msg) => {
// Also add IPC-sent messages to echo detection
// (this is handled below in the IPC sendHandler)
handledMessages += 1;
lastMessageAt = Date.now();
const ts = msg.timestamp
@ -676,7 +700,33 @@ export async function monitorWebProvider(
},
});
// Start IPC server so `warelay send` can use this connection
// instead of creating a new one (which would corrupt Signal session)
if ("sendMessage" in listener) {
startIpcServer(async (to, message, mediaUrl) => {
let mediaBuffer: Buffer | undefined;
let mediaType: string | undefined;
if (mediaUrl) {
const media = await loadWebMedia(mediaUrl);
mediaBuffer = media.buffer;
mediaType = media.contentType;
}
const result = await listener.sendMessage(to, message, mediaBuffer, mediaType);
// Add to echo detection so we don't process our own message
if (message) {
recentlySent.add(message);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
logInfo(`📤 IPC send to ${to}: ${message.substring(0, 50)}...`, runtime);
return result;
});
}
const closeListener = async () => {
stopIpcServer();
if (heartbeat) clearInterval(heartbeat);
if (replyHeartbeatTimer) clearInterval(replyHeartbeatTimer);
if (watchdogTimer) clearInterval(watchdogTimer);
@ -739,6 +789,15 @@ export async function monitorWebProvider(
}
const runReplyHeartbeat = async () => {
const queued = getQueueSize();
if (queued > 0) {
heartbeatLogger.info(
{ connectionId, reason: "requests-in-flight", queued },
"reply heartbeat skipped",
);
console.log(success("heartbeat: skipped (requests in flight)"));
return;
}
if (!replyHeartbeatMinutes) return;
const tickStart = Date.now();
if (!lastInboundMsg) {
@ -862,9 +921,16 @@ export async function monitorWebProvider(
return;
}
// Apply response prefix if configured (same as regular messages)
let finalText = stripped.text;
const responsePrefix = cfg.inbound?.responsePrefix;
if (responsePrefix && finalText && !finalText.startsWith(responsePrefix)) {
finalText = `${responsePrefix} ${finalText}`;
}
const cleanedReply: ReplyPayload = {
...replyResult,
text: stripped.text,
text: finalText,
};
await deliverWebReply({

View File

@ -212,6 +212,51 @@ export async function monitorWebInbox(options: {
}
},
onClose,
/**
* Send a message through this connection's socket.
* Used by IPC to avoid creating new connections.
*/
sendMessage: async (
to: string,
text: string,
mediaBuffer?: Buffer,
mediaType?: string,
): Promise<{ messageId: string }> => {
const jid = `${to.replace(/^\+/, "")}@s.whatsapp.net`;
let payload: AnyMessageContent;
if (mediaBuffer && mediaType) {
if (mediaType.startsWith("image/")) {
payload = {
image: mediaBuffer,
caption: text || undefined,
mimetype: mediaType,
};
} else if (mediaType.startsWith("audio/")) {
payload = {
audio: mediaBuffer,
ptt: true,
mimetype: mediaType,
};
} else if (mediaType.startsWith("video/")) {
payload = {
video: mediaBuffer,
caption: text || undefined,
mimetype: mediaType,
};
} else {
payload = {
document: mediaBuffer,
fileName: "file",
caption: text || undefined,
mimetype: mediaType,
};
}
} else {
payload = { text };
}
const result = await sock.sendMessage(jid, payload);
return { messageId: result?.key?.id ?? "unknown" };
},
} as const;
}

225
src/web/ipc.ts Normal file
View File

@ -0,0 +1,225 @@
/**
* IPC server for warelay relay.
*
* When the relay is running, it starts a Unix socket server that allows
* `warelay send` and `warelay heartbeat` to send messages through the
* existing WhatsApp connection instead of creating new ones.
*
* This prevents Signal session ratchet corruption from multiple connections.
*/
import fs from "node:fs";
import net from "node:net";
import os from "node:os";
import path from "node:path";
import { getChildLogger } from "../logging.js";
const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock");
export interface IpcSendRequest {
type: "send";
to: string;
message: string;
mediaUrl?: string;
}
export interface IpcSendResponse {
success: boolean;
messageId?: string;
error?: string;
}
type SendHandler = (
to: string,
message: string,
mediaUrl?: string,
) => Promise<{ messageId: string }>;
let server: net.Server | null = null;
/**
* Start the IPC server. Called by the relay when it starts.
*/
export function startIpcServer(sendHandler: SendHandler): void {
const logger = getChildLogger({ module: "ipc-server" });
// Clean up stale socket file
try {
fs.unlinkSync(SOCKET_PATH);
} catch {
// Ignore if doesn't exist
}
server = net.createServer((conn) => {
let buffer = "";
conn.on("data", async (data) => {
buffer += data.toString();
// Try to parse complete JSON messages (newline-delimited)
const lines = buffer.split("\n");
buffer = lines.pop() ?? ""; // Keep incomplete line in buffer
for (const line of lines) {
if (!line.trim()) continue;
try {
const request = JSON.parse(line) as IpcSendRequest;
if (request.type === "send") {
try {
const result = await sendHandler(
request.to,
request.message,
request.mediaUrl,
);
const response: IpcSendResponse = {
success: true,
messageId: result.messageId,
};
conn.write(JSON.stringify(response) + "\n");
} catch (err) {
const response: IpcSendResponse = {
success: false,
error: String(err),
};
conn.write(JSON.stringify(response) + "\n");
}
}
} catch (err) {
logger.warn({ error: String(err) }, "failed to parse IPC request");
const response: IpcSendResponse = {
success: false,
error: "Invalid request format",
};
conn.write(JSON.stringify(response) + "\n");
}
}
});
conn.on("error", (err) => {
logger.debug({ error: String(err) }, "IPC connection error");
});
});
server.listen(SOCKET_PATH, () => {
logger.info({ socketPath: SOCKET_PATH }, "IPC server started");
// Make socket accessible
fs.chmodSync(SOCKET_PATH, 0o600);
});
server.on("error", (err) => {
logger.error({ error: String(err) }, "IPC server error");
});
}
/**
* Stop the IPC server. Called when relay shuts down.
*/
export function stopIpcServer(): void {
if (server) {
server.close();
server = null;
}
try {
fs.unlinkSync(SOCKET_PATH);
} catch {
// Ignore
}
}
/**
* Check if the relay IPC server is running.
*/
export function isRelayRunning(): boolean {
try {
fs.accessSync(SOCKET_PATH);
return true;
} catch {
return false;
}
}
/**
* Send a message through the running relay's IPC.
* Returns null if relay is not running.
*/
export async function sendViaIpc(
to: string,
message: string,
mediaUrl?: string,
): Promise<IpcSendResponse | null> {
if (!isRelayRunning()) {
return null;
}
return new Promise((resolve) => {
const client = net.createConnection(SOCKET_PATH);
let buffer = "";
let resolved = false;
const timeout = setTimeout(() => {
if (!resolved) {
resolved = true;
client.destroy();
resolve({ success: false, error: "IPC timeout" });
}
}, 30000); // 30 second timeout
client.on("connect", () => {
const request: IpcSendRequest = {
type: "send",
to,
message,
mediaUrl,
};
client.write(JSON.stringify(request) + "\n");
});
client.on("data", (data) => {
buffer += data.toString();
const lines = buffer.split("\n");
for (const line of lines) {
if (!line.trim()) continue;
try {
const response = JSON.parse(line) as IpcSendResponse;
if (!resolved) {
resolved = true;
clearTimeout(timeout);
client.end();
resolve(response);
}
return;
} catch {
// Keep reading
}
}
});
client.on("error", (err) => {
if (!resolved) {
resolved = true;
clearTimeout(timeout);
// Socket exists but can't connect - relay might have crashed
resolve(null);
}
});
client.on("close", () => {
if (!resolved) {
resolved = true;
clearTimeout(timeout);
resolve({ success: false, error: "Connection closed" });
}
});
});
}
/**
* Get the IPC socket path for debugging/status.
*/
export function getSocketPath(): string {
return SOCKET_PATH;
}

View File

@ -9,15 +9,17 @@ vi.mock("../media/store.js", () => ({
}),
}));
const mockLoadConfig = vi.fn().mockReturnValue({
inbound: {
allowFrom: ["*"], // Allow all in tests
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
vi.mock("../config/config.js", () => ({
loadConfig: vi.fn().mockReturnValue({
inbound: {
allowFrom: ["*"], // Allow all in tests
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
}),
loadConfig: () => mockLoadConfig(),
}));
vi.mock("./session.js", () => {
@ -227,4 +229,146 @@ describe("web monitor inbox", () => {
]);
await listener.close();
});
it("blocks messages from unauthorized senders not in allowFrom", async () => {
// Test for auto-recovery fix: early allowFrom filtering prevents Bad MAC errors
// from unauthorized senders corrupting sessions
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["+111"], // Only allow +111
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
// Message from unauthorized sender +999 (not in allowFrom)
const upsert = {
type: "notify",
messages: [
{
key: { id: "unauth1", fromMe: false, remoteJid: "999@s.whatsapp.net" },
message: { conversation: "unauthorized message" },
messageTimestamp: 1_700_000_000,
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
// Should NOT call onMessage for unauthorized senders
expect(onMessage).not.toHaveBeenCalled();
// Reset mock for other tests
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
await listener.close();
});
it("allows messages from senders in allowFrom list", async () => {
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["+111", "+999"], // Allow +999
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
const upsert = {
type: "notify",
messages: [
{
key: { id: "auth1", fromMe: false, remoteJid: "999@s.whatsapp.net" },
message: { conversation: "authorized message" },
messageTimestamp: 1_700_000_000,
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
// Should call onMessage for authorized senders
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({ body: "authorized message", from: "+999" }),
);
// Reset mock for other tests
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
await listener.close();
});
it("allows same-phone messages even if not in allowFrom", async () => {
// Same-phone mode: when from === selfJid, should always be allowed
// This allows users to message themselves even with restrictive allowFrom
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["+111"], // Only allow +111, but self is +123
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
const onMessage = vi.fn();
const listener = await monitorWebInbox({ verbose: false, onMessage });
const sock = await createWaSocket();
// Message from self (sock.user.id is "123@s.whatsapp.net" in mock)
const upsert = {
type: "notify",
messages: [
{
key: { id: "self1", fromMe: false, remoteJid: "123@s.whatsapp.net" },
message: { conversation: "self message" },
messageTimestamp: 1_700_000_000,
},
],
};
sock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setImmediate(resolve));
// Should allow self-messages even if not in allowFrom
expect(onMessage).toHaveBeenCalledWith(
expect.objectContaining({ body: "self message", from: "+123" }),
);
// Reset mock for other tests
mockLoadConfig.mockReturnValue({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
await listener.close();
});
});

View File

@ -3,32 +3,37 @@ import { vi } from "vitest";
import type { MockBaileysSocket } from "../../test/mocks/baileys.js";
import { createMockBaileys } from "../../test/mocks/baileys.js";
let loadConfigMock: () => unknown = () => ({
// Use globalThis to store the mock config so it survives vi.mock hoisting
const CONFIG_KEY = Symbol.for("warelay:testConfigMock");
const DEFAULT_CONFIG = {
inbound: {
allowFrom: ["*"], // Allow all in tests by default
messagePrefix: undefined, // No message prefix in tests
responsePrefix: undefined, // No response prefix in tests
timestampPrefix: false, // No timestamp in tests
},
});
};
export function setLoadConfigMock(fn: () => unknown) {
loadConfigMock = fn;
// Initialize default if not set
if (!(globalThis as Record<symbol, unknown>)[CONFIG_KEY]) {
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] = () => DEFAULT_CONFIG;
}
export function setLoadConfigMock(fn: (() => unknown) | unknown) {
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] =
typeof fn === "function" ? fn : () => fn;
}
export function resetLoadConfigMock() {
loadConfigMock = () => ({
inbound: {
allowFrom: ["*"],
messagePrefix: undefined,
responsePrefix: undefined,
timestampPrefix: false,
},
});
(globalThis as Record<symbol, unknown>)[CONFIG_KEY] = () => DEFAULT_CONFIG;
}
vi.mock("../config/config.js", () => ({
loadConfig: () => loadConfigMock(),
loadConfig: () => {
const getter = (globalThis as Record<symbol, unknown>)[CONFIG_KEY];
if (typeof getter === "function") return getter();
return DEFAULT_CONFIG;
},
}));
vi.mock("../media/store.js", () => ({