Merge upstream/main: v1.3.0 with batching, agents, security fixes

Merged 11 commits from upstream/main:
- feat(web): batch inbound messages
- Agents: add pluggable CLIs (claude, codex, opencode, pi)
- chore: bump version to 1.3.0
- chore(security): harden ipc socket
- chore(security): purge session store on logout
- chore(logs): rotate daily and prune after 24h
- Fix path traversal vulnerability in media server
- fix(media): block symlink traversal

Conflict resolution:
- package.json: took upstream version 1.3.0
- src/web/auto-reply.ts: merged batching with heartbeat pre-hook logic
- src/web/auto-reply.test.ts: kept upstream import statement
This commit is contained in:
Kyle Crommett 2025-12-02 11:46:43 -08:00
commit 685952a298
29 changed files with 1302 additions and 295 deletions

View File

@ -1,6 +1,19 @@
# Changelog # Changelog
## 1.2.3 — Unreleased ## Unreleased
### Security
- Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior.
- `warelay logout` now also prunes the shared session store (`~/.warelay/sessions.json`) alongside WhatsApp Web credentials, reducing leftover state after unlinking.
- Logging now rolls daily to `/tmp/warelay/warelay-YYYY-MM-DD.log` (or custom dir) and prunes files older than 24h to reduce data retention.
- Media server now rejects symlinked files and ensures resolved paths stay inside the media directory, closing traversal via symlinks; added regression test. (Thanks @joaohlisboa)
## 1.3.0 — 2025-12-02
### Highlights
- **Pluggable agents (Claude, Pi, Codex, Opencode):** New `inbound.reply.agent` block chooses the CLI and parser per command reply; per-agent argv builders inject the right flags/identity/prompt handling and parse NDJSON streams, enabling Pi/Codex swaps without changing templates.
- **Safety stop words for agents:** If an inbound message is exactly `stop`, `esc`, `abort`, `wait`, or `exit`, warelay immediately replies “Agent was aborted.”, kills the pending agent run, and marks the session so the next prompt is prefixed with a reminder that the previous run was aborted.
- **Agent session reliability:** Only Claude currently returns a `session_id` that warelay persists; other agents (Gemini, Opencode, Codex, Pi) dont emit stable session identifiers, so multi-turn continuity may reset between runs for those harnesses.
### Bug Fixes ### Bug Fixes
- **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`. - **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`.
@ -8,9 +21,11 @@
- **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses. - **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses.
- **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files. - **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files.
- **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently. - **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently.
- **Web send media kinds:** `sendMessageWeb` now honors media kind when sending via WhatsApp Web: audio → PTT with correct opus mimetype, video → video, image → image, other → document with filename. Previously all media were sent as images, breaking audio/video/doc sends.
### Changes ### Changes
- **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise. - **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise.
- **Batched inbound messaging with timestamps:** When multiple WhatsApp messages queue up, theyre sent to the agent in one combined batch, each line timestamped consistently to preserve ordering and context.
- **Typing indicator after IPC send:** After sending a message via IPC (e.g., `warelay send`), the relay now automatically shows the typing indicator ("composing") to signal that more messages may be coming. - **Typing indicator after IPC send:** After sending a message via IPC (e.g., `warelay send`), the relay now automatically shows the typing indicator ("composing") to signal that more messages may be coming.
- **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives. - **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives.
- **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter. - **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter.

View File

@ -137,6 +137,16 @@ warelay supports running on the same phone number you message from—you chat wi
} }
``` ```
#### Abort trigger words
- If an inbound body is exactly `stop`, `esc`, `abort`, `wait`, or `exit`, the command/agent run is skipped and the user immediately gets `Agent was aborted.`.
- The session is tagged so the *next* prompt sent to the agent is prefixed with a short reminder that the previous run was aborted; the hint clears after that turn.
#### Agent choices
- `inbound.reply.agent.kind` can be `claude`, `opencode`, `pi`, `codex`, or `gemini`.
- Gemini CLI supports `--output-format text|json|stream-json`; warelay auto-adds it when you set `agent.format`.
- Session defaults: Claude uses `--session-id/--resume`, Codex/Opencode/Pi use `--session`, and Gemini defaults to `--resume` for session resumes (new sessions need no flag). Override via `sessionArgNew/sessionArgResume` if you prefer custom flags.
- Reliability note: only Claude reliably returns a `session_id` that warelay can persist and reuse. Other harnesses currently dont emit a stable session identifier, so multi-turn continuity may reset between runs for those agents (Pi does not auto-compact, but still doesnt expose a session id).
#### Heartbeat pings (command mode) #### Heartbeat pings (command mode)
- When `heartbeatMinutes` is set (default 10 for `mode: "command"`), the relay periodically runs your command/Claude session with a heartbeat prompt. - When `heartbeatMinutes` is set (default 10 for `mode: "command"`), the relay periodically runs your command/Claude session with a heartbeat prompt.
- Heartbeat body is `HEARTBEAT ultrathink` (so the model can recognize the probe); if Claude replies exactly `HEARTBEAT_OK`, the message is suppressed; otherwise the reply (or media) is forwarded. Suppressions are still logged so you know the heartbeat ran. - Heartbeat body is `HEARTBEAT ultrathink` (so the model can recognize the probe); if Claude replies exactly `HEARTBEAT_OK`, the message is suppressed; otherwise the reply (or media) is forwarded. Suppressions are still logged so you know the heartbeat ran.
@ -145,7 +155,7 @@ warelay supports running on the same phone number you message from—you chat wi
- When multiple active sessions exist, `warelay heartbeat` requires `--to <E.164>` or `--all`; if `allowFrom` is just `"*"`, you must choose a target with one of those flags. - When multiple active sessions exist, `warelay heartbeat` requires `--to <E.164>` or `--all`; if `allowFrom` is just `"*"`, you must choose a target with one of those flags.
### Logging (optional) ### Logging (optional)
- File logs are written to `/tmp/warelay/warelay.log` by default. Levels: `silent | fatal | error | warn | info | debug | trace` (CLI `--verbose` forces `debug`). Web-provider inbound/outbound entries include message bodies and auto-reply text for easier auditing. - File logs are written to `/tmp/warelay/warelay-YYYY-MM-DD.log` by default (rotated daily; files older than 24h are pruned). Levels: `silent | fatal | error | warn | info | debug | trace` (CLI `--verbose` forces `debug`). Web-provider inbound/outbound entries include message bodies and auto-reply text for easier auditing.
- Override in `~/.warelay/warelay.json`: - Override in `~/.warelay/warelay.json`:
```json5 ```json5
@ -208,7 +218,7 @@ Templating tokens: `{{Body}}`, `{{BodyStripped}}`, `{{From}}`, `{{To}}`, `{{Mess
## FAQ & Safety ## FAQ & Safety
- Twilio errors: **63016 “permission to send an SMS has not been enabled”** → ensure your number is WhatsApp-enabled; **63007 template not approved** → send a free-form session message within 24h or use an approved template; **63112 policy violation** → adjust content, shorten to <1600 chars, avoid links that trigger spam filters. Re-run `pnpm warelay status` to see the exact Twilio response body. - Twilio errors: **63016 “permission to send an SMS has not been enabled”** → ensure your number is WhatsApp-enabled; **63007 template not approved** → send a free-form session message within 24h or use an approved template; **63112 policy violation** → adjust content, shorten to <1600 chars, avoid links that trigger spam filters. Re-run `pnpm warelay status` to see the exact Twilio response body.
- Does this store my messages? warelay only writes `~/.warelay/warelay.json` (config), `~/.warelay/credentials/` (WhatsApp Web auth), and `~/.warelay/sessions.json` (session IDs + timestamps). It does **not** persist message bodies beyond the session store. Logs stream to stdout/stderr and also `/tmp/warelay/warelay.log` (configurable via `logging.file`). - Does this store my messages? warelay only writes `~/.warelay/warelay.json` (config), `~/.warelay/credentials/` (WhatsApp Web auth), and `~/.warelay/sessions.json` (session IDs + timestamps). It does **not** persist message bodies beyond the session store. Logs stream to stdout/stderr and also `/tmp/warelay/warelay-YYYY-MM-DD.log` (configurable via `logging.file`).
- Personal WhatsApp safety: Automation on personal accounts can be rate-limited or logged out by WhatsApp. Use `--provider web` sparingly, keep messages human-like, and re-run `login` if the session is dropped. - Personal WhatsApp safety: Automation on personal accounts can be rate-limited or logged out by WhatsApp. Use `--provider web` sparingly, keep messages human-like, and re-run `login` if the session is dropped.
- Limits to remember: WhatsApp text limit ~1600 chars; avoid rapid bursts—space sends by a few seconds; keep webhook replies under a couple seconds for good UX; command auto-replies time out after 600s by default. - Limits to remember: WhatsApp text limit ~1600 chars; avoid rapid bursts—space sends by a few seconds; keep webhook replies under a couple seconds for good UX; command auto-replies time out after 600s by default.
- Deploy / keep running: Use `tmux` or `screen` for ad-hoc (`tmux new -s warelay -- pnpm warelay relay --provider twilio`). For long-running hosts, wrap `pnpm warelay relay ...` or `pnpm warelay webhook --ingress tailscale ...` in a systemd service or macOS LaunchAgent; ensure environment variables are loaded in that context. - Deploy / keep running: Use `tmux` or `screen` for ad-hoc (`tmux new -s warelay -- pnpm warelay relay --provider twilio`). For long-running hosts, wrap `pnpm warelay relay ...` or `pnpm warelay webhook --ingress tailscale ...` in a systemd service or macOS LaunchAgent; ensure environment variables are loaded in that context.

77
docs/agent.md Normal file
View File

@ -0,0 +1,77 @@
# Agent Abstraction Refactor Plan
Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact.
## Overview
- Introduce a pluggable agent layer (`src/agents/*`), selected by config.
- Normalize config (`agent` block) and remove `claudeOutputFormat` legacy knobs.
- Provide per-agent argv builders and output parsers (including NDJSON streams).
- Preserve MEDIA-token handling and shared queue/heartbeat behavior.
## Configuration
- New shape (no backward compat):
```json5
inbound: {
reply: {
mode: "command",
agent: {
kind: "claude" | "opencode" | "pi" | "codex",
format?: "text" | "json",
identityPrefix?: string
},
command: ["claude", "{{Body}}"],
cwd?: string,
session?: { ... },
timeoutSeconds?: number,
bodyPrefix?: string,
mediaUrl?: string,
mediaMaxMb?: number,
typingIntervalSeconds?: number,
heartbeatMinutes?: number
}
}
```
- Validation moves to `config.ts` (new `AgentKind`/`AgentConfig` types).
- If `agent` is missing → config error.
## Agent modules
- `src/agents/types.ts` `AgentKind`, `AgentSpec`:
- `buildArgs(argv: string[], body: string, ctx: { sessionId?, isNewSession?, sendSystemOnce?, systemSent?, identityPrefix? }): string[]`
- `parse(stdout: string): { text?: string; mediaUrls?: string[]; meta?: AgentMeta }`
- `src/agents/claude.ts` current flag injection (`--output-format`, `-p`), identity prepend.
- `src/agents/opencode.ts` reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix.
- `src/agents/pi.ts` parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags.
- `src/agents/codex.ts` parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only.
- Shared MEDIA extraction stays in `media/parse.ts`.
## Command runner changes
- `runCommandReply`:
- Resolve agent spec from config.
- Apply `buildArgs` (handles identity prepend and session args per agent).
- Run command; send stdout to `spec.parse``text`, `mediaUrls`, `meta` (stored as `agentMeta`).
- Remove `claudeMeta` naming; tests updated to `agentMeta`.
## Sessions
- Session arg defaults become agent-specific (Claude: `--resume/--session-id`; Opencode/Pi/Codex: `--session`).
- Still overridable via `sessionArgNew/sessionArgResume` in config.
## Tests
- Update existing tests to new config (no `claudeOutputFormat`).
- Add fixtures:
- Opencode NDJSON sample (from PR #5) → parsed text + meta.
- Codex NDJSON sample (captured: thread/turn/item/usage) → parsed text.
- Pi NDJSON sample (AssistantMessageEvent) → parsed text.
- Ensure MEDIA token parsing works on agent text output.
## Docs
- README: rename “Claude-aware” → “Multi-agent (Claude, Codex, Pi, Opencode)”.
- New short guide per agent (Opencode doc from PR #5; add Codex/Pi snippets).
- Mention identityPrefix override and session arg differences.
## Migration
- Breaking change: configs must specify `agent`. Remove old `claudeOutputFormat` keys.
- Provide migration note in CHANGELOG 1.3.x.
## Out of scope
- No media binary support; still relies on MEDIA tokens in text.
- No UI changes; WhatsApp/Twilio plumbing unchanged.

View File

@ -1,6 +1,6 @@
{ {
"name": "warelay", "name": "warelay",
"version": "1.2.3-typing-fix", "version": "1.3.0",
"description": "WhatsApp relay CLI (send, monitor, webhook, auto-reply) using Twilio", "description": "WhatsApp relay CLI (send, monitor, webhook, auto-reply) using Twilio",
"type": "module", "type": "module",
"main": "dist/index.js", "main": "dist/index.js",

118
src/agents/agents.test.ts Normal file
View File

@ -0,0 +1,118 @@
import { describe, expect, it } from "vitest";
import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js";
import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js";
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
describe("agent buildArgs + parseOutput helpers", () => {
it("claudeSpec injects flags and identity once", () => {
const argv = ["claude", "hi"];
const built = claudeSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built).toContain("--output-format");
expect(built).toContain("json");
expect(built).toContain("-p");
expect(built.at(-1)).toContain(CLAUDE_IDENTITY_PREFIX);
const builtNoIdentity = claudeSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: false,
sessionId: "sess",
sendSystemOnce: true,
systemSent: true,
identityPrefix: undefined,
format: "json",
});
expect(builtNoIdentity.at(-1)).not.toContain(CLAUDE_IDENTITY_PREFIX);
});
it("opencodeSpec adds format flag and identity prefix when needed", () => {
const argv = ["opencode", "body"];
const built = opencodeSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built).toContain("--format");
expect(built).toContain("json");
expect(built.at(-1)).toContain(OPENCODE_IDENTITY_PREFIX);
});
it("piSpec parses final assistant message and preserves usage meta", () => {
const stdout = [
'{"type":"message_start","message":{"role":"assistant"}}',
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}',
].join("\n");
const parsed = piSpec.parseOutput(stdout);
expect(parsed.text).toBe("hello world");
expect(parsed.meta?.provider).toBe("inflection");
expect((parsed.meta?.usage as { output?: number })?.output).toBe(5);
});
it("codexSpec parses agent_message and aggregates usage", () => {
const stdout = [
'{"type":"item.completed","item":{"type":"agent_message","text":"hi there"}}',
'{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}',
].join("\n");
const parsed = codexSpec.parseOutput(stdout);
expect(parsed.text).toBe("hi there");
const usage = parsed.meta?.usage as {
input?: number;
output?: number;
cacheRead?: number;
total?: number;
};
expect(usage?.input).toBe(50);
expect(usage?.output).toBe(10);
expect(usage?.cacheRead).toBe(5);
expect(usage?.total).toBe(65);
});
it("opencodeSpec parses streamed events and summarizes meta", () => {
const stdout = [
'{"type":"step_start","timestamp":0}',
'{"type":"text","part":{"text":"hi"}}',
'{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}',
].join("\n");
const parsed = opencodeSpec.parseOutput(stdout);
expect(parsed.text).toBe("hi");
expect(parsed.meta?.extra?.summary).toContain("duration=1200ms");
expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020");
expect(parsed.meta?.extra?.summary).toContain("tokens=100+20");
});
it("codexSpec buildArgs enforces exec/json/sandbox defaults", () => {
const argv = ["codex", "hello world"];
const built = codexSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built[1]).toBe("exec");
expect(built).toContain("--json");
expect(built).toContain("--skip-git-repo-check");
expect(built).toContain("read-only");
});
});

67
src/agents/claude.ts Normal file
View File

@ -0,0 +1,67 @@
import path from "node:path";
import {
CLAUDE_BIN,
CLAUDE_IDENTITY_PREFIX,
type ClaudeJsonParseResult,
parseClaudeJson,
summarizeClaudeMetadata,
} from "../auto-reply/claude.js";
import type { AgentMeta, AgentSpec } from "./types.js";
function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined {
if (!parsed?.parsed) return undefined;
const summary = summarizeClaudeMetadata(parsed.parsed);
return summary ? { extra: { summary } } : undefined;
}
export const claudeSpec: AgentSpec = {
kind: "claude",
isInvocation: (argv) =>
argv.length > 0 && path.basename(argv[0]) === CLAUDE_BIN,
buildArgs: (ctx) => {
// Split around the body so we can inject flags without losing the body
// position. This keeps templated prompts intact even when we add flags.
const argv = [...ctx.argv];
const body = argv[ctx.bodyIndex] ?? "";
const beforeBody = argv.slice(0, ctx.bodyIndex);
const afterBody = argv.slice(ctx.bodyIndex + 1);
const wantsOutputFormat = typeof ctx.format === "string";
if (wantsOutputFormat) {
const hasOutputFormat = argv.some(
(part) =>
part === "--output-format" || part.startsWith("--output-format="),
);
if (!hasOutputFormat) {
const outputFormat = ctx.format ?? "json";
beforeBody.push("--output-format", outputFormat);
}
}
const hasPrintFlag = argv.some(
(part) => part === "-p" || part === "--print",
);
if (!hasPrintFlag) {
beforeBody.push("-p");
}
const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent);
const bodyWithIdentity =
shouldPrependIdentity && body
? [ctx.identityPrefix ?? CLAUDE_IDENTITY_PREFIX, body]
.filter(Boolean)
.join("\n\n")
: body;
return [...beforeBody, bodyWithIdentity, ...afterBody];
},
parseOutput: (rawStdout) => {
const parsed = parseClaudeJson(rawStdout);
const text = parsed?.text ?? rawStdout.trim();
return {
text: text?.trim(),
meta: toMeta(parsed),
};
},
};

79
src/agents/codex.ts Normal file
View File

@ -0,0 +1,79 @@
import path from "node:path";
import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
function parseCodexJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let text: string | undefined;
let meta: AgentMeta | undefined;
for (const line of lines) {
try {
const ev = JSON.parse(line) as {
type?: string;
item?: { type?: string; text?: string };
usage?: unknown;
};
// Codex streams multiple events; capture the last agent_message text and
// the final turn usage for cost/telemetry.
if (
ev.type === "item.completed" &&
ev.item?.type === "agent_message" &&
typeof ev.item.text === "string"
) {
text = ev.item.text;
}
if (
ev.type === "turn.completed" &&
ev.usage &&
typeof ev.usage === "object"
) {
const u = ev.usage as {
input_tokens?: number;
cached_input_tokens?: number;
output_tokens?: number;
};
meta = {
usage: {
input: u.input_tokens,
output: u.output_tokens,
cacheRead: u.cached_input_tokens,
total:
(u.input_tokens ?? 0) +
(u.output_tokens ?? 0) +
(u.cached_input_tokens ?? 0),
},
};
}
} catch {
// ignore
}
}
return { text: text?.trim(), meta };
}
export const codexSpec: AgentSpec = {
kind: "codex",
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "codex",
buildArgs: (ctx) => {
const argv = [...ctx.argv];
const hasExec = argv.length > 0 && argv[1] === "exec";
if (!hasExec) {
argv.splice(1, 0, "exec");
}
// Ensure JSON output
if (!argv.includes("--json")) {
argv.splice(argv.length - 1, 0, "--json");
}
// Safety defaults
if (!argv.includes("--skip-git-repo-check")) {
argv.splice(argv.length - 1, 0, "--skip-git-repo-check");
}
if (!argv.some((p) => p === "--sandbox" || p.startsWith("--sandbox="))) {
argv.splice(argv.length - 1, 0, "--sandbox", "read-only");
}
return argv;
},
parseOutput: parseCodexJson,
};

18
src/agents/index.ts Normal file
View File

@ -0,0 +1,18 @@
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
import type { AgentKind, AgentSpec } from "./types.js";
const specs: Record<AgentKind, AgentSpec> = {
claude: claudeSpec,
codex: codexSpec,
opencode: opencodeSpec,
pi: piSpec,
};
export function getAgentSpec(kind: AgentKind): AgentSpec {
return specs[kind];
}
export { AgentKind, AgentMeta, AgentParseResult } from "./types.js";

62
src/agents/opencode.ts Normal file
View File

@ -0,0 +1,62 @@
import path from "node:path";
import {
OPENCODE_BIN,
OPENCODE_IDENTITY_PREFIX,
parseOpencodeJson,
summarizeOpencodeMetadata,
} from "../auto-reply/opencode.js";
import type { AgentMeta, AgentSpec } from "./types.js";
function toMeta(
parsed: ReturnType<typeof parseOpencodeJson>,
): AgentMeta | undefined {
const summary = summarizeOpencodeMetadata(parsed.meta);
return summary ? { extra: { summary } } : undefined;
}
export const opencodeSpec: AgentSpec = {
kind: "opencode",
isInvocation: (argv) =>
argv.length > 0 && path.basename(argv[0]) === OPENCODE_BIN,
buildArgs: (ctx) => {
// Split around the body so we can insert flags without losing the prompt.
const argv = [...ctx.argv];
const body = argv[ctx.bodyIndex] ?? "";
const beforeBody = argv.slice(0, ctx.bodyIndex);
const afterBody = argv.slice(ctx.bodyIndex + 1);
const wantsJson = ctx.format === "json";
// Ensure format json for parsing
if (wantsJson) {
const hasFormat = [...beforeBody, body, ...afterBody].some(
(part) => part === "--format" || part.startsWith("--format="),
);
if (!hasFormat) {
beforeBody.push("--format", "json");
}
}
// Session args default to --session
// Identity prefix
// Opencode streams text tokens; we still seed an identity so the agent
// keeps context on first turn.
const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent);
const bodyWithIdentity =
shouldPrependIdentity && body
? [ctx.identityPrefix ?? OPENCODE_IDENTITY_PREFIX, body]
.filter(Boolean)
.join("\n\n")
: body;
return [...beforeBody, bodyWithIdentity, ...afterBody];
},
parseOutput: (rawStdout) => {
const parsed = parseOpencodeJson(rawStdout);
const text = parsed.text ?? rawStdout.trim();
return {
text: text?.trim(),
meta: toMeta(parsed),
};
},
};

75
src/agents/pi.ts Normal file
View File

@ -0,0 +1,75 @@
import path from "node:path";
import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
type PiAssistantMessage = {
role?: string;
content?: Array<{ type?: string; text?: string }>;
usage?: { input?: number; output?: number };
model?: string;
provider?: string;
stopReason?: string;
};
function parsePiJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let lastMessage: PiAssistantMessage | undefined;
for (const line of lines) {
try {
const ev = JSON.parse(line) as {
type?: string;
message?: PiAssistantMessage;
};
// Pi emits a stream; we only care about the terminal assistant message_end.
if (ev.type === "message_end" && ev.message?.role === "assistant") {
lastMessage = ev.message;
}
} catch {
// ignore
}
}
const text =
lastMessage?.content
?.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
?.trim() ?? undefined;
const meta: AgentMeta | undefined = lastMessage
? {
model: lastMessage.model,
provider: lastMessage.provider,
stopReason: lastMessage.stopReason,
usage: lastMessage.usage,
}
: undefined;
return { text, meta };
}
export const piSpec: AgentSpec = {
kind: "pi",
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi",
buildArgs: (ctx) => {
const argv = [...ctx.argv];
// Non-interactive print + JSON
if (!argv.includes("-p") && !argv.includes("--print")) {
argv.splice(argv.length - 1, 0, "-p");
}
if (
ctx.format === "json" &&
!argv.includes("--mode") &&
!argv.some((a) => a === "--mode")
) {
argv.splice(argv.length - 1, 0, "--mode", "json");
}
// Session defaults
// Identity prefix optional; Pi usually doesn't need it, but allow injection
if (!(ctx.sendSystemOnce && ctx.systemSent) && argv[ctx.bodyIndex]) {
const existingBody = argv[ctx.bodyIndex];
argv[ctx.bodyIndex] = [ctx.identityPrefix, existingBody]
.filter(Boolean)
.join("\n\n");
}
return argv;
},
parseOutput: parsePiJson,
};

41
src/agents/types.ts Normal file
View File

@ -0,0 +1,41 @@
export type AgentKind = "claude" | "opencode" | "pi" | "codex";
export type AgentMeta = {
model?: string;
provider?: string;
stopReason?: string;
usage?: {
input?: number;
output?: number;
cacheRead?: number;
cacheWrite?: number;
total?: number;
};
extra?: Record<string, unknown>;
};
export type AgentParseResult = {
text?: string;
mediaUrls?: string[];
meta?: AgentMeta;
};
export type BuildArgsContext = {
argv: string[];
bodyIndex: number; // index of prompt/body argument in argv
isNewSession: boolean;
sessionId?: string;
sendSystemOnce: boolean;
systemSent: boolean;
identityPrefix?: string;
format?: "text" | "json";
sessionArgNew?: string[];
sessionArgResume?: string[];
};
export interface AgentSpec {
kind: AgentKind;
isInvocation: (argv: string[]) => boolean;
buildArgs: (ctx: BuildArgsContext) => string[];
parseOutput: (rawStdout: string) => AgentParseResult;
}

View File

@ -160,3 +160,6 @@ export function parseClaudeJsonText(raw: string): string | undefined {
const parsed = parseClaudeJson(raw); const parsed = parseClaudeJson(raw);
return parsed?.text; return parsed?.text;
} }
// Re-export from command-reply for backwards compatibility
export { summarizeClaudeMetadata } from "./command-reply.js";

View File

@ -70,7 +70,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
@ -98,7 +98,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: true, sendSystemOnce: true,
@ -121,7 +121,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: true, sendSystemOnce: true,
@ -144,7 +144,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: true, sendSystemOnce: true,
@ -167,6 +167,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["cli", "{{Body}}"], command: ["cli", "{{Body}}"],
agent: { kind: "claude" },
session: { session: {
sessionArgNew: ["--new", "{{SessionId}}"], sessionArgNew: ["--new", "{{SessionId}}"],
sessionArgResume: ["--resume", "{{SessionId}}"], sessionArgResume: ["--resume", "{{SessionId}}"],
@ -192,7 +193,11 @@ describe("runCommandReply", () => {
throw { stdout: "partial output here", killed: true, signal: "SIGKILL" }; throw { stdout: "partial output here", killed: true, signal: "SIGKILL" };
}); });
const { payload, meta } = await runCommandReply({ const { payload, meta } = await runCommandReply({
reply: { mode: "command", command: ["echo", "hi"] }, reply: {
mode: "command",
command: ["echo", "hi"],
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
isNewSession: true, isNewSession: true,
@ -213,7 +218,12 @@ describe("runCommandReply", () => {
throw { stdout: "", killed: true, signal: "SIGKILL" }; throw { stdout: "", killed: true, signal: "SIGKILL" };
}); });
const { payload } = await runCommandReply({ const { payload } = await runCommandReply({
reply: { mode: "command", command: ["echo", "hi"], cwd: "/tmp/work" }, reply: {
mode: "command",
command: ["echo", "hi"],
cwd: "/tmp/work",
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
isNewSession: true, isNewSession: true,
@ -235,7 +245,12 @@ describe("runCommandReply", () => {
stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`, stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`,
}); });
const { payload } = await runCommandReply({ const { payload } = await runCommandReply({
reply: { mode: "command", command: ["echo", "hi"], mediaMaxMb: 1 }, reply: {
mode: "command",
command: ["echo", "hi"],
mediaMaxMb: 1,
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
isNewSession: true, isNewSession: true,
@ -259,7 +274,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
@ -271,14 +286,18 @@ describe("runCommandReply", () => {
commandRunner: runner, commandRunner: runner,
enqueue: enqueueImmediate, enqueue: enqueueImmediate,
}); });
expect(meta.claudeMeta).toContain("duration=50ms"); expect(meta.agentMeta?.extra?.summary).toContain("duration=50ms");
expect(meta.claudeMeta).toContain("tool_calls=1"); expect(meta.agentMeta?.extra?.summary).toContain("tool_calls=1");
}); });
it("captures queue wait metrics in meta", async () => { it("captures queue wait metrics in meta", async () => {
const runner = makeRunner({ stdout: "ok" }); const runner = makeRunner({ stdout: "ok" });
const { meta } = await runCommandReply({ const { meta } = await runCommandReply({
reply: { mode: "command", command: ["echo", "{{Body}}"] }, reply: {
mode: "command",
command: ["echo", "{{Body}}"],
agent: { kind: "claude" },
},
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
isNewSession: true, isNewSession: true,
@ -303,7 +322,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
@ -328,7 +347,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,
@ -353,7 +372,7 @@ describe("runCommandReply", () => {
reply: { reply: {
mode: "command", mode: "command",
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json", agent: { kind: "claude", format: "json" },
}, },
templatingCtx: noopTemplateCtx, templatingCtx: noopTemplateCtx,
sendSystemOnce: false, sendSystemOnce: false,

View File

@ -1,18 +1,14 @@
import fs from "node:fs/promises"; import fs from "node:fs/promises";
import path from "node:path"; import path from "node:path";
import { type AgentKind, getAgentSpec } from "../agents/index.js";
import type { AgentMeta } from "../agents/types.js";
import type { WarelayConfig } from "../config/config.js"; import type { WarelayConfig } from "../config/config.js";
import { isVerbose, logVerbose } from "../globals.js"; import { isVerbose, logVerbose } from "../globals.js";
import { logError } from "../logger.js"; import { logError } from "../logger.js";
import { splitMediaFromOutput } from "../media/parse.js"; import { splitMediaFromOutput } from "../media/parse.js";
import { enqueueCommand } from "../process/command-queue.js"; import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js"; import type { runCommandWithTimeout } from "../process/exec.js";
import {
CLAUDE_BIN,
CLAUDE_IDENTITY_PREFIX,
type ClaudeJsonParseResult,
parseClaudeJson,
} from "./claude.js";
import { applyTemplate, type TemplateContext } from "./templating.js"; import { applyTemplate, type TemplateContext } from "./templating.js";
import type { ReplyPayload } from "./types.js"; import type { ReplyPayload } from "./types.js";
@ -42,7 +38,7 @@ export type CommandReplyMeta = {
exitCode?: number | null; exitCode?: number | null;
signal?: string | null; signal?: string | null;
killed?: boolean; killed?: boolean;
claudeMeta?: string; agentMeta?: AgentMeta;
}; };
export type CommandReplyResult = { export type CommandReplyResult = {
@ -119,6 +115,9 @@ export async function runCommandReply(
if (!reply.command?.length) { if (!reply.command?.length) {
throw new Error("reply.command is required for mode=command"); throw new Error("reply.command is required for mode=command");
} }
const agentCfg = reply.agent ?? { kind: "claude" };
const agentKind: AgentKind = agentCfg.kind ?? "claude";
const agent = getAgentSpec(agentKind);
let argv = reply.command.map((part) => applyTemplate(part, templatingCtx)); let argv = reply.command.map((part) => applyTemplate(part, templatingCtx));
const templatePrefix = const templatePrefix =
@ -129,41 +128,24 @@ export async function runCommandReply(
argv = [argv[0], templatePrefix, ...argv.slice(1)]; argv = [argv[0], templatePrefix, ...argv.slice(1)];
} }
// Ensure Claude commands can emit plain text by forcing --output-format when configured. // Default body index is last arg
if ( let bodyIndex = Math.max(argv.length - 1, 0);
reply.claudeOutputFormat &&
argv.length > 0 &&
path.basename(argv[0]) === CLAUDE_BIN
) {
const hasOutputFormat = argv.some(
(part) =>
part === "--output-format" || part.startsWith("--output-format="),
);
const insertBeforeBody = Math.max(argv.length - 1, 0);
if (!hasOutputFormat) {
argv = [
...argv.slice(0, insertBeforeBody),
"--output-format",
reply.claudeOutputFormat,
...argv.slice(insertBeforeBody),
];
}
const hasPrintFlag = argv.some(
(part) => part === "-p" || part === "--print",
);
if (!hasPrintFlag) {
const insertIdx = Math.max(argv.length - 1, 0);
argv = [...argv.slice(0, insertIdx), "-p", ...argv.slice(insertIdx)];
}
}
// Inject session args if configured (use resume for existing, session-id for new) // Session args prepared (templated) and injected generically
if (reply.session) { if (reply.session) {
const defaultNew =
agentCfg.kind === "claude"
? ["--session-id", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const defaultResume =
agentCfg.kind === "claude"
? ["--resume", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const sessionArgList = ( const sessionArgList = (
isNewSession isNewSession
? (reply.session.sessionArgNew ?? ["--session-id", "{{SessionId}}"]) ? (reply.session.sessionArgNew ?? defaultNew)
: (reply.session.sessionArgResume ?? ["--resume", "{{SessionId}}"]) : (reply.session.sessionArgResume ?? defaultResume)
).map((part) => applyTemplate(part, templatingCtx)); ).map((p) => applyTemplate(p, templatingCtx));
if (sessionArgList.length) { if (sessionArgList.length) {
const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true; const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true;
const insertAt = const insertAt =
@ -173,22 +155,24 @@ export async function runCommandReply(
...sessionArgList, ...sessionArgList,
...argv.slice(insertAt), ...argv.slice(insertAt),
]; ];
bodyIndex = Math.max(argv.length - 1, 0);
} }
} }
let finalArgv = argv; const shouldApplyAgent = agent.isInvocation(argv);
const isClaudeInvocation = const finalArgv = shouldApplyAgent
finalArgv.length > 0 && path.basename(finalArgv[0]) === CLAUDE_BIN; ? agent.buildArgs({
const shouldPrependIdentity = argv,
isClaudeInvocation && !(sendSystemOnce && systemSent); bodyIndex,
if (shouldPrependIdentity && finalArgv.length > 0) { isNewSession,
const bodyIdx = finalArgv.length - 1; sessionId: templatingCtx.SessionId,
const existingBody = finalArgv[bodyIdx] ?? ""; sendSystemOnce,
finalArgv = [ systemSent,
...finalArgv.slice(0, bodyIdx), identityPrefix: agentCfg.identityPrefix,
[CLAUDE_IDENTITY_PREFIX, existingBody].filter(Boolean).join("\n\n"), format: agentCfg.format,
]; })
} : argv;
logVerbose( logVerbose(
`Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`, `Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`,
); );
@ -217,28 +201,14 @@ export async function runCommandReply(
if (stderr?.trim()) { if (stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
} }
let parsed: ClaudeJsonParseResult | undefined;
if ( const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
trimmed && // Treat empty string as "no content" so we can fall back to the friendly
(reply.claudeOutputFormat === "json" || isClaudeInvocation) // "(command produced no output)" message instead of echoing raw JSON.
) { if (parsed && parsed.text !== undefined) {
parsed = parseClaudeJson(trimmed);
if (parsed?.parsed && isVerbose()) {
const summary = summarizeClaudeMetadata(parsed.parsed);
if (summary) logVerbose(`Claude JSON meta: ${summary}`);
logVerbose(
`Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`,
);
}
if (typeof parsed?.text === "string") {
logVerbose(
`Claude JSON parsed -> ${parsed.text.slice(0, 120)}${parsed.text.length > 120 ? "…" : ""}`,
);
trimmed = parsed.text.trim(); trimmed = parsed.text.trim();
} else {
logVerbose("Claude JSON parse failed; returning raw stdout");
}
} }
const { text: cleanedText, mediaUrls: mediaFound } = const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(trimmed); splitMediaFromOutput(trimmed);
trimmed = cleanedText; trimmed = cleanedText;
@ -249,7 +219,7 @@ export async function runCommandReply(
logVerbose("No MEDIA token extracted from final text"); logVerbose("No MEDIA token extracted from final text");
} }
if (!trimmed && !mediaFromCommand) { if (!trimmed && !mediaFromCommand) {
const meta = parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined; const meta = parsed?.meta?.extra?.summary ?? undefined;
trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`; trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`;
logVerbose("No text/media produced; injecting fallback notice to user"); logVerbose("No text/media produced; injecting fallback notice to user");
} }
@ -273,9 +243,7 @@ export async function runCommandReply(
exitCode: code, exitCode: code,
signal, signal,
killed, killed,
claudeMeta: parsed agentMeta: parsed?.meta,
? summarizeClaudeMetadata(parsed.parsed)
: undefined,
}, },
}; };
} }
@ -293,9 +261,7 @@ export async function runCommandReply(
exitCode: code, exitCode: code,
signal, signal,
killed, killed,
claudeMeta: parsed agentMeta: parsed?.meta,
? summarizeClaudeMetadata(parsed.parsed)
: undefined,
}, },
}; };
} }
@ -343,7 +309,7 @@ export async function runCommandReply(
exitCode: code, exitCode: code,
signal, signal,
killed, killed,
claudeMeta: parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined, agentMeta: parsed?.meta,
}; };
if (isVerbose()) { if (isVerbose()) {
logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`); logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`);

104
src/auto-reply/opencode.ts Normal file
View File

@ -0,0 +1,104 @@
// Helpers specific to Opencode CLI output/argv handling.
// Preferred binary name for Opencode CLI invocations.
export const OPENCODE_BIN = "opencode";
export const OPENCODE_IDENTITY_PREFIX =
"You are Openclawd running on the user's Mac via warelay. Your scratchpad is /Users/steipete/openclawd; this is your folder and you can add what you like in markdown files and/or images. You don't need to be concise, but WhatsApp replies must stay under ~1500 characters. Media you can send: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK.";
export type OpencodeJsonParseResult = {
text?: string;
parsed: unknown[];
valid: boolean;
meta?: {
durationMs?: number;
cost?: number;
tokens?: {
input?: number;
output?: number;
};
};
};
export function parseOpencodeJson(raw: string): OpencodeJsonParseResult {
const lines = raw.split(/\n+/).filter((s) => s.trim());
const parsed: unknown[] = [];
let text = "";
let valid = false;
let startTime: number | undefined;
let endTime: number | undefined;
let cost = 0;
let inputTokens = 0;
let outputTokens = 0;
for (const line of lines) {
try {
const event = JSON.parse(line);
parsed.push(event);
if (event && typeof event === "object") {
// Opencode emits a stream of events.
if (event.type === "step_start") {
valid = true;
if (typeof event.timestamp === "number") {
if (startTime === undefined || event.timestamp < startTime) {
startTime = event.timestamp;
}
}
}
if (event.type === "text" && event.part?.text) {
text += event.part.text;
valid = true;
}
if (event.type === "step_finish") {
valid = true;
if (typeof event.timestamp === "number") {
endTime = event.timestamp;
}
if (event.part) {
if (typeof event.part.cost === "number") {
cost += event.part.cost;
}
if (event.part.tokens) {
inputTokens += event.part.tokens.input || 0;
outputTokens += event.part.tokens.output || 0;
}
}
}
}
} catch {
// ignore non-JSON lines
}
}
const meta: OpencodeJsonParseResult["meta"] = {};
if (startTime !== undefined && endTime !== undefined) {
meta.durationMs = endTime - startTime;
}
if (cost > 0) meta.cost = cost;
if (inputTokens > 0 || outputTokens > 0) {
meta.tokens = { input: inputTokens, output: outputTokens };
}
return {
text: text || undefined,
parsed,
valid: valid && parsed.length > 0,
meta: Object.keys(meta).length > 0 ? meta : undefined,
};
}
export function summarizeOpencodeMetadata(
meta: OpencodeJsonParseResult["meta"],
): string | undefined {
if (!meta) return undefined;
const parts: string[] = [];
if (meta.durationMs !== undefined)
parts.push(`duration=${meta.durationMs}ms`);
if (meta.cost !== undefined) parts.push(`cost=$${meta.cost.toFixed(4)}`);
if (meta.tokens) {
parts.push(`tokens=${meta.tokens.input}+${meta.tokens.output}`);
}
return parts.length ? parts.join(", ") : undefined;
}

View File

@ -280,8 +280,8 @@ export async function getReplyFromConfig(
timeoutSeconds, timeoutSeconds,
commandRunner, commandRunner,
}); });
if (meta.claudeMeta && isVerbose()) { if (meta.agentMeta && isVerbose()) {
logVerbose(`Claude JSON meta: ${meta.claudeMeta}`); logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
} }
return payload; return payload;
} finally { } finally {

View File

@ -4,6 +4,10 @@ import type { CliDeps } from "../cli/deps.js";
import type { RuntimeEnv } from "../runtime.js"; import type { RuntimeEnv } from "../runtime.js";
import { sendCommand } from "./send.js"; import { sendCommand } from "./send.js";
vi.mock("../web/ipc.js", () => ({
sendViaIpc: vi.fn().mockResolvedValue(null),
}));
const runtime: RuntimeEnv = { const runtime: RuntimeEnv = {
log: vi.fn(), log: vi.fn(),
error: vi.fn(), error: vi.fn(),

View File

@ -5,8 +5,9 @@ import path from "node:path";
import JSON5 from "json5"; import JSON5 from "json5";
import { z } from "zod"; import { z } from "zod";
import type { AgentKind } from "../agents/index.js";
export type ReplyMode = "text" | "command"; export type ReplyMode = "text" | "command";
export type ClaudeOutputFormat = "text" | "json" | "stream-json";
export type SessionScope = "per-sender" | "global"; export type SessionScope = "per-sender" | "global";
export type SessionConfig = { export type SessionConfig = {
@ -58,18 +59,22 @@ export type WarelayConfig = {
}; };
reply?: { reply?: {
mode: ReplyMode; mode: ReplyMode;
text?: string; // for mode=text, can contain {{Body}} text?: string;
command?: string[]; // for mode=command, argv with templates command?: string[];
cwd?: string; // working directory for command execution cwd?: string;
template?: string; // prepend template string when building command/prompt template?: string;
timeoutSeconds?: number; // optional command timeout; defaults to 600s timeoutSeconds?: number;
bodyPrefix?: string; // optional string prepended to Body before templating bodyPrefix?: string;
mediaUrl?: string; // optional media attachment (path or URL) mediaUrl?: string;
session?: SessionConfig; session?: SessionConfig;
claudeOutputFormat?: ClaudeOutputFormat; // when command starts with `claude`, force an output format mediaMaxMb?: number;
mediaMaxMb?: number; // optional cap for outbound media (default 5MB) typingIntervalSeconds?: number;
typingIntervalSeconds?: number; // how often to refresh typing indicator while command runs heartbeatMinutes?: number;
heartbeatMinutes?: number; // auto-ping cadence for command mode agent?: {
kind: AgentKind;
format?: "text" | "json";
identityPrefix?: string;
};
}; };
}; };
web?: WebConfig; web?: WebConfig;
@ -109,13 +114,17 @@ const ReplySchema = z
}) })
.optional(), .optional(),
heartbeatMinutes: z.number().int().nonnegative().optional(), heartbeatMinutes: z.number().int().nonnegative().optional(),
claudeOutputFormat: z agent: z
.union([ .object({
z.literal("text"), kind: z.union([
z.literal("json"), z.literal("claude"),
z.literal("stream-json"), z.literal("opencode"),
z.undefined(), z.literal("pi"),
]) z.literal("codex"),
]),
format: z.union([z.literal("text"), z.literal("json")]).optional(),
identityPrefix: z.string().optional(),
})
.optional(), .optional(),
}) })
.refine( .refine(

View File

@ -762,7 +762,7 @@ describe("config and templating", () => {
reply: { reply: {
mode: "command" as const, mode: "command" as const,
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "text" as const, agent: { kind: "claude", format: "text" as const },
}, },
}, },
}; };
@ -802,7 +802,7 @@ describe("config and templating", () => {
reply: { reply: {
mode: "command" as const, mode: "command" as const,
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
claudeOutputFormat: "json" as const, agent: { kind: "claude", format: "json" as const },
}, },
}, },
}; };
@ -830,7 +830,7 @@ describe("config and templating", () => {
reply: { reply: {
mode: "command" as const, mode: "command" as const,
command: ["claude", "{{Body}}"], command: ["claude", "{{Body}}"],
// No claudeOutputFormat set on purpose agent: { kind: "claude" },
}, },
}, },
}; };

View File

@ -7,7 +7,11 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import { setVerbose } from "./globals.js"; import { setVerbose } from "./globals.js";
import { logDebug, logError, logInfo, logSuccess, logWarn } from "./logger.js"; import { logDebug, logError, logInfo, logSuccess, logWarn } from "./logger.js";
import { resetLogger, setLoggerOverride } from "./logging.js"; import {
DEFAULT_LOG_DIR,
resetLogger,
setLoggerOverride,
} from "./logging.js";
import type { RuntimeEnv } from "./runtime.js"; import type { RuntimeEnv } from "./runtime.js";
describe("logger helpers", () => { describe("logger helpers", () => {
@ -67,6 +71,28 @@ describe("logger helpers", () => {
expect(content).toContain("warn-only"); expect(content).toContain("warn-only");
cleanup(logPath); cleanup(logPath);
}); });
it("uses daily rolling default log file and prunes old ones", () => {
resetLogger();
setLoggerOverride({}); // force defaults regardless of user config
const today = new Date().toISOString().slice(0, 10);
const todayPath = path.join(DEFAULT_LOG_DIR, `warelay-${today}.log`);
// create an old file to be pruned
const oldPath = path.join(DEFAULT_LOG_DIR, "warelay-2000-01-01.log");
fs.mkdirSync(DEFAULT_LOG_DIR, { recursive: true });
fs.writeFileSync(oldPath, "old");
fs.utimesSync(oldPath, new Date(0), new Date(0));
cleanup(todayPath);
logInfo("roll-me");
expect(fs.existsSync(todayPath)).toBe(true);
expect(fs.readFileSync(todayPath, "utf-8")).toContain("roll-me");
expect(fs.existsSync(oldPath)).toBe(false);
cleanup(todayPath);
});
}); });
function pathForTest() { function pathForTest() {

View File

@ -6,8 +6,12 @@ import pino, { type Bindings, type LevelWithSilent, type Logger } from "pino";
import { loadConfig, type WarelayConfig } from "./config/config.js"; import { loadConfig, type WarelayConfig } from "./config/config.js";
import { isVerbose } from "./globals.js"; import { isVerbose } from "./globals.js";
const DEFAULT_LOG_DIR = path.join(os.tmpdir(), "warelay"); export const DEFAULT_LOG_DIR = path.join(os.tmpdir(), "warelay");
export const DEFAULT_LOG_FILE = path.join(DEFAULT_LOG_DIR, "warelay.log"); export const DEFAULT_LOG_FILE = path.join(DEFAULT_LOG_DIR, "warelay.log"); // legacy single-file path
const LOG_PREFIX = "warelay";
const LOG_SUFFIX = ".log";
const MAX_LOG_AGE_MS = 24 * 60 * 60 * 1000; // 24h
const ALLOWED_LEVELS: readonly LevelWithSilent[] = [ const ALLOWED_LEVELS: readonly LevelWithSilent[] = [
"silent", "silent",
@ -46,7 +50,7 @@ function resolveSettings(): ResolvedSettings {
const cfg: WarelayConfig["logging"] | undefined = const cfg: WarelayConfig["logging"] | undefined =
overrideSettings ?? loadConfig().logging; overrideSettings ?? loadConfig().logging;
const level = normalizeLevel(cfg?.level); const level = normalizeLevel(cfg?.level);
const file = cfg?.file ?? DEFAULT_LOG_FILE; const file = cfg?.file ?? defaultRollingPathForToday();
return { level, file }; return { level, file };
} }
@ -57,6 +61,10 @@ function settingsChanged(a: ResolvedSettings | null, b: ResolvedSettings) {
function buildLogger(settings: ResolvedSettings): Logger { function buildLogger(settings: ResolvedSettings): Logger {
fs.mkdirSync(path.dirname(settings.file), { recursive: true }); fs.mkdirSync(path.dirname(settings.file), { recursive: true });
// Clean up stale rolling logs when using a dated log filename.
if (isRollingPath(settings.file)) {
pruneOldRollingLogs(path.dirname(settings.file));
}
const destination = pino.destination({ const destination = pino.destination({
dest: settings.file, dest: settings.file,
mkdir: true, mkdir: true,
@ -104,3 +112,39 @@ export function resetLogger() {
cachedSettings = null; cachedSettings = null;
overrideSettings = null; overrideSettings = null;
} }
function defaultRollingPathForToday(): string {
const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD
return path.join(DEFAULT_LOG_DIR, `${LOG_PREFIX}-${today}${LOG_SUFFIX}`);
}
function isRollingPath(file: string): boolean {
const base = path.basename(file);
return (
base.startsWith(`${LOG_PREFIX}-`) &&
base.endsWith(LOG_SUFFIX) &&
base.length === `${LOG_PREFIX}-YYYY-MM-DD${LOG_SUFFIX}`.length
);
}
function pruneOldRollingLogs(dir: string): void {
try {
const entries = fs.readdirSync(dir, { withFileTypes: true });
const cutoff = Date.now() - MAX_LOG_AGE_MS;
for (const entry of entries) {
if (!entry.isFile()) continue;
if (!entry.name.startsWith(`${LOG_PREFIX}-`) || !entry.name.endsWith(LOG_SUFFIX)) continue;
const fullPath = path.join(dir, entry.name);
try {
const stat = fs.statSync(fullPath);
if (stat.mtimeMs < cutoff) {
fs.rmSync(fullPath, { force: true });
}
} catch {
// ignore errors during pruning
}
}
} catch {
// ignore missing dir or read errors
}
}

View File

@ -49,4 +49,27 @@ describe("media server", () => {
await expect(fs.stat(file)).rejects.toThrow(); await expect(fs.stat(file)).rejects.toThrow();
await new Promise((r) => server.close(r)); await new Promise((r) => server.close(r));
}); });
it("blocks path traversal attempts", async () => {
const server = await startMediaServer(0, 5_000);
const port = (server.address() as AddressInfo).port;
// URL-encoded "../" to bypass client-side path normalization
const res = await fetch(`http://localhost:${port}/media/%2e%2e%2fpackage.json`);
expect(res.status).toBe(400);
expect(await res.text()).toBe("invalid path");
await new Promise((r) => server.close(r));
});
it("blocks symlink escaping outside media dir", async () => {
const target = path.join(process.cwd(), "package.json"); // outside MEDIA_DIR
const link = path.join(MEDIA_DIR, "link-out");
await fs.symlink(target, link);
const server = await startMediaServer(0, 5_000);
const port = (server.address() as AddressInfo).port;
const res = await fetch(`http://localhost:${port}/media/link-out`);
expect(res.status).toBe(400);
expect(await res.text()).toBe("invalid path");
await new Promise((r) => server.close(r));
});
}); });

View File

@ -17,19 +17,31 @@ export function attachMediaRoutes(
app.get("/media/:id", async (req, res) => { app.get("/media/:id", async (req, res) => {
const id = req.params.id; const id = req.params.id;
const file = path.join(mediaDir, id); const mediaRoot = (await fs.realpath(mediaDir)) + path.sep;
const file = path.resolve(mediaRoot, id);
try { try {
const stat = await fs.stat(file); const lstat = await fs.lstat(file);
if (lstat.isSymbolicLink()) {
res.status(400).send("invalid path");
return;
}
const realPath = await fs.realpath(file);
if (!realPath.startsWith(mediaRoot)) {
res.status(400).send("invalid path");
return;
}
const stat = await fs.stat(realPath);
if (Date.now() - stat.mtimeMs > ttlMs) { if (Date.now() - stat.mtimeMs > ttlMs) {
await fs.rm(file).catch(() => {}); await fs.rm(realPath).catch(() => {});
res.status(410).send("expired"); res.status(410).send("expired");
return; return;
} }
res.sendFile(file); res.sendFile(realPath);
// best-effort single-use cleanup after response ends // best-effort single-use cleanup after response ends
res.on("finish", () => { res.on("finish", () => {
setTimeout(() => { setTimeout(() => {
fs.rm(file).catch(() => {}); fs.rm(realPath).catch(() => {});
}, 500); }, 500);
}); });
} catch { } catch {

View File

@ -1,5 +1,4 @@
// Import test-helpers FIRST to set up mocks before other imports import "./test-helpers.js";
import crypto from "node:crypto"; import crypto from "node:crypto";
import fs from "node:fs/promises"; import fs from "node:fs/promises";
import os from "node:os"; import os from "node:os";
@ -570,6 +569,78 @@ describe("web auto-reply", () => {
} }
}); });
it("batches inbound messages while queue is busy and preserves timestamps", async () => {
vi.useFakeTimers();
const originalMax = process.getMaxListeners();
process.setMaxListeners?.(1); // force low to confirm bump
const sendMedia = vi.fn();
const reply = vi.fn().mockResolvedValue(undefined);
const sendComposing = vi.fn();
const resolver = vi.fn().mockResolvedValue({ text: "batched" });
let capturedOnMessage:
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
| undefined;
const listenerFactory = async (opts: {
onMessage: (
msg: import("./inbound.js").WebInboundMessage,
) => Promise<void>;
}) => {
capturedOnMessage = opts.onMessage;
return { close: vi.fn() };
};
// Queue starts busy, then frees after one polling tick.
let queueBusy = true;
const queueSpy = vi
.spyOn(commandQueue, "getQueueSize")
.mockImplementation(() => (queueBusy ? 1 : 0));
setLoadConfigMock(() => ({ inbound: { timestampPrefix: "UTC" } }));
await monitorWebProvider(false, listenerFactory, false, resolver);
expect(capturedOnMessage).toBeDefined();
// Two messages from the same sender with fixed timestamps
await capturedOnMessage?.({
body: "first",
from: "+1",
to: "+2",
id: "m1",
timestamp: 1735689600000, // Jan 1 2025 00:00:00 UTC
sendComposing,
reply,
sendMedia,
});
await capturedOnMessage?.({
body: "second",
from: "+1",
to: "+2",
id: "m2",
timestamp: 1735693200000, // Jan 1 2025 01:00:00 UTC
sendComposing,
reply,
sendMedia,
});
// Let the queued batch flush once the queue is free
queueBusy = false;
vi.advanceTimersByTime(200);
expect(resolver).toHaveBeenCalledTimes(1);
const args = resolver.mock.calls[0][0];
expect(args.Body).toContain("[Jan 1 00:00] [warelay] first");
expect(args.Body).toContain("[Jan 1 01:00] [warelay] second");
// Max listeners bumped to avoid warnings in multi-instance test runs
expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50);
queueSpy.mockRestore();
process.setMaxListeners?.(originalMax);
vi.useRealTimers();
});
it("falls back to text when media send fails", async () => { it("falls back to text when media send fails", async () => {
const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const sendMedia = vi.fn().mockRejectedValue(new Error("boom"));
const reply = vi.fn().mockResolvedValue(undefined); const reply = vi.fn().mockResolvedValue(undefined);

View File

@ -16,7 +16,7 @@ import {
import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js"; import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
import { getQueueSize } from "../process/command-queue.js"; import { enqueueCommand, getQueueSize } from "../process/command-queue.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js"; import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js"; import { monitorWebInbox } from "./inbound.js";
@ -551,6 +551,13 @@ export async function monitorWebProvider(
}), }),
); );
// Avoid noisy MaxListenersExceeded warnings in test environments where
// multiple relay instances may be constructed.
const currentMaxListeners = process.getMaxListeners?.() ?? 10;
if (process.setMaxListeners && currentMaxListeners < 50) {
process.setMaxListeners(50);
}
let sigintStop = false; let sigintStop = false;
const handleSigint = () => { const handleSigint = () => {
sigintStop = true; sigintStop = true;
@ -580,107 +587,103 @@ export async function monitorWebProvider(
const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages
const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute
const listener = await (listenerFactory ?? monitorWebInbox)({ // Batch inbound messages while command queue is busy, then send one
verbose, // combined prompt with per-message timestamps (inbound-only behavior).
onMessage: async (msg) => { type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout };
// Also add IPC-sent messages to echo detection const pendingBatches = new Map<string, PendingBatch>();
// (this is handled below in the IPC sendHandler)
handledMessages += 1;
lastMessageAt = Date.now();
const ts = msg.timestamp
? new Date(msg.timestamp).toISOString()
: new Date().toISOString();
const correlationId = msg.id ?? newConnectionId();
replyLogger.info(
{
connectionId,
correlationId,
from: msg.from,
to: msg.to,
body: msg.body,
mediaType: msg.mediaType ?? null,
mediaPath: msg.mediaPath ?? null,
},
"inbound web message",
);
console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`); const formatTimestamp = (ts?: number) => {
// Detect same-phone mode (self-messaging)
const isSamePhoneMode = msg.from === msg.to;
if (isSamePhoneMode) {
logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`);
}
// Skip if this is a message we just sent (echo detection)
if (recentlySent.has(msg.body)) {
console.log(`⏭️ Skipping echo: detected recently sent message`);
logVerbose(
`Skipping auto-reply: detected echo (message matches recently sent text)`,
);
recentlySent.delete(msg.body); // Remove from set to allow future identical messages
return;
}
logVerbose(
`Echo check: message not in recent set (size: ${recentlySent.size})`,
);
lastInboundMsg = msg;
// Build timestamp prefix (default: enabled with UTC)
// Can be: true (UTC), false (disabled), or "America/New_York" (custom timezone)
let timestampStr = "";
const tsCfg = cfg.inbound?.timestampPrefix; const tsCfg = cfg.inbound?.timestampPrefix;
const tsEnabled = tsCfg !== false; // default true const tsEnabled = tsCfg !== false; // default true
if (tsEnabled) { if (!tsEnabled) return "";
const tz = typeof tsCfg === "string" ? tsCfg : "UTC"; const tz = typeof tsCfg === "string" ? tsCfg : "UTC";
const now = new Date(); const date = ts ? new Date(ts) : new Date();
try { try {
// Format: "Nov 29 06:30" - compact but informative return `[${date.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${date.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `;
timestampStr = `[${now.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${now.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `;
} catch { } catch {
// Fallback to UTC if timezone invalid return `[${date.toISOString().slice(5, 16).replace("T", " ")}] `;
timestampStr = `[${now.toISOString().slice(5, 16).replace("T", " ")}] `;
}
} }
};
const buildLine = (msg: WebInboundMsg) => {
// Build message prefix: explicit config > default based on allowFrom // Build message prefix: explicit config > default based on allowFrom
// If allowFrom is configured, user likely has a specific setup - no default prefix
// If no allowFrom, add "[warelay]" so AI knows it's coming through warelay
let messagePrefix = cfg.inbound?.messagePrefix; let messagePrefix = cfg.inbound?.messagePrefix;
if (messagePrefix === undefined) { if (messagePrefix === undefined) {
const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0; const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0;
messagePrefix = hasAllowFrom ? "" : "[warelay]"; messagePrefix = hasAllowFrom ? "" : "[warelay]";
} }
const prefixStr = messagePrefix ? `${messagePrefix} ` : ""; const prefixStr = messagePrefix ? `${messagePrefix} ` : "";
const bodyForCommand = `${timestampStr}${prefixStr}${msg.body}`; return `${formatTimestamp(msg.timestamp)}${prefixStr}${msg.body}`;
};
const replyResult = await (replyResolver ?? getReplyFromConfig)( const processBatch = async (from: string) => {
const batch = pendingBatches.get(from);
if (!batch || batch.messages.length === 0) return;
if (getQueueSize() > 0) {
// Wait until command queue is free to run the combined prompt.
batch.timer = setTimeout(() => void processBatch(from), 150);
return;
}
pendingBatches.delete(from);
const messages = batch.messages;
const latest = messages[messages.length - 1];
const combinedBody = messages.map(buildLine).join("\n");
// Echo detection uses combined body so we don't respond twice.
if (recentlySent.has(combinedBody)) {
logVerbose(`Skipping auto-reply: detected echo for combined batch`);
recentlySent.delete(combinedBody);
return;
}
const correlationId = latest.id ?? newConnectionId();
replyLogger.info(
{ {
Body: bodyForCommand, connectionId,
From: msg.from, correlationId,
To: msg.to, from,
MessageSid: msg.id, to: latest.to,
MediaPath: msg.mediaPath, body: combinedBody,
MediaUrl: msg.mediaUrl, mediaType: latest.mediaType ?? null,
MediaType: msg.mediaType, mediaPath: latest.mediaPath ?? null,
}, batchSize: messages.length,
{
onReplyStart: msg.sendComposing,
}, },
"inbound web message (batched)",
); );
const tsDisplay = latest.timestamp
? new Date(latest.timestamp).toISOString()
: new Date().toISOString();
console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`);
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{
Body: combinedBody,
From: latest.from,
To: latest.to,
MessageSid: latest.id,
MediaPath: latest.mediaPath,
MediaUrl: latest.mediaUrl,
MediaType: latest.mediaType,
},
{
onReplyStart: latest.sendComposing,
},
),
);
if ( if (
!replyResult || !replyResult ||
(!replyResult.text && (!replyResult.text &&
!replyResult.mediaUrl && !replyResult.mediaUrl &&
!replyResult.mediaUrls?.length) !replyResult.mediaUrls?.length)
) { ) {
logVerbose( logVerbose("Skipping auto-reply: no text/media returned from resolver");
"Skipping auto-reply: no text/media returned from resolver",
);
return; return;
} }
// Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match) // Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match)
const responsePrefix = cfg.inbound?.responsePrefix; const responsePrefix = cfg.inbound?.responsePrefix;
if ( if (
@ -688,7 +691,6 @@ export async function monitorWebProvider(
replyResult.text && replyResult.text &&
replyResult.text.trim() !== HEARTBEAT_TOKEN replyResult.text.trim() !== HEARTBEAT_TOKEN
) { ) {
// Only add prefix if not already present
if (!replyResult.text.startsWith(responsePrefix)) { if (!replyResult.text.startsWith(responsePrefix)) {
replyResult.text = `${responsePrefix} ${replyResult.text}`; replyResult.text = `${responsePrefix} ${replyResult.text}`;
} }
@ -697,20 +699,19 @@ export async function monitorWebProvider(
try { try {
await deliverWebReply({ await deliverWebReply({
replyResult, replyResult,
msg, msg: latest,
maxMediaBytes, maxMediaBytes,
replyLogger, replyLogger,
runtime, runtime,
connectionId, connectionId,
}); });
// Track sent message to prevent echo loops
if (replyResult.text) { if (replyResult.text) {
recentlySent.add(replyResult.text); recentlySent.add(replyResult.text);
recentlySent.add(combinedBody); // Prevent echo on the batch text itself
logVerbose( logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`, `Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`,
); );
// Keep set bounded - remove oldest if too large
if (recentlySent.size > MAX_RECENT_MESSAGES) { if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value; const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey); if (firstKey) recentlySent.delete(firstKey);
@ -720,7 +721,7 @@ export async function monitorWebProvider(
if (isVerbose()) { if (isVerbose()) {
console.log( console.log(
success( success(
`↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`, `↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`,
), ),
); );
} else { } else {
@ -732,11 +733,48 @@ export async function monitorWebProvider(
} }
} catch (err) { } catch (err) {
console.error( console.error(
danger( danger(`Failed sending web auto-reply to ${from}: ${String(err)}`),
`Failed sending web auto-reply to ${msg.from}: ${String(err)}`,
),
); );
} }
};
const enqueueBatch = async (msg: WebInboundMsg) => {
const bucket = pendingBatches.get(msg.from) ?? { messages: [] };
bucket.messages.push(msg);
pendingBatches.set(msg.from, bucket);
// Process immediately when queue is free; otherwise wait until it drains.
if (getQueueSize() === 0) {
await processBatch(msg.from);
} else {
bucket.timer =
bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150);
}
};
const listener = await (listenerFactory ?? monitorWebInbox)({
verbose,
onMessage: async (msg) => {
handledMessages += 1;
lastMessageAt = Date.now();
lastInboundMsg = msg;
// Same-phone mode logging retained
if (msg.from === msg.to) {
logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`);
}
// Skip if this is a message we just sent (echo detection)
if (recentlySent.has(msg.body)) {
console.log(`⏭️ Skipping echo: detected recently sent message`);
logVerbose(
`Skipping auto-reply: detected echo (message matches recently sent text)`,
);
recentlySent.delete(msg.body);
return;
}
return enqueueBatch(msg);
}, },
}); });
@ -957,19 +995,24 @@ export async function monitorWebProvider(
preHookResult.context, preHookResult.context,
); );
const replyResult = await (replyResolver ?? getReplyFromConfig)( const hbFrom = lastInboundMsg.from;
const hbTo = lastInboundMsg.to;
const hbComposing = lastInboundMsg.sendComposing;
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{ {
Body: heartbeatPrompt, Body: heartbeatPrompt,
From: lastInboundMsg.from, From: hbFrom,
To: lastInboundMsg.to, To: hbTo,
MessageSid: snapshot.entry?.sessionId, MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined, MediaPath: undefined,
MediaUrl: undefined, MediaUrl: undefined,
MediaType: undefined, MediaType: undefined,
}, },
{ {
onReplyStart: lastInboundMsg.sendComposing, onReplyStart: hbComposing,
}, },
),
); );
if ( if (

63
src/web/ipc.test.ts Normal file
View File

@ -0,0 +1,63 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
vi.mock("../logging.js", () => ({
getChildLogger: () => ({
info: vi.fn(),
error: vi.fn(),
warn: vi.fn(),
debug: vi.fn(),
}),
}));
const originalHome = process.env.HOME;
afterEach(() => {
process.env.HOME = originalHome;
vi.resetModules();
});
describe("ipc hardening", () => {
it("creates private socket dir and socket with tight perms", async () => {
const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "warelay-home-"));
process.env.HOME = tmpHome;
vi.resetModules();
const ipc = await import("./ipc.js");
const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" });
ipc.startIpcServer(sendHandler);
const dirStat = fs.lstatSync(path.join(tmpHome, ".warelay", "ipc"));
expect(dirStat.mode & 0o777).toBe(0o700);
expect(ipc.isRelayRunning()).toBe(true);
const socketStat = fs.lstatSync(ipc.getSocketPath());
expect(socketStat.isSocket()).toBe(true);
if (typeof process.getuid === "function") {
expect(socketStat.uid).toBe(process.getuid());
}
ipc.stopIpcServer();
expect(ipc.isRelayRunning()).toBe(false);
});
it("refuses to start when IPC dir is a symlink", async () => {
const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "warelay-home-"));
const warelayDir = path.join(tmpHome, ".warelay");
fs.mkdirSync(warelayDir, { recursive: true });
fs.symlinkSync("/tmp", path.join(warelayDir, "ipc"));
process.env.HOME = tmpHome;
vi.resetModules();
const ipc = await import("./ipc.js");
const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" });
expect(() => ipc.startIpcServer(sendHandler)).toThrow(/symlink/i);
});
});

View File

@ -15,7 +15,8 @@ import path from "node:path";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock"); const SOCKET_DIR = path.join(os.homedir(), ".warelay", "ipc");
const SOCKET_PATH = path.join(SOCKET_DIR, "relay.sock");
export interface IpcSendRequest { export interface IpcSendRequest {
type: "send"; type: "send";
@ -44,11 +45,21 @@ let server: net.Server | null = null;
export function startIpcServer(sendHandler: SendHandler): void { export function startIpcServer(sendHandler: SendHandler): void {
const logger = getChildLogger({ module: "ipc-server" }); const logger = getChildLogger({ module: "ipc-server" });
// Clean up stale socket file ensureSocketDir();
try {
assertSafeSocketPath(SOCKET_PATH);
} catch (err) {
logger.error({ error: String(err) }, "Refusing to start IPC server");
throw err;
}
// Clean up stale socket file (only if safe to do so)
try { try {
fs.unlinkSync(SOCKET_PATH); fs.unlinkSync(SOCKET_PATH);
} catch { } catch (err) {
// Ignore if doesn't exist if ((err as NodeJS.ErrnoException).code !== "ENOENT") {
throw err;
}
} }
server = net.createServer((conn) => { server = net.createServer((conn) => {
@ -78,13 +89,13 @@ export function startIpcServer(sendHandler: SendHandler): void {
success: true, success: true,
messageId: result.messageId, messageId: result.messageId,
}; };
conn.write(JSON.stringify(response) + "\n"); conn.write(`${JSON.stringify(response)}\n`);
} catch (err) { } catch (err) {
const response: IpcSendResponse = { const response: IpcSendResponse = {
success: false, success: false,
error: String(err), error: String(err),
}; };
conn.write(JSON.stringify(response) + "\n"); conn.write(`${JSON.stringify(response)}\n`);
} }
} }
} catch (err) { } catch (err) {
@ -93,7 +104,7 @@ export function startIpcServer(sendHandler: SendHandler): void {
success: false, success: false,
error: "Invalid request format", error: "Invalid request format",
}; };
conn.write(JSON.stringify(response) + "\n"); conn.write(`${JSON.stringify(response)}\n`);
} }
} }
}); });
@ -134,6 +145,7 @@ export function stopIpcServer(): void {
*/ */
export function isRelayRunning(): boolean { export function isRelayRunning(): boolean {
try { try {
assertSafeSocketPath(SOCKET_PATH);
fs.accessSync(SOCKET_PATH); fs.accessSync(SOCKET_PATH);
return true; return true;
} catch { } catch {
@ -174,7 +186,7 @@ export async function sendViaIpc(
message, message,
mediaUrl, mediaUrl,
}; };
client.write(JSON.stringify(request) + "\n"); client.write(`${JSON.stringify(request)}\n`);
}); });
client.on("data", (data) => { client.on("data", (data) => {
@ -198,7 +210,7 @@ export async function sendViaIpc(
} }
}); });
client.on("error", (err) => { client.on("error", (_err) => {
if (!resolved) { if (!resolved) {
resolved = true; resolved = true;
clearTimeout(timeout); clearTimeout(timeout);
@ -223,3 +235,43 @@ export async function sendViaIpc(
export function getSocketPath(): string { export function getSocketPath(): string {
return SOCKET_PATH; return SOCKET_PATH;
} }
function ensureSocketDir(): void {
try {
const stat = fs.lstatSync(SOCKET_DIR);
if (stat.isSymbolicLink()) {
throw new Error(`IPC dir is a symlink: ${SOCKET_DIR}`);
}
if (!stat.isDirectory()) {
throw new Error(`IPC dir is not a directory: ${SOCKET_DIR}`);
}
// Enforce private permissions
fs.chmodSync(SOCKET_DIR, 0o700);
if (typeof process.getuid === "function" && stat.uid !== process.getuid()) {
throw new Error(`IPC dir owned by different user: ${SOCKET_DIR}`);
}
} catch (err) {
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
fs.mkdirSync(SOCKET_DIR, { recursive: true, mode: 0o700 });
return;
}
throw err;
}
}
function assertSafeSocketPath(socketPath: string): void {
try {
const stat = fs.lstatSync(socketPath);
if (stat.isSymbolicLink()) {
throw new Error(`Refusing IPC socket symlink: ${socketPath}`);
}
if (typeof process.getuid === "function" && stat.uid !== process.getuid()) {
throw new Error(`IPC socket owned by different user: ${socketPath}`);
}
} catch (err) {
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
return; // Missing is fine; creation will happen next.
}
throw err;
}
}

View File

@ -35,6 +35,8 @@ describe("web logout", () => {
const credsDir = path.join(tmpDir, ".warelay", "credentials"); const credsDir = path.join(tmpDir, ".warelay", "credentials");
fs.mkdirSync(credsDir, { recursive: true }); fs.mkdirSync(credsDir, { recursive: true });
fs.writeFileSync(path.join(credsDir, "creds.json"), "{}"); fs.writeFileSync(path.join(credsDir, "creds.json"), "{}");
const sessionsPath = path.join(tmpDir, ".warelay", "sessions.json");
fs.writeFileSync(sessionsPath, "{}");
const { logoutWeb, WA_WEB_AUTH_DIR } = await import("./session.js"); const { logoutWeb, WA_WEB_AUTH_DIR } = await import("./session.js");
expect(WA_WEB_AUTH_DIR.startsWith(tmpDir)).toBe(true); expect(WA_WEB_AUTH_DIR.startsWith(tmpDir)).toBe(true);
@ -42,6 +44,7 @@ describe("web logout", () => {
expect(result).toBe(true); expect(result).toBe(true);
expect(fs.existsSync(credsDir)).toBe(false); expect(fs.existsSync(credsDir)).toBe(false);
expect(fs.existsSync(sessionsPath)).toBe(false);
}); });
it("no-ops when nothing to delete", async () => { it("no-ops when nothing to delete", async () => {

View File

@ -12,6 +12,7 @@ import {
} from "@whiskeysockets/baileys"; } from "@whiskeysockets/baileys";
import qrcode from "qrcode-terminal"; import qrcode from "qrcode-terminal";
import { SESSION_STORE_DEFAULT } from "../config/sessions.js";
import { danger, info, success } from "../globals.js"; import { danger, info, success } from "../globals.js";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
@ -160,6 +161,8 @@ export async function logoutWeb(runtime: RuntimeEnv = defaultRuntime) {
return false; return false;
} }
await fs.rm(WA_WEB_AUTH_DIR, { recursive: true, force: true }); await fs.rm(WA_WEB_AUTH_DIR, { recursive: true, force: true });
// Also drop session store to clear lingering per-sender state after logout.
await fs.rm(SESSION_STORE_DEFAULT, { force: true });
runtime.log( runtime.log(
success( success(
"Cleared WhatsApp Web credentials. Run `warelay login --provider web` to relink.", "Cleared WhatsApp Web credentials. Run `warelay login --provider web` to relink.",