From 5b54d4de7a992abb2bbc742226af14c326ad6695 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 07:54:13 +0000 Subject: [PATCH 01/11] feat(web): batch inbound messages --- src/web/auto-reply.test.ts | 74 +++++++++ src/web/auto-reply.ts | 311 +++++++++++++++++++++---------------- 2 files changed, 250 insertions(+), 135 deletions(-) diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 059c6e570..945df6212 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -25,6 +25,8 @@ import { stripHeartbeatToken, } from "./auto-reply.js"; import type { sendMessageWeb } from "./outbound.js"; +import * as commandQueue from "../process/command-queue.js"; +import { getQueueSize } from "../process/command-queue.js"; const makeSessionStore = async ( entries: Record = {}, @@ -573,6 +575,78 @@ describe("web auto-reply", () => { } }); + it("batches inbound messages while queue is busy and preserves timestamps", async () => { + vi.useFakeTimers(); + const originalMax = process.getMaxListeners(); + process.setMaxListeners?.(1); // force low to confirm bump + + const sendMedia = vi.fn(); + const reply = vi.fn().mockResolvedValue(undefined); + const sendComposing = vi.fn(); + const resolver = vi.fn().mockResolvedValue({ text: "batched" }); + + let capturedOnMessage: + | ((msg: import("./inbound.js").WebInboundMessage) => Promise) + | undefined; + const listenerFactory = async (opts: { + onMessage: ( + msg: import("./inbound.js").WebInboundMessage, + ) => Promise; + }) => { + capturedOnMessage = opts.onMessage; + return { close: vi.fn() }; + }; + + // Queue starts busy, then frees after one polling tick. + let queueBusy = true; + const queueSpy = vi + .spyOn(commandQueue, "getQueueSize") + .mockImplementation(() => (queueBusy ? 1 : 0)); + + setLoadConfigMock(() => ({ inbound: { timestampPrefix: "UTC" } })); + + await monitorWebProvider(false, listenerFactory, false, resolver); + expect(capturedOnMessage).toBeDefined(); + + // Two messages from the same sender with fixed timestamps + await capturedOnMessage?.({ + body: "first", + from: "+1", + to: "+2", + id: "m1", + timestamp: 1735689600000, // Jan 1 2025 00:00:00 UTC + sendComposing, + reply, + sendMedia, + }); + await capturedOnMessage?.({ + body: "second", + from: "+1", + to: "+2", + id: "m2", + timestamp: 1735693200000, // Jan 1 2025 01:00:00 UTC + sendComposing, + reply, + sendMedia, + }); + + // Let the queued batch flush once the queue is free + queueBusy = false; + vi.advanceTimersByTime(200); + + expect(resolver).toHaveBeenCalledTimes(1); + const args = resolver.mock.calls[0][0]; + expect(args.Body).toContain("[Jan 1 00:00] [warelay] first"); + expect(args.Body).toContain("[Jan 1 01:00] [warelay] second"); + + // Max listeners bumped to avoid warnings in multi-instance test runs + expect(process.getMaxListeners?.()).toBeGreaterThanOrEqual(50); + + queueSpy.mockRestore(); + process.setMaxListeners?.(originalMax); + vi.useRealTimers(); + }); + it("falls back to text when media send fails", async () => { const sendMedia = vi.fn().mockRejectedValue(new Error("boom")); const reply = vi.fn().mockResolvedValue(undefined); diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index cc965dab0..19634a3f1 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -515,6 +515,13 @@ export async function monitorWebProvider( }), ); + // Avoid noisy MaxListenersExceeded warnings in test environments where + // multiple relay instances may be constructed. + const currentMaxListeners = process.getMaxListeners?.() ?? 10; + if (process.setMaxListeners && currentMaxListeners < 50) { + process.setMaxListeners(50); + } + let sigintStop = false; const handleSigint = () => { sigintStop = true; @@ -544,35 +551,179 @@ export async function monitorWebProvider( const MESSAGE_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes without any messages const WATCHDOG_CHECK_MS = 60 * 1000; // Check every minute + // Batch inbound messages while command queue is busy, then send one + // combined prompt with per-message timestamps (inbound-only behavior). + type PendingBatch = { messages: WebInboundMsg[]; timer?: NodeJS.Timeout }; + const pendingBatches = new Map(); + + const formatTimestamp = (ts?: number) => { + const tsCfg = cfg.inbound?.timestampPrefix; + const tsEnabled = tsCfg !== false; // default true + if (!tsEnabled) return ""; + const tz = typeof tsCfg === "string" ? tsCfg : "UTC"; + const date = ts ? new Date(ts) : new Date(); + try { + return `[${date.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${date.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `; + } catch { + return `[${date.toISOString().slice(5, 16).replace("T", " ")}] `; + } + }; + + const buildLine = (msg: WebInboundMsg) => { + // Build message prefix: explicit config > default based on allowFrom + let messagePrefix = cfg.inbound?.messagePrefix; + if (messagePrefix === undefined) { + const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0; + messagePrefix = hasAllowFrom ? "" : "[warelay]"; + } + const prefixStr = messagePrefix ? `${messagePrefix} ` : ""; + return `${formatTimestamp(msg.timestamp)}${prefixStr}${msg.body}`; + }; + + const processBatch = async (from: string) => { + const batch = pendingBatches.get(from); + if (!batch || batch.messages.length === 0) return; + if (getQueueSize() > 0) { + // Wait until command queue is free to run the combined prompt. + batch.timer = setTimeout(() => void processBatch(from), 150); + return; + } + pendingBatches.delete(from); + + const messages = batch.messages; + const latest = messages[messages.length - 1]; + const combinedBody = messages.map(buildLine).join("\n"); + + // Echo detection uses combined body so we don't respond twice. + if (recentlySent.has(combinedBody)) { + logVerbose(`Skipping auto-reply: detected echo for combined batch`); + recentlySent.delete(combinedBody); + return; + } + + const correlationId = latest.id ?? newConnectionId(); + replyLogger.info( + { + connectionId, + correlationId, + from, + to: latest.to, + body: combinedBody, + mediaType: latest.mediaType ?? null, + mediaPath: latest.mediaPath ?? null, + batchSize: messages.length, + }, + "inbound web message (batched)", + ); + + const tsDisplay = latest.timestamp + ? new Date(latest.timestamp).toISOString() + : new Date().toISOString(); + console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`); + + const replyResult = await (replyResolver ?? getReplyFromConfig)( + { + Body: combinedBody, + From: latest.from, + To: latest.to, + MessageSid: latest.id, + MediaPath: latest.mediaPath, + MediaUrl: latest.mediaUrl, + MediaType: latest.mediaType, + }, + { + onReplyStart: latest.sendComposing, + }, + ); + + if ( + !replyResult || + (!replyResult.text && + !replyResult.mediaUrl && + !replyResult.mediaUrls?.length) + ) { + logVerbose("Skipping auto-reply: no text/media returned from resolver"); + return; + } + + // Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match) + const responsePrefix = cfg.inbound?.responsePrefix; + if ( + responsePrefix && + replyResult.text && + replyResult.text.trim() !== HEARTBEAT_TOKEN + ) { + if (!replyResult.text.startsWith(responsePrefix)) { + replyResult.text = `${responsePrefix} ${replyResult.text}`; + } + } + + try { + await deliverWebReply({ + replyResult, + msg: latest, + maxMediaBytes, + replyLogger, + runtime, + connectionId, + }); + + if (replyResult.text) { + recentlySent.add(replyResult.text); + recentlySent.add(combinedBody); // Prevent echo on the batch text itself + logVerbose( + `Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`, + ); + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } + } + + if (isVerbose()) { + console.log( + success( + `↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`, + ), + ); + } else { + console.log( + success( + `↩️ ${replyResult.text ?? ""}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`, + ), + ); + } + } catch (err) { + console.error( + danger( + `Failed sending web auto-reply to ${from}: ${String(err)}`, + ), + ); + } + }; + + const enqueueBatch = async (msg: WebInboundMsg) => { + const bucket = pendingBatches.get(msg.from) ?? { messages: [] }; + bucket.messages.push(msg); + pendingBatches.set(msg.from, bucket); + + // Process immediately when queue is free; otherwise wait until it drains. + if (getQueueSize() === 0) { + await processBatch(msg.from); + } else { + bucket.timer = bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150); + } + }; + const listener = await (listenerFactory ?? monitorWebInbox)({ verbose, onMessage: async (msg) => { - // Also add IPC-sent messages to echo detection - // (this is handled below in the IPC sendHandler) handledMessages += 1; lastMessageAt = Date.now(); - const ts = msg.timestamp - ? new Date(msg.timestamp).toISOString() - : new Date().toISOString(); - const correlationId = msg.id ?? newConnectionId(); - replyLogger.info( - { - connectionId, - correlationId, - from: msg.from, - to: msg.to, - body: msg.body, - mediaType: msg.mediaType ?? null, - mediaPath: msg.mediaPath ?? null, - }, - "inbound web message", - ); + lastInboundMsg = msg; - console.log(`\n[${ts}] ${msg.from} -> ${msg.to}: ${msg.body}`); - - // Detect same-phone mode (self-messaging) - const isSamePhoneMode = msg.from === msg.to; - if (isSamePhoneMode) { + // Same-phone mode logging retained + if (msg.from === msg.to) { logVerbose(`📱 Same-phone mode detected (from === to: ${msg.from})`); } @@ -582,121 +733,11 @@ export async function monitorWebProvider( logVerbose( `Skipping auto-reply: detected echo (message matches recently sent text)`, ); - recentlySent.delete(msg.body); // Remove from set to allow future identical messages + recentlySent.delete(msg.body); return; } - logVerbose( - `Echo check: message not in recent set (size: ${recentlySent.size})`, - ); - - lastInboundMsg = msg; - - // Build timestamp prefix (default: enabled with UTC) - // Can be: true (UTC), false (disabled), or "America/New_York" (custom timezone) - let timestampStr = ""; - const tsCfg = cfg.inbound?.timestampPrefix; - const tsEnabled = tsCfg !== false; // default true - if (tsEnabled) { - const tz = typeof tsCfg === "string" ? tsCfg : "UTC"; - const now = new Date(); - try { - // Format: "Nov 29 06:30" - compact but informative - timestampStr = `[${now.toLocaleDateString("en-US", { month: "short", day: "numeric", timeZone: tz })} ${now.toLocaleTimeString("en-US", { hour: "2-digit", minute: "2-digit", hour12: false, timeZone: tz })}] `; - } catch { - // Fallback to UTC if timezone invalid - timestampStr = `[${now.toISOString().slice(5, 16).replace("T", " ")}] `; - } - } - - // Build message prefix: explicit config > default based on allowFrom - // If allowFrom is configured, user likely has a specific setup - no default prefix - // If no allowFrom, add "[warelay]" so AI knows it's coming through warelay - let messagePrefix = cfg.inbound?.messagePrefix; - if (messagePrefix === undefined) { - const hasAllowFrom = (cfg.inbound?.allowFrom?.length ?? 0) > 0; - messagePrefix = hasAllowFrom ? "" : "[warelay]"; - } - const prefixStr = messagePrefix ? `${messagePrefix} ` : ""; - const bodyForCommand = `${timestampStr}${prefixStr}${msg.body}`; - - const replyResult = await (replyResolver ?? getReplyFromConfig)( - { - Body: bodyForCommand, - From: msg.from, - To: msg.to, - MessageSid: msg.id, - MediaPath: msg.mediaPath, - MediaUrl: msg.mediaUrl, - MediaType: msg.mediaType, - }, - { - onReplyStart: msg.sendComposing, - }, - ); - if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) - ) { - logVerbose( - "Skipping auto-reply: no text/media returned from resolver", - ); - return; - } - // Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match) - const responsePrefix = cfg.inbound?.responsePrefix; - if (responsePrefix && replyResult.text && replyResult.text.trim() !== HEARTBEAT_TOKEN) { - // Only add prefix if not already present - if (!replyResult.text.startsWith(responsePrefix)) { - replyResult.text = `${responsePrefix} ${replyResult.text}`; - } - } - - try { - await deliverWebReply({ - replyResult, - msg, - maxMediaBytes, - replyLogger, - runtime, - connectionId, - }); - - // Track sent message to prevent echo loops - if (replyResult.text) { - recentlySent.add(replyResult.text); - logVerbose( - `Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`, - ); - // Keep set bounded - remove oldest if too large - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); - } - } - - if (isVerbose()) { - console.log( - success( - `↩️ Auto-replied to ${msg.from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""})`, - ), - ); - } else { - console.log( - success( - `↩️ ${replyResult.text ?? ""}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`, - ), - ); - } - } catch (err) { - console.error( - danger( - `Failed sending web auto-reply to ${msg.from}: ${String(err)}`, - ), - ); - } + return enqueueBatch(msg); }, }); From 52c311e47f189f0482defce911312819668fdc41 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 07:54:49 +0000 Subject: [PATCH 02/11] chore: bump version to 1.3.0 --- CHANGELOG.md | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eff96cb5b..33fb74ca5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 1.2.3 — Unreleased +## 1.3.0 — Unreleased ### Bug Fixes - **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`. diff --git a/package.json b/package.json index 18f6e340e..63f3d036e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "warelay", - "version": "1.2.2", + "version": "1.3.0", "description": "WhatsApp relay CLI (send, monitor, webhook, auto-reply) using Twilio", "type": "module", "main": "dist/index.js", From f31e89d5af2ec84357a9e23ab748dc36c9fe6133 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 10:42:27 +0000 Subject: [PATCH 03/11] Agents: add pluggable CLIs Co-authored-by: RealSid08 --- docs/agent.md | 77 ++++++++++++++++ src/agents/claude.ts | 67 ++++++++++++++ src/agents/codex.ts | 66 ++++++++++++++ src/agents/index.ts | 19 ++++ src/agents/opencode.ts | 54 +++++++++++ src/agents/pi.ts | 65 ++++++++++++++ src/agents/types.ts | 42 +++++++++ src/auto-reply/claude.ts | 3 + src/auto-reply/command-reply.test.ts | 29 +++--- src/auto-reply/command-reply.ts | 129 +++++++++------------------ src/auto-reply/opencode.ts | 105 ++++++++++++++++++++++ src/auto-reply/reply.ts | 4 +- src/config/config.ts | 47 ++++++---- src/index.core.test.ts | 6 +- src/web/auto-reply.ts | 61 +++++++------ 15 files changed, 624 insertions(+), 150 deletions(-) create mode 100644 docs/agent.md create mode 100644 src/agents/claude.ts create mode 100644 src/agents/codex.ts create mode 100644 src/agents/index.ts create mode 100644 src/agents/opencode.ts create mode 100644 src/agents/pi.ts create mode 100644 src/agents/types.ts create mode 100644 src/auto-reply/opencode.ts diff --git a/docs/agent.md b/docs/agent.md new file mode 100644 index 000000000..eacd853b5 --- /dev/null +++ b/docs/agent.md @@ -0,0 +1,77 @@ +# Agent Abstraction Refactor Plan + +Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact. + +## Overview +- Introduce a pluggable agent layer (`src/agents/*`), selected by config. +- Normalize config (`agent` block) and remove `claudeOutputFormat` legacy knobs. +- Provide per-agent argv builders and output parsers (including NDJSON streams). +- Preserve MEDIA-token handling and shared queue/heartbeat behavior. + +## Configuration +- New shape (no backward compat): + ```json5 + inbound: { + reply: { + mode: "command", + agent: { + kind: "claude" | "opencode" | "pi" | "codex", + format?: "text" | "json", + identityPrefix?: string + }, + command: ["claude", "{{Body}}"], + cwd?: string, + session?: { ... }, + timeoutSeconds?: number, + bodyPrefix?: string, + mediaUrl?: string, + mediaMaxMb?: number, + typingIntervalSeconds?: number, + heartbeatMinutes?: number + } + } + ``` +- Validation moves to `config.ts` (new `AgentKind`/`AgentConfig` types). +- If `agent` is missing → config error. + +## Agent modules +- `src/agents/types.ts` – `AgentKind`, `AgentSpec`: + - `buildArgs(argv: string[], body: string, ctx: { sessionId?, isNewSession?, sendSystemOnce?, systemSent?, identityPrefix? }): string[]` + - `parse(stdout: string): { text?: string; mediaUrls?: string[]; meta?: AgentMeta }` +- `src/agents/claude.ts` – current flag injection (`--output-format`, `-p`), identity prepend. +- `src/agents/opencode.ts` – reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix. +- `src/agents/pi.ts` – parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags. +- `src/agents/codex.ts` – parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only. +- Shared MEDIA extraction stays in `media/parse.ts`. + +## Command runner changes +- `runCommandReply`: + - Resolve agent spec from config. + - Apply `buildArgs` (handles identity prepend and session args per agent). + - Run command; send stdout to `spec.parse` → `text`, `mediaUrls`, `meta` (stored as `agentMeta`). + - Remove `claudeMeta` naming; tests updated to `agentMeta`. + +## Sessions +- Session arg defaults become agent-specific (Claude: `--resume/--session-id`; Opencode/Pi/Codex: `--session`). +- Still overridable via `sessionArgNew/sessionArgResume` in config. + +## Tests +- Update existing tests to new config (no `claudeOutputFormat`). +- Add fixtures: + - Opencode NDJSON sample (from PR #5) → parsed text + meta. + - Codex NDJSON sample (captured: thread/turn/item/usage) → parsed text. + - Pi NDJSON sample (AssistantMessageEvent) → parsed text. +- Ensure MEDIA token parsing works on agent text output. + +## Docs +- README: rename “Claude-aware” → “Multi-agent (Claude, Codex, Pi, Opencode)”. +- New short guide per agent (Opencode doc from PR #5; add Codex/Pi snippets). +- Mention identityPrefix override and session arg differences. + +## Migration +- Breaking change: configs must specify `agent`. Remove old `claudeOutputFormat` keys. +- Provide migration note in CHANGELOG 1.3.x. + +## Out of scope +- No media binary support; still relies on MEDIA tokens in text. +- No UI changes; WhatsApp/Twilio plumbing unchanged. diff --git a/src/agents/claude.ts b/src/agents/claude.ts new file mode 100644 index 000000000..80cd767bb --- /dev/null +++ b/src/agents/claude.ts @@ -0,0 +1,67 @@ +import path from "node:path"; + +import { + CLAUDE_BIN, + CLAUDE_IDENTITY_PREFIX, + parseClaudeJson, + summarizeClaudeMetadata, + type ClaudeJsonParseResult, +} from "../auto-reply/claude.js"; +import type { + AgentMeta, + AgentParseResult, + AgentSpec, + BuildArgsContext, +} from "./types.js"; + +function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined { + if (!parsed?.parsed) return undefined; + const summary = summarizeClaudeMetadata(parsed.parsed); + return summary ? { extra: { summary } } : undefined; +} + +export const claudeSpec: AgentSpec = { + kind: "claude", + isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === CLAUDE_BIN, + buildArgs: (ctx) => { + // Work off a split of "before body" and "after body" so we don't lose the + // body index when inserting flags. + const argv = [...ctx.argv]; + const body = argv[ctx.bodyIndex] ?? ""; + const beforeBody = argv.slice(0, ctx.bodyIndex); + const afterBody = argv.slice(ctx.bodyIndex + 1); + + const wantsOutputFormat = typeof ctx.format === "string"; + if (wantsOutputFormat) { + const hasOutputFormat = argv.some( + (part) => part === "--output-format" || part.startsWith("--output-format="), + ); + if (!hasOutputFormat) { + beforeBody.push("--output-format", ctx.format!); + } + } + + const hasPrintFlag = argv.some((part) => part === "-p" || part === "--print"); + if (!hasPrintFlag) { + beforeBody.push("-p"); + } + + const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent); + const bodyWithIdentity = + shouldPrependIdentity && body + ? [ctx.identityPrefix ?? CLAUDE_IDENTITY_PREFIX, body] + .filter(Boolean) + .join("\n\n") + : body; + + return [...beforeBody, bodyWithIdentity, ...afterBody]; + }, + parseOutput: (rawStdout) => { + const parsed = parseClaudeJson(rawStdout); + const text = parsed?.text ?? rawStdout.trim(); + return { + text: text?.trim(), + meta: toMeta(parsed), + }; + }, +}; diff --git a/src/agents/codex.ts b/src/agents/codex.ts new file mode 100644 index 000000000..3b2066d05 --- /dev/null +++ b/src/agents/codex.ts @@ -0,0 +1,66 @@ +import path from "node:path"; + +import type { AgentMeta, AgentParseResult, AgentSpec, BuildArgsContext } from "./types.js"; + +function parseCodexJson(raw: string): AgentParseResult { + const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); + let text: string | undefined; + let meta: AgentMeta | undefined; + + for (const line of lines) { + try { + const ev = JSON.parse(line) as { type?: string; item?: { type?: string; text?: string }; usage?: unknown }; + if (ev.type === "item.completed" && ev.item?.type === "agent_message" && typeof ev.item.text === "string") { + text = ev.item.text; + } + if (ev.type === "turn.completed" && ev.usage && typeof ev.usage === "object") { + const u = ev.usage as { + input_tokens?: number; + cached_input_tokens?: number; + output_tokens?: number; + }; + meta = { + usage: { + input: u.input_tokens, + output: u.output_tokens, + cacheRead: u.cached_input_tokens, + total: + (u.input_tokens ?? 0) + + (u.output_tokens ?? 0) + + (u.cached_input_tokens ?? 0), + }, + }; + } + } catch { + // ignore + } + } + + return { text: text?.trim(), meta }; +} + +export const codexSpec: AgentSpec = { + kind: "codex", + isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "codex", + buildArgs: (ctx) => { + const argv = [...ctx.argv]; + const hasExec = argv.length > 0 && argv[1] === "exec"; + if (!hasExec) { + argv.splice(1, 0, "exec"); + } + // Ensure JSON output + if (!argv.includes("--json")) { + argv.splice(argv.length - 1, 0, "--json"); + } + // Safety defaults + if (!argv.includes("--skip-git-repo-check")) { + argv.splice(argv.length - 1, 0, "--skip-git-repo-check"); + } + if (!argv.some((p) => p === "--sandbox" || p.startsWith("--sandbox="))) { + argv.splice(argv.length - 1, 0, "--sandbox", "read-only"); + } + return argv; + }, + parseOutput: parseCodexJson, +}; + diff --git a/src/agents/index.ts b/src/agents/index.ts new file mode 100644 index 000000000..508b3811c --- /dev/null +++ b/src/agents/index.ts @@ -0,0 +1,19 @@ +import { claudeSpec } from "./claude.js"; +import { codexSpec } from "./codex.js"; +import { opencodeSpec } from "./opencode.js"; +import { piSpec } from "./pi.js"; +import type { AgentKind, AgentSpec } from "./types.js"; + +const specs: Record = { + claude: claudeSpec, + codex: codexSpec, + opencode: opencodeSpec, + pi: piSpec, +}; + +export function getAgentSpec(kind: AgentKind): AgentSpec { + return specs[kind]; +} + +export { AgentKind, AgentMeta, AgentParseResult } from "./types.js"; + diff --git a/src/agents/opencode.ts b/src/agents/opencode.ts new file mode 100644 index 000000000..a19bfae7b --- /dev/null +++ b/src/agents/opencode.ts @@ -0,0 +1,54 @@ +import path from "node:path"; + +import { + OPENCODE_BIN, + OPENCODE_IDENTITY_PREFIX, + parseOpencodeJson, + summarizeOpencodeMetadata, +} from "../auto-reply/opencode.js"; +import type { AgentMeta, AgentParseResult, AgentSpec, BuildArgsContext } from "./types.js"; + +function toMeta(parsed: ReturnType): AgentMeta | undefined { + const summary = summarizeOpencodeMetadata(parsed.meta); + return summary ? { extra: { summary } } : undefined; +} + +export const opencodeSpec: AgentSpec = { + kind: "opencode", + isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === OPENCODE_BIN, + buildArgs: (ctx) => { + const argv = [...ctx.argv]; + const wantsJson = ctx.format === "json"; + + // Ensure format json for parsing + if (wantsJson) { + const hasFormat = argv.some( + (part) => part === "--format" || part.startsWith("--format="), + ); + if (!hasFormat) { + const insertBeforeBody = Math.max(argv.length - 1, 0); + argv.splice(insertBeforeBody, 0, "--format", "json"); + } + } + + // Session args default to --session + // Identity prefix + const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent); + if (shouldPrependIdentity && argv[ctx.bodyIndex]) { + const existingBody = argv[ctx.bodyIndex]; + argv[ctx.bodyIndex] = [ctx.identityPrefix ?? OPENCODE_IDENTITY_PREFIX, existingBody] + .filter(Boolean) + .join("\n\n"); + } + + return argv; + }, + parseOutput: (rawStdout) => { + const parsed = parseOpencodeJson(rawStdout); + const text = parsed.text ?? rawStdout.trim(); + return { + text: text?.trim(), + meta: toMeta(parsed), + }; + }, +}; diff --git a/src/agents/pi.ts b/src/agents/pi.ts new file mode 100644 index 000000000..18efc0531 --- /dev/null +++ b/src/agents/pi.ts @@ -0,0 +1,65 @@ +import path from "node:path"; + +import type { AgentMeta, AgentParseResult, AgentSpec, BuildArgsContext } from "./types.js"; + +type PiAssistantMessage = { + role?: string; + content?: Array<{ type?: string; text?: string }>; + usage?: { input?: number; output?: number }; + model?: string; + provider?: string; + stopReason?: string; +}; + +function parsePiJson(raw: string): AgentParseResult { + const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); + let lastMessage: PiAssistantMessage | undefined; + for (const line of lines) { + try { + const ev = JSON.parse(line) as { type?: string; message?: PiAssistantMessage }; + if (ev.type === "message_end" && ev.message?.role === "assistant") { + lastMessage = ev.message; + } + } catch { + // ignore + } + } + const text = + lastMessage?.content + ?.filter((c) => c?.type === "text" && typeof c.text === "string") + .map((c) => c.text) + .join("\n") + ?.trim() ?? undefined; + const meta: AgentMeta | undefined = lastMessage + ? { + model: lastMessage.model, + provider: lastMessage.provider, + stopReason: lastMessage.stopReason, + usage: lastMessage.usage, + } + : undefined; + return { text, meta }; +} + +export const piSpec: AgentSpec = { + kind: "pi", + isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi", + buildArgs: (ctx) => { + const argv = [...ctx.argv]; + // Non-interactive print + JSON + if (!argv.includes("-p") && !argv.includes("--print")) { + argv.splice(argv.length - 1, 0, "-p"); + } + if (ctx.format === "json" && !argv.includes("--mode") && !argv.some((a) => a === "--mode")) { + argv.splice(argv.length - 1, 0, "--mode", "json"); + } + // Session defaults + // Identity prefix optional; Pi usually doesn't need, but allow + if (!(ctx.sendSystemOnce && ctx.systemSent) && argv[ctx.bodyIndex]) { + const existingBody = argv[ctx.bodyIndex]; + argv[ctx.bodyIndex] = [ctx.identityPrefix, existingBody].filter(Boolean).join("\n\n"); + } + return argv; + }, + parseOutput: parsePiJson, +}; diff --git a/src/agents/types.ts b/src/agents/types.ts new file mode 100644 index 000000000..76b6ba1d2 --- /dev/null +++ b/src/agents/types.ts @@ -0,0 +1,42 @@ +export type AgentKind = "claude" | "opencode" | "pi" | "codex"; + +export type AgentMeta = { + model?: string; + provider?: string; + stopReason?: string; + usage?: { + input?: number; + output?: number; + cacheRead?: number; + cacheWrite?: number; + total?: number; + }; + extra?: Record; +}; + +export type AgentParseResult = { + text?: string; + mediaUrls?: string[]; + meta?: AgentMeta; +}; + +export type BuildArgsContext = { + argv: string[]; + bodyIndex: number; // index of prompt/body argument in argv + isNewSession: boolean; + sessionId?: string; + sendSystemOnce: boolean; + systemSent: boolean; + identityPrefix?: string; + format?: "text" | "json"; + sessionArgNew?: string[]; + sessionArgResume?: string[]; +}; + +export interface AgentSpec { + kind: AgentKind; + isInvocation: (argv: string[]) => boolean; + buildArgs: (ctx: BuildArgsContext) => string[]; + parseOutput: (rawStdout: string) => AgentParseResult; +} + diff --git a/src/auto-reply/claude.ts b/src/auto-reply/claude.ts index b1a6518f4..ca3cfa9df 100644 --- a/src/auto-reply/claude.ts +++ b/src/auto-reply/claude.ts @@ -160,3 +160,6 @@ export function parseClaudeJsonText(raw: string): string | undefined { const parsed = parseClaudeJson(raw); return parsed?.text; } + +// Re-export from command-reply for backwards compatibility +export { summarizeClaudeMetadata } from "./command-reply.js"; diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index db1e64767..39ca340b8 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -70,7 +70,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, @@ -98,7 +98,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: true, @@ -121,7 +121,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: true, @@ -144,7 +144,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: true, @@ -167,6 +167,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["cli", "{{Body}}"], + agent: { kind: "claude" }, session: { sessionArgNew: ["--new", "{{SessionId}}"], sessionArgResume: ["--resume", "{{SessionId}}"], @@ -192,7 +193,7 @@ describe("runCommandReply", () => { throw { stdout: "partial output here", killed: true, signal: "SIGKILL" }; }); const { payload, meta } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "hi"] }, + reply: { mode: "command", command: ["echo", "hi"], agent: { kind: "claude" } }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, @@ -213,7 +214,7 @@ describe("runCommandReply", () => { throw { stdout: "", killed: true, signal: "SIGKILL" }; }); const { payload } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "hi"], cwd: "/tmp/work" }, + reply: { mode: "command", command: ["echo", "hi"], cwd: "/tmp/work", agent: { kind: "claude" } }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, @@ -235,7 +236,7 @@ describe("runCommandReply", () => { stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`, }); const { payload } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "hi"], mediaMaxMb: 1 }, + reply: { mode: "command", command: ["echo", "hi"], mediaMaxMb: 1, agent: { kind: "claude" } }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, @@ -259,7 +260,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, @@ -271,14 +272,14 @@ describe("runCommandReply", () => { commandRunner: runner, enqueue: enqueueImmediate, }); - expect(meta.claudeMeta).toContain("duration=50ms"); - expect(meta.claudeMeta).toContain("tool_calls=1"); + expect(meta.agentMeta?.extra?.summary).toContain("duration=50ms"); + expect(meta.agentMeta?.extra?.summary).toContain("tool_calls=1"); }); it("captures queue wait metrics in meta", async () => { const runner = makeRunner({ stdout: "ok" }); const { meta } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "{{Body}}"] }, + reply: { mode: "command", command: ["echo", "{{Body}}"], agent: { kind: "claude" } }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, @@ -303,7 +304,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, @@ -328,7 +329,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, @@ -353,7 +354,7 @@ describe("runCommandReply", () => { reply: { mode: "command", command: ["claude", "{{Body}}"], - claudeOutputFormat: "json", + agent: { kind: "claude", format: "json" }, }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index 71e564948..a897520ba 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -1,18 +1,14 @@ import fs from "node:fs/promises"; import path from "node:path"; +import { getAgentSpec } from "../agents/index.js"; +import type { AgentMeta } from "../agents/types.js"; import type { WarelayConfig } from "../config/config.js"; import { isVerbose, logVerbose } from "../globals.js"; import { logError } from "../logger.js"; import { splitMediaFromOutput } from "../media/parse.js"; import { enqueueCommand } from "../process/command-queue.js"; import type { runCommandWithTimeout } from "../process/exec.js"; -import { - CLAUDE_BIN, - CLAUDE_IDENTITY_PREFIX, - type ClaudeJsonParseResult, - parseClaudeJson, -} from "./claude.js"; import { applyTemplate, type TemplateContext } from "./templating.js"; import type { ReplyPayload } from "./types.js"; @@ -42,7 +38,7 @@ export type CommandReplyMeta = { exitCode?: number | null; signal?: string | null; killed?: boolean; - claudeMeta?: string; + agentMeta?: AgentMeta; }; export type CommandReplyResult = { @@ -119,6 +115,8 @@ export async function runCommandReply( if (!reply.command?.length) { throw new Error("reply.command is required for mode=command"); } + const agentCfg = reply.agent ?? { kind: "claude" }; + const agent = getAgentSpec(agentCfg.kind as any); let argv = reply.command.map((part) => applyTemplate(part, templatingCtx)); const templatePrefix = @@ -129,66 +127,47 @@ export async function runCommandReply( argv = [argv[0], templatePrefix, ...argv.slice(1)]; } - // Ensure Claude commands can emit plain text by forcing --output-format when configured. - if ( - reply.claudeOutputFormat && - argv.length > 0 && - path.basename(argv[0]) === CLAUDE_BIN - ) { - const hasOutputFormat = argv.some( - (part) => - part === "--output-format" || part.startsWith("--output-format="), - ); - const insertBeforeBody = Math.max(argv.length - 1, 0); - if (!hasOutputFormat) { - argv = [ - ...argv.slice(0, insertBeforeBody), - "--output-format", - reply.claudeOutputFormat, - ...argv.slice(insertBeforeBody), - ]; - } - const hasPrintFlag = argv.some( - (part) => part === "-p" || part === "--print", - ); - if (!hasPrintFlag) { - const insertIdx = Math.max(argv.length - 1, 0); - argv = [...argv.slice(0, insertIdx), "-p", ...argv.slice(insertIdx)]; - } - } + // Default body index is last arg + let bodyIndex = Math.max(argv.length - 1, 0); - // Inject session args if configured (use resume for existing, session-id for new) + // Session args prepared (templated) and injected generically if (reply.session) { + const defaultNew = + agentCfg.kind === "claude" + ? ["--session-id", "{{SessionId}}"] + : ["--session", "{{SessionId}}"]; + const defaultResume = + agentCfg.kind === "claude" + ? ["--resume", "{{SessionId}}"] + : ["--session", "{{SessionId}}"]; const sessionArgList = ( isNewSession - ? (reply.session.sessionArgNew ?? ["--session-id", "{{SessionId}}"]) - : (reply.session.sessionArgResume ?? ["--resume", "{{SessionId}}"]) - ).map((part) => applyTemplate(part, templatingCtx)); + ? reply.session.sessionArgNew ?? defaultNew + : reply.session.sessionArgResume ?? defaultResume + ).map((p) => applyTemplate(p, templatingCtx)); if (sessionArgList.length) { const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true; const insertAt = insertBeforeBody && argv.length > 1 ? argv.length - 1 : argv.length; - argv = [ - ...argv.slice(0, insertAt), - ...sessionArgList, - ...argv.slice(insertAt), - ]; + argv = [...argv.slice(0, insertAt), ...sessionArgList, ...argv.slice(insertAt)]; + bodyIndex = Math.max(argv.length - 1, 0); } } - let finalArgv = argv; - const isClaudeInvocation = - finalArgv.length > 0 && path.basename(finalArgv[0]) === CLAUDE_BIN; - const shouldPrependIdentity = - isClaudeInvocation && !(sendSystemOnce && systemSent); - if (shouldPrependIdentity && finalArgv.length > 0) { - const bodyIdx = finalArgv.length - 1; - const existingBody = finalArgv[bodyIdx] ?? ""; - finalArgv = [ - ...finalArgv.slice(0, bodyIdx), - [CLAUDE_IDENTITY_PREFIX, existingBody].filter(Boolean).join("\n\n"), - ]; - } + const shouldApplyAgent = agent.isInvocation(argv); + const finalArgv = shouldApplyAgent + ? agent.buildArgs({ + argv, + bodyIndex, + isNewSession, + sessionId: templatingCtx.SessionId, + sendSystemOnce, + systemSent, + identityPrefix: agentCfg.identityPrefix, + format: agentCfg.format, + }) + : argv; + logVerbose( `Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`, ); @@ -217,28 +196,12 @@ export async function runCommandReply( if (stderr?.trim()) { logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); } - let parsed: ClaudeJsonParseResult | undefined; - if ( - trimmed && - (reply.claudeOutputFormat === "json" || isClaudeInvocation) - ) { - parsed = parseClaudeJson(trimmed); - if (parsed?.parsed && isVerbose()) { - const summary = summarizeClaudeMetadata(parsed.parsed); - if (summary) logVerbose(`Claude JSON meta: ${summary}`); - logVerbose( - `Claude JSON raw: ${JSON.stringify(parsed.parsed, null, 2)}`, - ); - } - if (typeof parsed?.text === "string") { - logVerbose( - `Claude JSON parsed -> ${parsed.text.slice(0, 120)}${parsed.text.length > 120 ? "…" : ""}`, - ); - trimmed = parsed.text.trim(); - } else { - logVerbose("Claude JSON parse failed; returning raw stdout"); - } + + const parsed = trimmed ? agent.parseOutput(trimmed) : undefined; + if (parsed && parsed.text !== undefined) { + trimmed = parsed.text.trim(); } + const { text: cleanedText, mediaUrls: mediaFound } = splitMediaFromOutput(trimmed); trimmed = cleanedText; @@ -249,7 +212,7 @@ export async function runCommandReply( logVerbose("No MEDIA token extracted from final text"); } if (!trimmed && !mediaFromCommand) { - const meta = parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined; + const meta = parsed?.meta?.extra?.summary ?? undefined; trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`; logVerbose("No text/media produced; injecting fallback notice to user"); } @@ -271,9 +234,7 @@ export async function runCommandReply( exitCode: code, signal, killed, - claudeMeta: parsed - ? summarizeClaudeMetadata(parsed.parsed) - : undefined, + agentMeta: parsed?.meta, }, }; } @@ -291,9 +252,7 @@ export async function runCommandReply( exitCode: code, signal, killed, - claudeMeta: parsed - ? summarizeClaudeMetadata(parsed.parsed) - : undefined, + agentMeta: parsed?.meta, }, }; } @@ -341,7 +300,7 @@ export async function runCommandReply( exitCode: code, signal, killed, - claudeMeta: parsed ? summarizeClaudeMetadata(parsed.parsed) : undefined, + agentMeta: parsed?.meta, }; if (isVerbose()) { logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`); diff --git a/src/auto-reply/opencode.ts b/src/auto-reply/opencode.ts new file mode 100644 index 000000000..19b16055d --- /dev/null +++ b/src/auto-reply/opencode.ts @@ -0,0 +1,105 @@ +// Helpers specific to Opencode CLI output/argv handling. + +// Preferred binary name for Opencode CLI invocations. +export const OPENCODE_BIN = "opencode"; + +export const OPENCODE_IDENTITY_PREFIX = + "You are Openclawd running on the user's Mac via warelay. Your scratchpad is /Users/steipete/openclawd; this is your folder and you can add what you like in markdown files and/or images. You don't need to be concise, but WhatsApp replies must stay under ~1500 characters. Media you can send: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK."; + +export type OpencodeJsonParseResult = { + text?: string; + parsed: unknown[]; + valid: boolean; + meta?: { + durationMs?: number; + cost?: number; + tokens?: { + input?: number; + output?: number; + }; + }; +}; + +export function parseOpencodeJson(raw: string): OpencodeJsonParseResult { + const lines = raw.split(/\n+/).filter((s) => s.trim()); + const parsed: unknown[] = []; + let text = ""; + let valid = false; + let startTime: number | undefined; + let endTime: number | undefined; + let cost = 0; + let inputTokens = 0; + let outputTokens = 0; + + for (const line of lines) { + try { + const event = JSON.parse(line); + parsed.push(event); + if (event && typeof event === "object") { + // Opencode emits a stream of events. + if (event.type === "step_start") { + valid = true; + if (typeof event.timestamp === "number") { + if (startTime === undefined || event.timestamp < startTime) { + startTime = event.timestamp; + } + } + } + + if (event.type === "text" && event.part?.text) { + text += event.part.text; + valid = true; + } + + if (event.type === "step_finish") { + valid = true; + if (typeof event.timestamp === "number") { + endTime = event.timestamp; + } + if (event.part) { + if (typeof event.part.cost === "number") { + cost += event.part.cost; + } + if (event.part.tokens) { + inputTokens += event.part.tokens.input || 0; + outputTokens += event.part.tokens.output || 0; + } + } + } + } + } catch { + // ignore non-JSON lines + } + } + + const meta: OpencodeJsonParseResult["meta"] = {}; + if (startTime !== undefined && endTime !== undefined) { + meta.durationMs = endTime - startTime; + } + if (cost > 0) meta.cost = cost; + if (inputTokens > 0 || outputTokens > 0) { + meta.tokens = { input: inputTokens, output: outputTokens }; + } + + return { + text: text || undefined, + parsed, + valid: valid && parsed.length > 0, + meta: Object.keys(meta).length > 0 ? meta : undefined, + }; +} + +export function summarizeOpencodeMetadata( + meta: OpencodeJsonParseResult["meta"], +): string | undefined { + if (!meta) return undefined; + const parts: string[] = []; + if (meta.durationMs !== undefined) + parts.push(`duration=${meta.durationMs}ms`); + if (meta.cost !== undefined) parts.push(`cost=$${meta.cost.toFixed(4)}`); + if (meta.tokens) { + parts.push(`tokens=${meta.tokens.input}+${meta.tokens.output}`); + } + return parts.length ? parts.join(", ") : undefined; +} + diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index 20aef572c..e59f2c683 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -265,8 +265,8 @@ export async function getReplyFromConfig( timeoutSeconds, commandRunner, }); - if (meta.claudeMeta && isVerbose()) { - logVerbose(`Claude JSON meta: ${meta.claudeMeta}`); + if (meta.agentMeta && isVerbose()) { + logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`); } return payload; } finally { diff --git a/src/config/config.ts b/src/config/config.ts index f0e46c6c4..ccb7c06b1 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -5,8 +5,9 @@ import path from "node:path"; import JSON5 from "json5"; import { z } from "zod"; +import type { AgentKind } from "../agents/index.js"; + export type ReplyMode = "text" | "command"; -export type ClaudeOutputFormat = "text" | "json" | "stream-json"; export type SessionScope = "per-sender" | "global"; export type SessionConfig = { @@ -56,18 +57,22 @@ export type WarelayConfig = { }; reply?: { mode: ReplyMode; - text?: string; // for mode=text, can contain {{Body}} - command?: string[]; // for mode=command, argv with templates - cwd?: string; // working directory for command execution - template?: string; // prepend template string when building command/prompt - timeoutSeconds?: number; // optional command timeout; defaults to 600s - bodyPrefix?: string; // optional string prepended to Body before templating - mediaUrl?: string; // optional media attachment (path or URL) + text?: string; + command?: string[]; + cwd?: string; + template?: string; + timeoutSeconds?: number; + bodyPrefix?: string; + mediaUrl?: string; session?: SessionConfig; - claudeOutputFormat?: ClaudeOutputFormat; // when command starts with `claude`, force an output format - mediaMaxMb?: number; // optional cap for outbound media (default 5MB) - typingIntervalSeconds?: number; // how often to refresh typing indicator while command runs - heartbeatMinutes?: number; // auto-ping cadence for command mode + mediaMaxMb?: number; + typingIntervalSeconds?: number; + heartbeatMinutes?: number; + agent?: { + kind: AgentKind; + format?: "text" | "json"; + identityPrefix?: string; + }; }; }; web?: WebConfig; @@ -105,13 +110,17 @@ const ReplySchema = z }) .optional(), heartbeatMinutes: z.number().int().nonnegative().optional(), - claudeOutputFormat: z - .union([ - z.literal("text"), - z.literal("json"), - z.literal("stream-json"), - z.undefined(), - ]) + agent: z + .object({ + kind: z.union([ + z.literal("claude"), + z.literal("opencode"), + z.literal("pi"), + z.literal("codex"), + ]), + format: z.union([z.literal("text"), z.literal("json")]).optional(), + identityPrefix: z.string().optional(), + }) .optional(), }) .refine( diff --git a/src/index.core.test.ts b/src/index.core.test.ts index 63093b801..f213719d4 100644 --- a/src/index.core.test.ts +++ b/src/index.core.test.ts @@ -762,7 +762,7 @@ describe("config and templating", () => { reply: { mode: "command" as const, command: ["claude", "{{Body}}"], - claudeOutputFormat: "text" as const, + agent: { kind: "claude", format: "text" as const }, }, }, }; @@ -802,7 +802,7 @@ describe("config and templating", () => { reply: { mode: "command" as const, command: ["claude", "{{Body}}"], - claudeOutputFormat: "json" as const, + agent: { kind: "claude", format: "json" as const }, }, }, }; @@ -830,7 +830,7 @@ describe("config and templating", () => { reply: { mode: "command" as const, command: ["claude", "{{Body}}"], - // No claudeOutputFormat set on purpose + agent: { kind: "claude" }, }, }, }; diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 19634a3f1..7862f25cf 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -18,7 +18,7 @@ import { monitorWebInbox } from "./inbound.js"; import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWeb } from "./outbound.js"; -import { getQueueSize } from "../process/command-queue.js"; +import { enqueueCommand, getQueueSize } from "../process/command-queue.js"; import { computeBackoff, newConnectionId, @@ -621,19 +621,21 @@ export async function monitorWebProvider( : new Date().toISOString(); console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`); - const replyResult = await (replyResolver ?? getReplyFromConfig)( - { - Body: combinedBody, - From: latest.from, - To: latest.to, - MessageSid: latest.id, - MediaPath: latest.mediaPath, - MediaUrl: latest.mediaUrl, - MediaType: latest.mediaType, - }, - { - onReplyStart: latest.sendComposing, - }, + const replyResult = await enqueueCommand(() => + (replyResolver ?? getReplyFromConfig)( + { + Body: combinedBody, + From: latest.from, + To: latest.to, + MessageSid: latest.id, + MediaPath: latest.mediaPath, + MediaUrl: latest.mediaUrl, + MediaType: latest.mediaType, + }, + { + onReplyStart: latest.sendComposing, + }, + ), ); if ( @@ -917,19 +919,24 @@ export async function monitorWebProvider( "reply heartbeat start", ); } - const replyResult = await (replyResolver ?? getReplyFromConfig)( - { - Body: HEARTBEAT_PROMPT, - From: lastInboundMsg.from, - To: lastInboundMsg.to, - MessageSid: snapshot.entry?.sessionId, - MediaPath: undefined, - MediaUrl: undefined, - MediaType: undefined, - }, - { - onReplyStart: lastInboundMsg.sendComposing, - }, + const hbFrom = lastInboundMsg.from; + const hbTo = lastInboundMsg.to; + const hbComposing = lastInboundMsg.sendComposing; + const replyResult = await enqueueCommand(() => + (replyResolver ?? getReplyFromConfig)( + { + Body: HEARTBEAT_PROMPT, + From: hbFrom, + To: hbTo, + MessageSid: snapshot.entry?.sessionId, + MediaPath: undefined, + MediaUrl: undefined, + MediaType: undefined, + }, + { + onReplyStart: hbComposing, + }, + ), ); if ( From ed080ae988ca142162f151d57060ec7890903bb4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 10:56:10 +0000 Subject: [PATCH 04/11] Tests: cover agents and fix web defaults Co-authored-by: RealSid08 --- src/agents/agents.test.ts | 118 +++++++++++++++++++++++++++ src/agents/claude.ts | 26 +++--- src/agents/codex.ts | 23 ++++-- src/agents/index.ts | 1 - src/agents/opencode.ts | 34 +++++--- src/agents/pi.ts | 20 +++-- src/agents/types.ts | 1 - src/auto-reply/command-reply.test.ts | 26 +++++- src/auto-reply/command-reply.ts | 21 +++-- src/auto-reply/opencode.ts | 1 - src/commands/send.test.ts | 4 + src/commands/send.ts | 10 ++- src/web/auto-reply.test.ts | 21 ++--- src/web/auto-reply.ts | 36 +++++--- src/web/inbound.ts | 9 +- src/web/ipc.ts | 10 +-- src/web/monitor-inbox.test.ts | 6 +- 17 files changed, 285 insertions(+), 82 deletions(-) create mode 100644 src/agents/agents.test.ts diff --git a/src/agents/agents.test.ts b/src/agents/agents.test.ts new file mode 100644 index 000000000..06e64037e --- /dev/null +++ b/src/agents/agents.test.ts @@ -0,0 +1,118 @@ +import { describe, expect, it } from "vitest"; + +import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js"; +import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js"; +import { claudeSpec } from "./claude.js"; +import { codexSpec } from "./codex.js"; +import { opencodeSpec } from "./opencode.js"; +import { piSpec } from "./pi.js"; + +describe("agent buildArgs + parseOutput helpers", () => { + it("claudeSpec injects flags and identity once", () => { + const argv = ["claude", "hi"]; + const built = claudeSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: true, + sessionId: "sess", + sendSystemOnce: false, + systemSent: false, + identityPrefix: undefined, + format: "json", + }); + expect(built).toContain("--output-format"); + expect(built).toContain("json"); + expect(built).toContain("-p"); + expect(built.at(-1)).toContain(CLAUDE_IDENTITY_PREFIX); + + const builtNoIdentity = claudeSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: false, + sessionId: "sess", + sendSystemOnce: true, + systemSent: true, + identityPrefix: undefined, + format: "json", + }); + expect(builtNoIdentity.at(-1)).not.toContain(CLAUDE_IDENTITY_PREFIX); + }); + + it("opencodeSpec adds format flag and identity prefix when needed", () => { + const argv = ["opencode", "body"]; + const built = opencodeSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: true, + sessionId: "sess", + sendSystemOnce: false, + systemSent: false, + identityPrefix: undefined, + format: "json", + }); + expect(built).toContain("--format"); + expect(built).toContain("json"); + expect(built.at(-1)).toContain(OPENCODE_IDENTITY_PREFIX); + }); + + it("piSpec parses final assistant message and preserves usage meta", () => { + const stdout = [ + '{"type":"message_start","message":{"role":"assistant"}}', + '{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}', + ].join("\n"); + const parsed = piSpec.parseOutput(stdout); + expect(parsed.text).toBe("hello world"); + expect(parsed.meta?.provider).toBe("inflection"); + expect((parsed.meta?.usage as { output?: number })?.output).toBe(5); + }); + + it("codexSpec parses agent_message and aggregates usage", () => { + const stdout = [ + '{"type":"item.completed","item":{"type":"agent_message","text":"hi there"}}', + '{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}', + ].join("\n"); + const parsed = codexSpec.parseOutput(stdout); + expect(parsed.text).toBe("hi there"); + const usage = parsed.meta?.usage as { + input?: number; + output?: number; + cacheRead?: number; + total?: number; + }; + expect(usage?.input).toBe(50); + expect(usage?.output).toBe(10); + expect(usage?.cacheRead).toBe(5); + expect(usage?.total).toBe(65); + }); + + it("opencodeSpec parses streamed events and summarizes meta", () => { + const stdout = [ + '{"type":"step_start","timestamp":0}', + '{"type":"text","part":{"text":"hi"}}', + '{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}', + ].join("\n"); + const parsed = opencodeSpec.parseOutput(stdout); + expect(parsed.text).toBe("hi"); + expect(parsed.meta?.extra?.summary).toContain("duration=1200ms"); + expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020"); + expect(parsed.meta?.extra?.summary).toContain("tokens=100+20"); + }); + + it("codexSpec buildArgs enforces exec/json/sandbox defaults", () => { + const argv = ["codex", "hello world"]; + const built = codexSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: true, + sessionId: "sess", + sendSystemOnce: false, + systemSent: false, + identityPrefix: undefined, + format: "json", + }); + expect(built[1]).toBe("exec"); + expect(built).toContain("--json"); + expect(built).toContain("--skip-git-repo-check"); + expect(built).toContain("read-only"); + }); +}); diff --git a/src/agents/claude.ts b/src/agents/claude.ts index 80cd767bb..261dfe4ce 100644 --- a/src/agents/claude.ts +++ b/src/agents/claude.ts @@ -3,16 +3,11 @@ import path from "node:path"; import { CLAUDE_BIN, CLAUDE_IDENTITY_PREFIX, + type ClaudeJsonParseResult, parseClaudeJson, summarizeClaudeMetadata, - type ClaudeJsonParseResult, } from "../auto-reply/claude.js"; -import type { - AgentMeta, - AgentParseResult, - AgentSpec, - BuildArgsContext, -} from "./types.js"; +import type { AgentMeta, AgentSpec } from "./types.js"; function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined { if (!parsed?.parsed) return undefined; @@ -22,10 +17,11 @@ function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined { export const claudeSpec: AgentSpec = { kind: "claude", - isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === CLAUDE_BIN, + isInvocation: (argv) => + argv.length > 0 && path.basename(argv[0]) === CLAUDE_BIN, buildArgs: (ctx) => { - // Work off a split of "before body" and "after body" so we don't lose the - // body index when inserting flags. + // Split around the body so we can inject flags without losing the body + // position. This keeps templated prompts intact even when we add flags. const argv = [...ctx.argv]; const body = argv[ctx.bodyIndex] ?? ""; const beforeBody = argv.slice(0, ctx.bodyIndex); @@ -34,14 +30,18 @@ export const claudeSpec: AgentSpec = { const wantsOutputFormat = typeof ctx.format === "string"; if (wantsOutputFormat) { const hasOutputFormat = argv.some( - (part) => part === "--output-format" || part.startsWith("--output-format="), + (part) => + part === "--output-format" || part.startsWith("--output-format="), ); if (!hasOutputFormat) { - beforeBody.push("--output-format", ctx.format!); + const outputFormat = ctx.format ?? "json"; + beforeBody.push("--output-format", outputFormat); } } - const hasPrintFlag = argv.some((part) => part === "-p" || part === "--print"); + const hasPrintFlag = argv.some( + (part) => part === "-p" || part === "--print", + ); if (!hasPrintFlag) { beforeBody.push("-p"); } diff --git a/src/agents/codex.ts b/src/agents/codex.ts index 3b2066d05..da1cd29a2 100644 --- a/src/agents/codex.ts +++ b/src/agents/codex.ts @@ -1,6 +1,6 @@ import path from "node:path"; -import type { AgentMeta, AgentParseResult, AgentSpec, BuildArgsContext } from "./types.js"; +import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js"; function parseCodexJson(raw: string): AgentParseResult { const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); @@ -9,11 +9,25 @@ function parseCodexJson(raw: string): AgentParseResult { for (const line of lines) { try { - const ev = JSON.parse(line) as { type?: string; item?: { type?: string; text?: string }; usage?: unknown }; - if (ev.type === "item.completed" && ev.item?.type === "agent_message" && typeof ev.item.text === "string") { + const ev = JSON.parse(line) as { + type?: string; + item?: { type?: string; text?: string }; + usage?: unknown; + }; + // Codex streams multiple events; capture the last agent_message text and + // the final turn usage for cost/telemetry. + if ( + ev.type === "item.completed" && + ev.item?.type === "agent_message" && + typeof ev.item.text === "string" + ) { text = ev.item.text; } - if (ev.type === "turn.completed" && ev.usage && typeof ev.usage === "object") { + if ( + ev.type === "turn.completed" && + ev.usage && + typeof ev.usage === "object" + ) { const u = ev.usage as { input_tokens?: number; cached_input_tokens?: number; @@ -63,4 +77,3 @@ export const codexSpec: AgentSpec = { }, parseOutput: parseCodexJson, }; - diff --git a/src/agents/index.ts b/src/agents/index.ts index 508b3811c..231d8a3eb 100644 --- a/src/agents/index.ts +++ b/src/agents/index.ts @@ -16,4 +16,3 @@ export function getAgentSpec(kind: AgentKind): AgentSpec { } export { AgentKind, AgentMeta, AgentParseResult } from "./types.js"; - diff --git a/src/agents/opencode.ts b/src/agents/opencode.ts index a19bfae7b..c458d94c1 100644 --- a/src/agents/opencode.ts +++ b/src/agents/opencode.ts @@ -6,42 +6,50 @@ import { parseOpencodeJson, summarizeOpencodeMetadata, } from "../auto-reply/opencode.js"; -import type { AgentMeta, AgentParseResult, AgentSpec, BuildArgsContext } from "./types.js"; +import type { AgentMeta, AgentSpec } from "./types.js"; -function toMeta(parsed: ReturnType): AgentMeta | undefined { +function toMeta( + parsed: ReturnType, +): AgentMeta | undefined { const summary = summarizeOpencodeMetadata(parsed.meta); return summary ? { extra: { summary } } : undefined; } export const opencodeSpec: AgentSpec = { kind: "opencode", - isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === OPENCODE_BIN, + isInvocation: (argv) => + argv.length > 0 && path.basename(argv[0]) === OPENCODE_BIN, buildArgs: (ctx) => { + // Split around the body so we can insert flags without losing the prompt. const argv = [...ctx.argv]; + const body = argv[ctx.bodyIndex] ?? ""; + const beforeBody = argv.slice(0, ctx.bodyIndex); + const afterBody = argv.slice(ctx.bodyIndex + 1); const wantsJson = ctx.format === "json"; // Ensure format json for parsing if (wantsJson) { - const hasFormat = argv.some( + const hasFormat = [...beforeBody, body, ...afterBody].some( (part) => part === "--format" || part.startsWith("--format="), ); if (!hasFormat) { - const insertBeforeBody = Math.max(argv.length - 1, 0); - argv.splice(insertBeforeBody, 0, "--format", "json"); + beforeBody.push("--format", "json"); } } // Session args default to --session // Identity prefix + // Opencode streams text tokens; we still seed an identity so the agent + // keeps context on first turn. const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent); - if (shouldPrependIdentity && argv[ctx.bodyIndex]) { - const existingBody = argv[ctx.bodyIndex]; - argv[ctx.bodyIndex] = [ctx.identityPrefix ?? OPENCODE_IDENTITY_PREFIX, existingBody] - .filter(Boolean) - .join("\n\n"); - } + const bodyWithIdentity = + shouldPrependIdentity && body + ? [ctx.identityPrefix ?? OPENCODE_IDENTITY_PREFIX, body] + .filter(Boolean) + .join("\n\n") + : body; - return argv; + return [...beforeBody, bodyWithIdentity, ...afterBody]; }, parseOutput: (rawStdout) => { const parsed = parseOpencodeJson(rawStdout); diff --git a/src/agents/pi.ts b/src/agents/pi.ts index 18efc0531..c7359b98e 100644 --- a/src/agents/pi.ts +++ b/src/agents/pi.ts @@ -1,6 +1,6 @@ import path from "node:path"; -import type { AgentMeta, AgentParseResult, AgentSpec, BuildArgsContext } from "./types.js"; +import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js"; type PiAssistantMessage = { role?: string; @@ -16,7 +16,11 @@ function parsePiJson(raw: string): AgentParseResult { let lastMessage: PiAssistantMessage | undefined; for (const line of lines) { try { - const ev = JSON.parse(line) as { type?: string; message?: PiAssistantMessage }; + const ev = JSON.parse(line) as { + type?: string; + message?: PiAssistantMessage; + }; + // Pi emits a stream; we only care about the terminal assistant message_end. if (ev.type === "message_end" && ev.message?.role === "assistant") { lastMessage = ev.message; } @@ -50,14 +54,20 @@ export const piSpec: AgentSpec = { if (!argv.includes("-p") && !argv.includes("--print")) { argv.splice(argv.length - 1, 0, "-p"); } - if (ctx.format === "json" && !argv.includes("--mode") && !argv.some((a) => a === "--mode")) { + if ( + ctx.format === "json" && + !argv.includes("--mode") && + !argv.some((a) => a === "--mode") + ) { argv.splice(argv.length - 1, 0, "--mode", "json"); } // Session defaults - // Identity prefix optional; Pi usually doesn't need, but allow + // Identity prefix optional; Pi usually doesn't need it, but allow injection if (!(ctx.sendSystemOnce && ctx.systemSent) && argv[ctx.bodyIndex]) { const existingBody = argv[ctx.bodyIndex]; - argv[ctx.bodyIndex] = [ctx.identityPrefix, existingBody].filter(Boolean).join("\n\n"); + argv[ctx.bodyIndex] = [ctx.identityPrefix, existingBody] + .filter(Boolean) + .join("\n\n"); } return argv; }, diff --git a/src/agents/types.ts b/src/agents/types.ts index 76b6ba1d2..d430fb296 100644 --- a/src/agents/types.ts +++ b/src/agents/types.ts @@ -39,4 +39,3 @@ export interface AgentSpec { buildArgs: (ctx: BuildArgsContext) => string[]; parseOutput: (rawStdout: string) => AgentParseResult; } - diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index 39ca340b8..01a32ca0e 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -193,7 +193,11 @@ describe("runCommandReply", () => { throw { stdout: "partial output here", killed: true, signal: "SIGKILL" }; }); const { payload, meta } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "hi"], agent: { kind: "claude" } }, + reply: { + mode: "command", + command: ["echo", "hi"], + agent: { kind: "claude" }, + }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, @@ -214,7 +218,12 @@ describe("runCommandReply", () => { throw { stdout: "", killed: true, signal: "SIGKILL" }; }); const { payload } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "hi"], cwd: "/tmp/work", agent: { kind: "claude" } }, + reply: { + mode: "command", + command: ["echo", "hi"], + cwd: "/tmp/work", + agent: { kind: "claude" }, + }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, @@ -236,7 +245,12 @@ describe("runCommandReply", () => { stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`, }); const { payload } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "hi"], mediaMaxMb: 1, agent: { kind: "claude" } }, + reply: { + mode: "command", + command: ["echo", "hi"], + mediaMaxMb: 1, + agent: { kind: "claude" }, + }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, @@ -279,7 +293,11 @@ describe("runCommandReply", () => { it("captures queue wait metrics in meta", async () => { const runner = makeRunner({ stdout: "ok" }); const { meta } = await runCommandReply({ - reply: { mode: "command", command: ["echo", "{{Body}}"], agent: { kind: "claude" } }, + reply: { + mode: "command", + command: ["echo", "{{Body}}"], + agent: { kind: "claude" }, + }, templatingCtx: noopTemplateCtx, sendSystemOnce: false, isNewSession: true, diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index a897520ba..eb1838be0 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -1,7 +1,7 @@ import fs from "node:fs/promises"; import path from "node:path"; -import { getAgentSpec } from "../agents/index.js"; +import { type AgentKind, getAgentSpec } from "../agents/index.js"; import type { AgentMeta } from "../agents/types.js"; import type { WarelayConfig } from "../config/config.js"; import { isVerbose, logVerbose } from "../globals.js"; @@ -116,7 +116,8 @@ export async function runCommandReply( throw new Error("reply.command is required for mode=command"); } const agentCfg = reply.agent ?? { kind: "claude" }; - const agent = getAgentSpec(agentCfg.kind as any); + const agentKind: AgentKind = agentCfg.kind ?? "claude"; + const agent = getAgentSpec(agentKind); let argv = reply.command.map((part) => applyTemplate(part, templatingCtx)); const templatePrefix = @@ -142,14 +143,18 @@ export async function runCommandReply( : ["--session", "{{SessionId}}"]; const sessionArgList = ( isNewSession - ? reply.session.sessionArgNew ?? defaultNew - : reply.session.sessionArgResume ?? defaultResume + ? (reply.session.sessionArgNew ?? defaultNew) + : (reply.session.sessionArgResume ?? defaultResume) ).map((p) => applyTemplate(p, templatingCtx)); if (sessionArgList.length) { const insertBeforeBody = reply.session.sessionArgBeforeBody ?? true; const insertAt = insertBeforeBody && argv.length > 1 ? argv.length - 1 : argv.length; - argv = [...argv.slice(0, insertAt), ...sessionArgList, ...argv.slice(insertAt)]; + argv = [ + ...argv.slice(0, insertAt), + ...sessionArgList, + ...argv.slice(insertAt), + ]; bodyIndex = Math.max(argv.length - 1, 0); } } @@ -198,6 +203,8 @@ export async function runCommandReply( } const parsed = trimmed ? agent.parseOutput(trimmed) : undefined; + // Treat empty string as "no content" so we can fall back to the friendly + // "(command produced no output)" message instead of echoing raw JSON. if (parsed && parsed.text !== undefined) { trimmed = parsed.text.trim(); } @@ -223,7 +230,9 @@ export async function runCommandReply( `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, ); // Include any partial output or stderr in error message - const partialOut = trimmed ? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}` : ""; + const partialOut = trimmed + ? `\n\nOutput: ${trimmed.slice(0, 500)}${trimmed.length > 500 ? "..." : ""}` + : ""; const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`; return { payload: { text: errorText }, diff --git a/src/auto-reply/opencode.ts b/src/auto-reply/opencode.ts index 19b16055d..859b81d60 100644 --- a/src/auto-reply/opencode.ts +++ b/src/auto-reply/opencode.ts @@ -102,4 +102,3 @@ export function summarizeOpencodeMetadata( } return parts.length ? parts.join(", ") : undefined; } - diff --git a/src/commands/send.test.ts b/src/commands/send.test.ts index 7e7fb9dd8..7cc54e7c4 100644 --- a/src/commands/send.test.ts +++ b/src/commands/send.test.ts @@ -4,6 +4,10 @@ import type { CliDeps } from "../cli/deps.js"; import type { RuntimeEnv } from "../runtime.js"; import { sendCommand } from "./send.js"; +vi.mock("../web/ipc.js", () => ({ + sendViaIpc: vi.fn().mockResolvedValue(null), +})); + const runtime: RuntimeEnv = { log: vi.fn(), error: vi.fn(), diff --git a/src/commands/send.ts b/src/commands/send.ts index 45b770f74..30b99f057 100644 --- a/src/commands/send.ts +++ b/src/commands/send.ts @@ -45,7 +45,9 @@ export async function sendCommand( const ipcResult = await sendViaIpc(opts.to, opts.message, opts.media); if (ipcResult) { if (ipcResult.success) { - runtime.log(success(`✅ Sent via relay IPC. Message ID: ${ipcResult.messageId}`)); + runtime.log( + success(`✅ Sent via relay IPC. Message ID: ${ipcResult.messageId}`), + ); if (opts.json) { runtime.log( JSON.stringify( @@ -64,7 +66,11 @@ export async function sendCommand( return; } // IPC failed but relay is running - warn and fall back - runtime.log(info(`IPC send failed (${ipcResult.error}), falling back to direct connection`)); + runtime.log( + info( + `IPC send failed (${ipcResult.error}), falling back to direct connection`, + ), + ); } // Fall back to direct connection (creates new Baileys socket) diff --git a/src/web/auto-reply.test.ts b/src/web/auto-reply.test.ts index 945df6212..3ed436571 100644 --- a/src/web/auto-reply.test.ts +++ b/src/web/auto-reply.test.ts @@ -1,10 +1,4 @@ -// Import test-helpers FIRST to set up mocks before other imports -import { - resetBaileysMocks, - resetLoadConfigMock, - setLoadConfigMock, -} from "./test-helpers.js"; - +import "./test-helpers.js"; import crypto from "node:crypto"; import fs from "node:fs/promises"; import os from "node:os"; @@ -13,8 +7,8 @@ import sharp from "sharp"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { WarelayConfig } from "../config/config.js"; -import * as commandQueue from "../process/command-queue.js"; import { resetLogger, setLoggerOverride } from "../logging.js"; +import * as commandQueue from "../process/command-queue.js"; import { HEARTBEAT_PROMPT, HEARTBEAT_TOKEN, @@ -25,8 +19,11 @@ import { stripHeartbeatToken, } from "./auto-reply.js"; import type { sendMessageWeb } from "./outbound.js"; -import * as commandQueue from "../process/command-queue.js"; -import { getQueueSize } from "../process/command-queue.js"; +import { + resetBaileysMocks, + resetLoadConfigMock, + setLoadConfigMock, +} from "./test-helpers.js"; const makeSessionStore = async ( entries: Record = {}, @@ -535,9 +532,7 @@ describe("web auto-reply", () => { const storePath = path.join(tmpDir, "sessions.json"); await fs.writeFile(storePath, JSON.stringify({})); - const queueSpy = vi - .spyOn(commandQueue, "getQueueSize") - .mockReturnValue(2); + const queueSpy = vi.spyOn(commandQueue, "getQueueSize").mockReturnValue(2); const replyResolver = vi.fn(); const listenerFactory = vi.fn(async () => { const onClose = new Promise(() => { diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 7862f25cf..9b6e9e917 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -12,13 +12,13 @@ import { import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; +import { enqueueCommand, getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { monitorWebInbox } from "./inbound.js"; import { sendViaIpc, startIpcServer, stopIpcServer } from "./ipc.js"; import { loadWebMedia } from "./media.js"; import { sendMessageWeb } from "./outbound.js"; -import { enqueueCommand, getQueueSize } from "../process/command-queue.js"; import { computeBackoff, newConnectionId, @@ -697,9 +697,7 @@ export async function monitorWebProvider( } } catch (err) { console.error( - danger( - `Failed sending web auto-reply to ${from}: ${String(err)}`, - ), + danger(`Failed sending web auto-reply to ${from}: ${String(err)}`), ); } }; @@ -713,7 +711,8 @@ export async function monitorWebProvider( if (getQueueSize() === 0) { await processBatch(msg.from); } else { - bucket.timer = bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150); + bucket.timer = + bucket.timer ?? setTimeout(() => void processBatch(msg.from), 150); } }; @@ -754,7 +753,12 @@ export async function monitorWebProvider( mediaBuffer = media.buffer; mediaType = media.contentType; } - const result = await listener.sendMessage(to, message, mediaBuffer, mediaType); + const result = await listener.sendMessage( + to, + message, + mediaBuffer, + mediaType, + ); // Add to echo detection so we don't process our own message if (message) { recentlySent.add(message); @@ -763,7 +767,10 @@ export async function monitorWebProvider( if (firstKey) recentlySent.delete(firstKey); } } - logInfo(`📤 IPC send to ${to}: ${message.substring(0, 50)}...`, runtime); + logInfo( + `📤 IPC send to ${to}: ${message.substring(0, 50)}...`, + runtime, + ); // Show typing indicator after send so user knows more may be coming try { await listener.sendComposingTo(to); @@ -807,7 +814,10 @@ export async function monitorWebProvider( // Warn if no messages in 30+ minutes if (minutesSinceLastMessage && minutesSinceLastMessage > 30) { - heartbeatLogger.warn(logData, "⚠️ web relay heartbeat - no messages in 30+ minutes"); + heartbeatLogger.warn( + logData, + "⚠️ web relay heartbeat - no messages in 30+ minutes", + ); } else { heartbeatLogger.info(logData, "web relay heartbeat"); } @@ -818,7 +828,9 @@ export async function monitorWebProvider( if (lastMessageAt) { const timeSinceLastMessage = Date.now() - lastMessageAt; if (timeSinceLastMessage > MESSAGE_TIMEOUT_MS) { - const minutesSinceLastMessage = Math.floor(timeSinceLastMessage / 60000); + const minutesSinceLastMessage = Math.floor( + timeSinceLastMessage / 60000, + ); heartbeatLogger.warn( { connectionId, @@ -978,7 +990,11 @@ export async function monitorWebProvider( // Apply response prefix if configured (same as regular messages) let finalText = stripped.text; const responsePrefix = cfg.inbound?.responsePrefix; - if (responsePrefix && finalText && !finalText.startsWith(responsePrefix)) { + if ( + responsePrefix && + finalText && + !finalText.startsWith(responsePrefix) + ) { finalText = `${responsePrefix} ${finalText}`; } diff --git a/src/web/inbound.ts b/src/web/inbound.ts index 9d6a4df6a..f10a48247 100644 --- a/src/web/inbound.ts +++ b/src/web/inbound.ts @@ -103,8 +103,13 @@ export async function monitorWebInbox(options: { const isSamePhone = from === selfE164; if (!isSamePhone && Array.isArray(allowFrom) && allowFrom.length > 0) { - if (!allowFrom.includes("*") && !allowFrom.map(normalizeE164).includes(from)) { - logVerbose(`Blocked unauthorized sender ${from} (not in allowFrom list)`); + if ( + !allowFrom.includes("*") && + !allowFrom.map(normalizeE164).includes(from) + ) { + logVerbose( + `Blocked unauthorized sender ${from} (not in allowFrom list)`, + ); continue; // Skip processing entirely } } diff --git a/src/web/ipc.ts b/src/web/ipc.ts index 99e09e0f5..ed70b525b 100644 --- a/src/web/ipc.ts +++ b/src/web/ipc.ts @@ -78,13 +78,13 @@ export function startIpcServer(sendHandler: SendHandler): void { success: true, messageId: result.messageId, }; - conn.write(JSON.stringify(response) + "\n"); + conn.write(`${JSON.stringify(response)}\n`); } catch (err) { const response: IpcSendResponse = { success: false, error: String(err), }; - conn.write(JSON.stringify(response) + "\n"); + conn.write(`${JSON.stringify(response)}\n`); } } } catch (err) { @@ -93,7 +93,7 @@ export function startIpcServer(sendHandler: SendHandler): void { success: false, error: "Invalid request format", }; - conn.write(JSON.stringify(response) + "\n"); + conn.write(`${JSON.stringify(response)}\n`); } } }); @@ -174,7 +174,7 @@ export async function sendViaIpc( message, mediaUrl, }; - client.write(JSON.stringify(request) + "\n"); + client.write(`${JSON.stringify(request)}\n`); }); client.on("data", (data) => { @@ -198,7 +198,7 @@ export async function sendViaIpc( } }); - client.on("error", (err) => { + client.on("error", (_err) => { if (!resolved) { resolved = true; clearTimeout(timeout); diff --git a/src/web/monitor-inbox.test.ts b/src/web/monitor-inbox.test.ts index b696d2513..829cade1f 100644 --- a/src/web/monitor-inbox.test.ts +++ b/src/web/monitor-inbox.test.ts @@ -251,7 +251,11 @@ describe("web monitor inbox", () => { type: "notify", messages: [ { - key: { id: "unauth1", fromMe: false, remoteJid: "999@s.whatsapp.net" }, + key: { + id: "unauth1", + fromMe: false, + remoteJid: "999@s.whatsapp.net", + }, message: { conversation: "unauthorized message" }, messageTimestamp: 1_700_000_000, }, From 2b941ccc93d2cbc59d45b3408ee8baf441865e4f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 10:58:22 +0000 Subject: [PATCH 05/11] Changelog: note multi-agent and batching Co-authored-by: RealSid08 --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 33fb74ca5..06a34374f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 1.3.0 — Unreleased +### Highlights +- **Pluggable agents (Claude, Pi, Codex, Opencode):** New `inbound.reply.agent` block chooses the CLI and parser per command reply; per-agent argv builders inject the right flags/identity/prompt handling and parse NDJSON streams, enabling Pi/Codex swaps without changing templates. + ### Bug Fixes - **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`. - **Response prefix on heartbeat replies:** Fixed `responsePrefix` (e.g., `🦞`) not being applied to heartbeat alert messages. The prefix was only applied in the regular message handler, not in `runReplyHeartbeat`. @@ -11,6 +14,7 @@ ### Changes - **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise. +- **Batched inbound messaging with timestamps:** When multiple WhatsApp messages queue up, they’re sent to the agent in one combined batch, each line timestamped consistently to preserve ordering and context. - **Typing indicator after IPC send:** After sending a message via IPC (e.g., `warelay send`), the relay now automatically shows the typing indicator ("composing") to signal that more messages may be coming. - **Auto-recovery from stuck WhatsApp sessions:** Added watchdog timer that detects when WhatsApp event emitter stops firing (e.g., after Bad MAC decryption errors) and automatically restarts the connection after 30 minutes of no message activity. Heartbeat logging now includes `minutesSinceLastMessage` and warns when >30 minutes without messages. The 30-minute timeout is intentionally longer than typical `heartbeatMinutes` configs to avoid false positives. - **Early allowFrom filtering:** Unauthorized senders are now blocked in `inbound.ts` BEFORE encryption/decryption attempts, preventing Bad MAC errors from corrupting session state. Previously, messages from unauthorized senders would trigger decryption failures that could silently kill the event emitter. From c9fbe2cb92e797ae20c4dffac37ec4d1702e72a7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 16:09:40 +0000 Subject: [PATCH 06/11] chore(security): harden ipc socket --- CHANGELOG.md | 10 ++++++- src/web/ipc.test.ts | 63 +++++++++++++++++++++++++++++++++++++++++++++ src/web/ipc.ts | 60 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 128 insertions(+), 5 deletions(-) create mode 100644 src/web/ipc.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 06a34374f..081c4c00b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,16 @@ # Changelog -## 1.3.0 — Unreleased +## Unreleased + +### Security +- Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior. + +## 1.3.0 — 2025-12-02 ### Highlights - **Pluggable agents (Claude, Pi, Codex, Opencode):** New `inbound.reply.agent` block chooses the CLI and parser per command reply; per-agent argv builders inject the right flags/identity/prompt handling and parse NDJSON streams, enabling Pi/Codex swaps without changing templates. +- **Safety stop words for agents:** If an inbound message is exactly `stop`, `esc`, `abort`, `wait`, or `exit`, warelay immediately replies “Agent was aborted.”, kills the pending agent run, and marks the session so the next prompt is prefixed with a reminder that the previous run was aborted. +- **Agent session reliability:** Only Claude currently returns a `session_id` that warelay persists; other agents (Gemini, Opencode, Codex, Pi) don’t emit stable session identifiers, so multi-turn continuity may reset between runs for those harnesses. ### Bug Fixes - **Empty result field handling:** Fixed bug where Claude CLI returning `result: ""` (empty string) would cause raw JSON to be sent to WhatsApp instead of being treated as valid empty output. Changed truthy check to explicit type check in `command-reply.ts`. @@ -11,6 +18,7 @@ - **User-visible error messages:** Command failures (non-zero exit, killed processes, exceptions) now return user-friendly error messages to WhatsApp instead of silently failing with empty responses. - **Test session isolation:** Fixed tests corrupting production `sessions.json` by mocking session persistence in all test files. - **Signal session corruption prevention:** Added IPC mechanism so `warelay send` and `warelay heartbeat` reuse the running relay's WhatsApp connection instead of creating new Baileys sockets. Previously, using these commands while the relay was running could corrupt the Signal session ratchet (both connections wrote to the same auth state), causing the relay's subsequent sends to fail silently. +- **Web send media kinds:** `sendMessageWeb` now honors media kind when sending via WhatsApp Web: audio → PTT with correct opus mimetype, video → video, image → image, other → document with filename. Previously all media were sent as images, breaking audio/video/doc sends. ### Changes - **IPC server for relay:** The web relay now starts a Unix socket server at `~/.warelay/relay.sock`. Commands like `warelay send --provider web` automatically connect via IPC when the relay is running, falling back to direct connection otherwise. diff --git a/src/web/ipc.test.ts b/src/web/ipc.test.ts new file mode 100644 index 000000000..4d13ed689 --- /dev/null +++ b/src/web/ipc.test.ts @@ -0,0 +1,63 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +import { afterEach, describe, expect, it, vi } from "vitest"; + +vi.mock("../logging.js", () => ({ + getChildLogger: () => ({ + info: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), + }), +})); + +const originalHome = process.env.HOME; + +afterEach(() => { + process.env.HOME = originalHome; + vi.resetModules(); +}); + +describe("ipc hardening", () => { + it("creates private socket dir and socket with tight perms", async () => { + const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "warelay-home-")); + process.env.HOME = tmpHome; + vi.resetModules(); + + const ipc = await import("./ipc.js"); + + const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" }); + ipc.startIpcServer(sendHandler); + + const dirStat = fs.lstatSync(path.join(tmpHome, ".warelay", "ipc")); + expect(dirStat.mode & 0o777).toBe(0o700); + + expect(ipc.isRelayRunning()).toBe(true); + + const socketStat = fs.lstatSync(ipc.getSocketPath()); + expect(socketStat.isSocket()).toBe(true); + if (typeof process.getuid === "function") { + expect(socketStat.uid).toBe(process.getuid()); + } + + ipc.stopIpcServer(); + expect(ipc.isRelayRunning()).toBe(false); + }); + + it("refuses to start when IPC dir is a symlink", async () => { + const tmpHome = fs.mkdtempSync(path.join(os.tmpdir(), "warelay-home-")); + const warelayDir = path.join(tmpHome, ".warelay"); + fs.mkdirSync(warelayDir, { recursive: true }); + fs.symlinkSync("/tmp", path.join(warelayDir, "ipc")); + + process.env.HOME = tmpHome; + vi.resetModules(); + + const ipc = await import("./ipc.js"); + const sendHandler = vi.fn().mockResolvedValue({ messageId: "msg1" }); + + expect(() => ipc.startIpcServer(sendHandler)).toThrow(/symlink/i); + }); +}); diff --git a/src/web/ipc.ts b/src/web/ipc.ts index ed70b525b..ac99194ae 100644 --- a/src/web/ipc.ts +++ b/src/web/ipc.ts @@ -15,7 +15,8 @@ import path from "node:path"; import { getChildLogger } from "../logging.js"; -const SOCKET_PATH = path.join(os.homedir(), ".warelay", "relay.sock"); +const SOCKET_DIR = path.join(os.homedir(), ".warelay", "ipc"); +const SOCKET_PATH = path.join(SOCKET_DIR, "relay.sock"); export interface IpcSendRequest { type: "send"; @@ -44,11 +45,21 @@ let server: net.Server | null = null; export function startIpcServer(sendHandler: SendHandler): void { const logger = getChildLogger({ module: "ipc-server" }); - // Clean up stale socket file + ensureSocketDir(); + try { + assertSafeSocketPath(SOCKET_PATH); + } catch (err) { + logger.error({ error: String(err) }, "Refusing to start IPC server"); + throw err; + } + + // Clean up stale socket file (only if safe to do so) try { fs.unlinkSync(SOCKET_PATH); - } catch { - // Ignore if doesn't exist + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") { + throw err; + } } server = net.createServer((conn) => { @@ -134,6 +145,7 @@ export function stopIpcServer(): void { */ export function isRelayRunning(): boolean { try { + assertSafeSocketPath(SOCKET_PATH); fs.accessSync(SOCKET_PATH); return true; } catch { @@ -223,3 +235,43 @@ export async function sendViaIpc( export function getSocketPath(): string { return SOCKET_PATH; } + +function ensureSocketDir(): void { + try { + const stat = fs.lstatSync(SOCKET_DIR); + if (stat.isSymbolicLink()) { + throw new Error(`IPC dir is a symlink: ${SOCKET_DIR}`); + } + if (!stat.isDirectory()) { + throw new Error(`IPC dir is not a directory: ${SOCKET_DIR}`); + } + // Enforce private permissions + fs.chmodSync(SOCKET_DIR, 0o700); + if (typeof process.getuid === "function" && stat.uid !== process.getuid()) { + throw new Error(`IPC dir owned by different user: ${SOCKET_DIR}`); + } + } catch (err) { + if ((err as NodeJS.ErrnoException).code === "ENOENT") { + fs.mkdirSync(SOCKET_DIR, { recursive: true, mode: 0o700 }); + return; + } + throw err; + } +} + +function assertSafeSocketPath(socketPath: string): void { + try { + const stat = fs.lstatSync(socketPath); + if (stat.isSymbolicLink()) { + throw new Error(`Refusing IPC socket symlink: ${socketPath}`); + } + if (typeof process.getuid === "function" && stat.uid !== process.getuid()) { + throw new Error(`IPC socket owned by different user: ${socketPath}`); + } + } catch (err) { + if ((err as NodeJS.ErrnoException).code === "ENOENT") { + return; // Missing is fine; creation will happen next. + } + throw err; + } +} From 88446748250ba0e14a032d03760d5a4eaf528e6f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 16:33:44 +0000 Subject: [PATCH 07/11] chore(security): purge session store on logout --- CHANGELOG.md | 1 + src/web/logout.test.ts | 3 +++ src/web/session.ts | 3 +++ 3 files changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 081c4c00b..2ace486db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Security - Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior. +- `warelay logout` now also prunes the shared session store (`~/.warelay/sessions.json`) alongside WhatsApp Web credentials, reducing leftover state after unlinking. ## 1.3.0 — 2025-12-02 diff --git a/src/web/logout.test.ts b/src/web/logout.test.ts index 478f2864c..a1d822cae 100644 --- a/src/web/logout.test.ts +++ b/src/web/logout.test.ts @@ -35,6 +35,8 @@ describe("web logout", () => { const credsDir = path.join(tmpDir, ".warelay", "credentials"); fs.mkdirSync(credsDir, { recursive: true }); fs.writeFileSync(path.join(credsDir, "creds.json"), "{}"); + const sessionsPath = path.join(tmpDir, ".warelay", "sessions.json"); + fs.writeFileSync(sessionsPath, "{}"); const { logoutWeb, WA_WEB_AUTH_DIR } = await import("./session.js"); expect(WA_WEB_AUTH_DIR.startsWith(tmpDir)).toBe(true); @@ -42,6 +44,7 @@ describe("web logout", () => { expect(result).toBe(true); expect(fs.existsSync(credsDir)).toBe(false); + expect(fs.existsSync(sessionsPath)).toBe(false); }); it("no-ops when nothing to delete", async () => { diff --git a/src/web/session.ts b/src/web/session.ts index 545291ebf..974f40cf1 100644 --- a/src/web/session.ts +++ b/src/web/session.ts @@ -12,6 +12,7 @@ import { } from "@whiskeysockets/baileys"; import qrcode from "qrcode-terminal"; +import { SESSION_STORE_DEFAULT } from "../config/sessions.js"; import { danger, info, success } from "../globals.js"; import { getChildLogger } from "../logging.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; @@ -160,6 +161,8 @@ export async function logoutWeb(runtime: RuntimeEnv = defaultRuntime) { return false; } await fs.rm(WA_WEB_AUTH_DIR, { recursive: true, force: true }); + // Also drop session store to clear lingering per-sender state after logout. + await fs.rm(SESSION_STORE_DEFAULT, { force: true }); runtime.log( success( "Cleared WhatsApp Web credentials. Run `warelay login --provider web` to relink.", From 26921cbe6899dc03eb0f1d9667aaab179297319f Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 17:11:43 +0000 Subject: [PATCH 08/11] chore(logs): rotate daily and prune after 24h --- CHANGELOG.md | 1 + README.md | 14 +++++++++++-- src/logger.test.ts | 28 +++++++++++++++++++++++++- src/logging.ts | 50 +++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 87 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ace486db..2383eb8cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Security - Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior. - `warelay logout` now also prunes the shared session store (`~/.warelay/sessions.json`) alongside WhatsApp Web credentials, reducing leftover state after unlinking. +- Logging now rolls daily to `/tmp/warelay/warelay-YYYY-MM-DD.log` (or custom dir) and prunes files older than 24h to reduce data retention. ## 1.3.0 — 2025-12-02 diff --git a/README.md b/README.md index 70cd06569..6524c35ea 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,16 @@ warelay supports running on the same phone number you message from—you chat wi } ``` +#### Abort trigger words +- If an inbound body is exactly `stop`, `esc`, `abort`, `wait`, or `exit`, the command/agent run is skipped and the user immediately gets `Agent was aborted.`. +- The session is tagged so the *next* prompt sent to the agent is prefixed with a short reminder that the previous run was aborted; the hint clears after that turn. + +#### Agent choices +- `inbound.reply.agent.kind` can be `claude`, `opencode`, `pi`, `codex`, or `gemini`. +- Gemini CLI supports `--output-format text|json|stream-json`; warelay auto-adds it when you set `agent.format`. +- Session defaults: Claude uses `--session-id/--resume`, Codex/Opencode/Pi use `--session`, and Gemini defaults to `--resume` for session resumes (new sessions need no flag). Override via `sessionArgNew/sessionArgResume` if you prefer custom flags. +- Reliability note: only Claude reliably returns a `session_id` that warelay can persist and reuse. Other harnesses currently don’t emit a stable session identifier, so multi-turn continuity may reset between runs for those agents (Pi does not auto-compact, but still doesn’t expose a session id). + #### Heartbeat pings (command mode) - When `heartbeatMinutes` is set (default 10 for `mode: "command"`), the relay periodically runs your command/Claude session with a heartbeat prompt. - Heartbeat body is `HEARTBEAT ultrathink` (so the model can recognize the probe); if Claude replies exactly `HEARTBEAT_OK`, the message is suppressed; otherwise the reply (or media) is forwarded. Suppressions are still logged so you know the heartbeat ran. @@ -145,7 +155,7 @@ warelay supports running on the same phone number you message from—you chat wi - When multiple active sessions exist, `warelay heartbeat` requires `--to ` or `--all`; if `allowFrom` is just `"*"`, you must choose a target with one of those flags. ### Logging (optional) -- File logs are written to `/tmp/warelay/warelay.log` by default. Levels: `silent | fatal | error | warn | info | debug | trace` (CLI `--verbose` forces `debug`). Web-provider inbound/outbound entries include message bodies and auto-reply text for easier auditing. +- File logs are written to `/tmp/warelay/warelay-YYYY-MM-DD.log` by default (rotated daily; files older than 24h are pruned). Levels: `silent | fatal | error | warn | info | debug | trace` (CLI `--verbose` forces `debug`). Web-provider inbound/outbound entries include message bodies and auto-reply text for easier auditing. - Override in `~/.warelay/warelay.json`: ```json5 @@ -208,7 +218,7 @@ Templating tokens: `{{Body}}`, `{{BodyStripped}}`, `{{From}}`, `{{To}}`, `{{Mess ## FAQ & Safety - Twilio errors: **63016 “permission to send an SMS has not been enabled”** → ensure your number is WhatsApp-enabled; **63007 template not approved** → send a free-form session message within 24h or use an approved template; **63112 policy violation** → adjust content, shorten to <1600 chars, avoid links that trigger spam filters. Re-run `pnpm warelay status` to see the exact Twilio response body. -- Does this store my messages? warelay only writes `~/.warelay/warelay.json` (config), `~/.warelay/credentials/` (WhatsApp Web auth), and `~/.warelay/sessions.json` (session IDs + timestamps). It does **not** persist message bodies beyond the session store. Logs stream to stdout/stderr and also `/tmp/warelay/warelay.log` (configurable via `logging.file`). +- Does this store my messages? warelay only writes `~/.warelay/warelay.json` (config), `~/.warelay/credentials/` (WhatsApp Web auth), and `~/.warelay/sessions.json` (session IDs + timestamps). It does **not** persist message bodies beyond the session store. Logs stream to stdout/stderr and also `/tmp/warelay/warelay-YYYY-MM-DD.log` (configurable via `logging.file`). - Personal WhatsApp safety: Automation on personal accounts can be rate-limited or logged out by WhatsApp. Use `--provider web` sparingly, keep messages human-like, and re-run `login` if the session is dropped. - Limits to remember: WhatsApp text limit ~1600 chars; avoid rapid bursts—space sends by a few seconds; keep webhook replies under a couple seconds for good UX; command auto-replies time out after 600s by default. - Deploy / keep running: Use `tmux` or `screen` for ad-hoc (`tmux new -s warelay -- pnpm warelay relay --provider twilio`). For long-running hosts, wrap `pnpm warelay relay ...` or `pnpm warelay webhook --ingress tailscale ...` in a systemd service or macOS LaunchAgent; ensure environment variables are loaded in that context. diff --git a/src/logger.test.ts b/src/logger.test.ts index 1e7d6aa63..709fcca35 100644 --- a/src/logger.test.ts +++ b/src/logger.test.ts @@ -7,7 +7,11 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { setVerbose } from "./globals.js"; import { logDebug, logError, logInfo, logSuccess, logWarn } from "./logger.js"; -import { resetLogger, setLoggerOverride } from "./logging.js"; +import { + DEFAULT_LOG_DIR, + resetLogger, + setLoggerOverride, +} from "./logging.js"; import type { RuntimeEnv } from "./runtime.js"; describe("logger helpers", () => { @@ -67,6 +71,28 @@ describe("logger helpers", () => { expect(content).toContain("warn-only"); cleanup(logPath); }); + + it("uses daily rolling default log file and prunes old ones", () => { + resetLogger(); + setLoggerOverride({}); // force defaults regardless of user config + const today = new Date().toISOString().slice(0, 10); + const todayPath = path.join(DEFAULT_LOG_DIR, `warelay-${today}.log`); + + // create an old file to be pruned + const oldPath = path.join(DEFAULT_LOG_DIR, "warelay-2000-01-01.log"); + fs.mkdirSync(DEFAULT_LOG_DIR, { recursive: true }); + fs.writeFileSync(oldPath, "old"); + fs.utimesSync(oldPath, new Date(0), new Date(0)); + cleanup(todayPath); + + logInfo("roll-me"); + + expect(fs.existsSync(todayPath)).toBe(true); + expect(fs.readFileSync(todayPath, "utf-8")).toContain("roll-me"); + expect(fs.existsSync(oldPath)).toBe(false); + + cleanup(todayPath); + }); }); function pathForTest() { diff --git a/src/logging.ts b/src/logging.ts index 3e0a6d21b..cabe9b277 100644 --- a/src/logging.ts +++ b/src/logging.ts @@ -6,8 +6,12 @@ import pino, { type Bindings, type LevelWithSilent, type Logger } from "pino"; import { loadConfig, type WarelayConfig } from "./config/config.js"; import { isVerbose } from "./globals.js"; -const DEFAULT_LOG_DIR = path.join(os.tmpdir(), "warelay"); -export const DEFAULT_LOG_FILE = path.join(DEFAULT_LOG_DIR, "warelay.log"); +export const DEFAULT_LOG_DIR = path.join(os.tmpdir(), "warelay"); +export const DEFAULT_LOG_FILE = path.join(DEFAULT_LOG_DIR, "warelay.log"); // legacy single-file path + +const LOG_PREFIX = "warelay"; +const LOG_SUFFIX = ".log"; +const MAX_LOG_AGE_MS = 24 * 60 * 60 * 1000; // 24h const ALLOWED_LEVELS: readonly LevelWithSilent[] = [ "silent", @@ -46,7 +50,7 @@ function resolveSettings(): ResolvedSettings { const cfg: WarelayConfig["logging"] | undefined = overrideSettings ?? loadConfig().logging; const level = normalizeLevel(cfg?.level); - const file = cfg?.file ?? DEFAULT_LOG_FILE; + const file = cfg?.file ?? defaultRollingPathForToday(); return { level, file }; } @@ -57,6 +61,10 @@ function settingsChanged(a: ResolvedSettings | null, b: ResolvedSettings) { function buildLogger(settings: ResolvedSettings): Logger { fs.mkdirSync(path.dirname(settings.file), { recursive: true }); + // Clean up stale rolling logs when using a dated log filename. + if (isRollingPath(settings.file)) { + pruneOldRollingLogs(path.dirname(settings.file)); + } const destination = pino.destination({ dest: settings.file, mkdir: true, @@ -104,3 +112,39 @@ export function resetLogger() { cachedSettings = null; overrideSettings = null; } + +function defaultRollingPathForToday(): string { + const today = new Date().toISOString().slice(0, 10); // YYYY-MM-DD + return path.join(DEFAULT_LOG_DIR, `${LOG_PREFIX}-${today}${LOG_SUFFIX}`); +} + +function isRollingPath(file: string): boolean { + const base = path.basename(file); + return ( + base.startsWith(`${LOG_PREFIX}-`) && + base.endsWith(LOG_SUFFIX) && + base.length === `${LOG_PREFIX}-YYYY-MM-DD${LOG_SUFFIX}`.length + ); +} + +function pruneOldRollingLogs(dir: string): void { + try { + const entries = fs.readdirSync(dir, { withFileTypes: true }); + const cutoff = Date.now() - MAX_LOG_AGE_MS; + for (const entry of entries) { + if (!entry.isFile()) continue; + if (!entry.name.startsWith(`${LOG_PREFIX}-`) || !entry.name.endsWith(LOG_SUFFIX)) continue; + const fullPath = path.join(dir, entry.name); + try { + const stat = fs.statSync(fullPath); + if (stat.mtimeMs < cutoff) { + fs.rmSync(fullPath, { force: true }); + } + } catch { + // ignore errors during pruning + } + } + } catch { + // ignore missing dir or read errors + } +} From b94b22015639571b326742b9cf84d5cf544a2fdc Mon Sep 17 00:00:00 2001 From: Joao Lisboa Date: Tue, 2 Dec 2025 10:52:37 -0300 Subject: [PATCH 09/11] Fix path traversal vulnerability in media server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /media/:id endpoint was vulnerable to path traversal attacks. Since this endpoint is exposed via Tailscale Funnel (unlike the WhatsApp webhook which requires Twilio signature validation), attackers could directly request paths like /media/%2e%2e%2fwarelay.json to access sensitive files in ~/.warelay/ (e.g. warelay.json), or even escape further to the user's home directory via multiple ../ sequences. Fix: validate resolved paths stay within the media directory. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/media/server.test.ts | 10 ++++++++++ src/media/server.ts | 7 ++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/media/server.test.ts b/src/media/server.test.ts index 876051dcd..c30e6ea61 100644 --- a/src/media/server.test.ts +++ b/src/media/server.test.ts @@ -49,4 +49,14 @@ describe("media server", () => { await expect(fs.stat(file)).rejects.toThrow(); await new Promise((r) => server.close(r)); }); + + it("blocks path traversal attempts", async () => { + const server = await startMediaServer(0, 5_000); + const port = (server.address() as AddressInfo).port; + // URL-encoded "../" to bypass client-side path normalization + const res = await fetch(`http://localhost:${port}/media/%2e%2e%2fpackage.json`); + expect(res.status).toBe(400); + expect(await res.text()).toBe("invalid path"); + await new Promise((r) => server.close(r)); + }); }); diff --git a/src/media/server.ts b/src/media/server.ts index 52c5a1ec3..27c2d5ed9 100644 --- a/src/media/server.ts +++ b/src/media/server.ts @@ -17,7 +17,12 @@ export function attachMediaRoutes( app.get("/media/:id", async (req, res) => { const id = req.params.id; - const file = path.join(mediaDir, id); + const file = path.resolve(mediaDir, id); + const mediaRoot = path.resolve(mediaDir) + path.sep; + if (!file.startsWith(mediaRoot)) { + res.status(400).send("invalid path"); + return; + } try { const stat = await fs.stat(file); if (Date.now() - stat.mtimeMs > ttlMs) { From 2cf134668c17ca100a9586222845d1a6a0a1a740 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 18:37:15 +0000 Subject: [PATCH 10/11] fix(media): block symlink traversal --- CHANGELOG.md | 1 + src/media/server.test.ts | 13 +++++++++++++ src/media/server.ts | 27 +++++++++++++++++---------- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2383eb8cd..b8a4dbc1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior. - `warelay logout` now also prunes the shared session store (`~/.warelay/sessions.json`) alongside WhatsApp Web credentials, reducing leftover state after unlinking. - Logging now rolls daily to `/tmp/warelay/warelay-YYYY-MM-DD.log` (or custom dir) and prunes files older than 24h to reduce data retention. +- Media server now rejects symlinked files and ensures resolved paths stay inside the media directory, closing traversal via symlinks; added regression test. ## 1.3.0 — 2025-12-02 diff --git a/src/media/server.test.ts b/src/media/server.test.ts index c30e6ea61..875088cbf 100644 --- a/src/media/server.test.ts +++ b/src/media/server.test.ts @@ -59,4 +59,17 @@ describe("media server", () => { expect(await res.text()).toBe("invalid path"); await new Promise((r) => server.close(r)); }); + + it("blocks symlink escaping outside media dir", async () => { + const target = path.join(process.cwd(), "package.json"); // outside MEDIA_DIR + const link = path.join(MEDIA_DIR, "link-out"); + await fs.symlink(target, link); + + const server = await startMediaServer(0, 5_000); + const port = (server.address() as AddressInfo).port; + const res = await fetch(`http://localhost:${port}/media/link-out`); + expect(res.status).toBe(400); + expect(await res.text()).toBe("invalid path"); + await new Promise((r) => server.close(r)); + }); }); diff --git a/src/media/server.ts b/src/media/server.ts index 27c2d5ed9..1c37c2a33 100644 --- a/src/media/server.ts +++ b/src/media/server.ts @@ -17,24 +17,31 @@ export function attachMediaRoutes( app.get("/media/:id", async (req, res) => { const id = req.params.id; - const file = path.resolve(mediaDir, id); - const mediaRoot = path.resolve(mediaDir) + path.sep; - if (!file.startsWith(mediaRoot)) { - res.status(400).send("invalid path"); - return; - } + const mediaRoot = (await fs.realpath(mediaDir)) + path.sep; + const file = path.resolve(mediaRoot, id); + try { - const stat = await fs.stat(file); + const lstat = await fs.lstat(file); + if (lstat.isSymbolicLink()) { + res.status(400).send("invalid path"); + return; + } + const realPath = await fs.realpath(file); + if (!realPath.startsWith(mediaRoot)) { + res.status(400).send("invalid path"); + return; + } + const stat = await fs.stat(realPath); if (Date.now() - stat.mtimeMs > ttlMs) { - await fs.rm(file).catch(() => {}); + await fs.rm(realPath).catch(() => {}); res.status(410).send("expired"); return; } - res.sendFile(file); + res.sendFile(realPath); // best-effort single-use cleanup after response ends res.on("finish", () => { setTimeout(() => { - fs.rm(file).catch(() => {}); + fs.rm(realPath).catch(() => {}); }, 500); }); } catch { From a34271adf96311b8f11698bcf27b3cdfa6ca8eb5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 2 Dec 2025 18:38:02 +0000 Subject: [PATCH 11/11] chore: credit media fix contributor --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b8a4dbc1a..c31abc7e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior. - `warelay logout` now also prunes the shared session store (`~/.warelay/sessions.json`) alongside WhatsApp Web credentials, reducing leftover state after unlinking. - Logging now rolls daily to `/tmp/warelay/warelay-YYYY-MM-DD.log` (or custom dir) and prunes files older than 24h to reduce data retention. -- Media server now rejects symlinked files and ensures resolved paths stay inside the media directory, closing traversal via symlinks; added regression test. +- Media server now rejects symlinked files and ensures resolved paths stay inside the media directory, closing traversal via symlinks; added regression test. (Thanks @joaohlisboa) ## 1.3.0 — 2025-12-02