diff --git a/CHANGELOG.md b/CHANGELOG.md index c31abc7e8..5c7d1245d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,18 @@ # Changelog -## Unreleased +## 1.3.2 — 2025-12-03 + +### Bug Fixes +- Tau/Pi RPC replies are now buffered until the assistant turn finishes and only completed assistant `message_end` events are emitted, preventing duplicate or partial WhatsApp messages. +- Command auto-replies return the parsed assistant texts array only (no deprecated `text` field), while preserving single-payload callers and keeping multi-message replies intact. +- WhatsApp Web auto-replies now fall back to sending the caption text if media delivery fails, so users still see a reply instead of silence. +- Outbound chunking now prefers newlines and word boundaries and only splits when exceeding platform limits, keeping multi-paragraph replies in a single message unless necessary. +- Heartbeat replies now normalize array payloads for both web and Twilio paths and safely handle optional `heartbeatCommand`, preventing TS build failures and runtime crashes when agents return multiple messages. + +### Testing +- Updated agent and auto-reply parsers plus web media send fallbacks; test suite adjusted and now passing after the RPC/message handling refactors. + +## 1.3.1 — 2025-12-02 ### Security - Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior. @@ -8,6 +20,16 @@ - Logging now rolls daily to `/tmp/warelay/warelay-YYYY-MM-DD.log` (or custom dir) and prunes files older than 24h to reduce data retention. - Media server now rejects symlinked files and ensures resolved paths stay inside the media directory, closing traversal via symlinks; added regression test. 
(Thanks @joaohlisboa) +### Performance +- Web auto-replies using the Pi agent now keep a single long-lived `tau` process in RPC mode instead of spawning per message, eliminating cold-start latency while preserving session/cwd handling. + +### Bug Fixes +- Media downloads now follow up to 5 redirects and still derive MIME/extension from sniffed content or headers; added regression test for redirected downloads. +- Hosted media responses set `Content-Type` from sniffed MIME (not the file name) and still clean up single-use files after send. +- Claude system prompt is guaranteed to be included on the first session turn even when `sendSystemOnce` is enabled, while later turns stay system-free. +- Media server deletes served files right after the response finishes (instead of a fixed timeout) while keeping MIME sniffing and single-use semantics. +- Tau RPC result typing now exposes `signal`/`killed` fields so TS builds align with runtime data. + ## 1.3.0 — 2025-12-02 ### Highlights @@ -42,6 +64,11 @@ ## Unreleased +### Fixed +- Support multiple assistant text replies when using Tau RPC: agents now emit `texts` arrays and command auto-replies deliver each message separately without leaking raw JSON. +- Normalized agent parsers (pi/claude/opencode/codex/gemini) to the new plural output shape. +- Enforce outbound text size caps: WhatsApp/Twilio messages chunked at 1600 chars; web replies chunked at 4000 chars. + ### Changes - **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests. - **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs. 
diff --git a/docs/agent.md b/docs/agent.md index eacd853b5..c0eaec718 100644 --- a/docs/agent.md +++ b/docs/agent.md @@ -1,6 +1,6 @@ # Agent Abstraction Refactor Plan -Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact. +Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode, Gemini) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact. ## Overview - Introduce a pluggable agent layer (`src/agents/*`), selected by config. @@ -15,7 +15,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without reply: { mode: "command", agent: { - kind: "claude" | "opencode" | "pi" | "codex", + kind: "claude" | "opencode" | "pi" | "codex" | "gemini", format?: "text" | "json", identityPrefix?: string }, @@ -42,6 +42,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without - `src/agents/opencode.ts` – reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix. - `src/agents/pi.ts` – parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags. - `src/agents/codex.ts` – parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only. +- `src/agents/gemini.ts` – minimal parsing (plain text), identity prepend, honors `--output-format` when `format` is set, and defaults to `--resume {{SessionId}}` for session resume (new sessions need no flag). Override `sessionArgNew/sessionArgResume` if you use a different session strategy. - Shared MEDIA extraction stays in `media/parse.ts`. 
## Command runner changes diff --git a/src/agents/agents.test.ts b/src/agents/agents.test.ts index 06e64037e..da40b2109 100644 --- a/src/agents/agents.test.ts +++ b/src/agents/agents.test.ts @@ -4,6 +4,7 @@ import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js"; import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js"; import { claudeSpec } from "./claude.js"; import { codexSpec } from "./codex.js"; +import { GEMINI_IDENTITY_PREFIX, geminiSpec } from "./gemini.js"; import { opencodeSpec } from "./opencode.js"; import { piSpec } from "./pi.js"; @@ -61,7 +62,7 @@ describe("agent buildArgs + parseOutput helpers", () => { '{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}', ].join("\n"); const parsed = piSpec.parseOutput(stdout); - expect(parsed.text).toBe("hello world"); + expect(parsed.texts?.[0]).toBe("hello world"); expect(parsed.meta?.provider).toBe("inflection"); expect((parsed.meta?.usage as { output?: number })?.output).toBe(5); }); @@ -72,7 +73,7 @@ describe("agent buildArgs + parseOutput helpers", () => { '{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}', ].join("\n"); const parsed = codexSpec.parseOutput(stdout); - expect(parsed.text).toBe("hi there"); + expect(parsed.texts?.[0]).toBe("hi there"); const usage = parsed.meta?.usage as { input?: number; output?: number; @@ -92,7 +93,7 @@ describe("agent buildArgs + parseOutput helpers", () => { '{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}', ].join("\n"); const parsed = opencodeSpec.parseOutput(stdout); - expect(parsed.text).toBe("hi"); + expect(parsed.texts?.[0]).toBe("hi"); expect(parsed.meta?.extra?.summary).toContain("duration=1200ms"); expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020"); 
expect(parsed.meta?.extra?.summary).toContain("tokens=100+20"); @@ -115,4 +116,33 @@ describe("agent buildArgs + parseOutput helpers", () => { expect(built).toContain("--skip-git-repo-check"); expect(built).toContain("read-only"); }); + + it("geminiSpec prepends identity unless already sent", () => { + const argv = ["gemini", "hi"]; + const built = geminiSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: true, + sessionId: "sess", + sendSystemOnce: false, + systemSent: false, + identityPrefix: undefined, + format: "json", + }); + expect(built.at(-1)).toContain(GEMINI_IDENTITY_PREFIX); + + const builtOnce = geminiSpec.buildArgs({ + argv, + bodyIndex: 1, + isNewSession: false, + sessionId: "sess", + sendSystemOnce: true, + systemSent: true, + identityPrefix: undefined, + format: "json", + }); + expect(builtOnce.at(-1)).toBe("hi"); + expect(builtOnce).toContain("--output-format"); + expect(builtOnce).toContain("json"); + }); }); diff --git a/src/agents/claude.ts b/src/agents/claude.ts index 261dfe4ce..22f8e7ed6 100644 --- a/src/agents/claude.ts +++ b/src/agents/claude.ts @@ -12,7 +12,16 @@ import type { AgentMeta, AgentSpec } from "./types.js"; function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined { if (!parsed?.parsed) return undefined; const summary = summarizeClaudeMetadata(parsed.parsed); - return summary ? { extra: { summary } } : undefined; + const sessionId = + parsed.parsed && + typeof parsed.parsed === "object" && + typeof (parsed.parsed as { session_id?: unknown }).session_id === "string" + ? (parsed.parsed as { session_id: string }).session_id + : undefined; + const meta: AgentMeta = {}; + if (sessionId) meta.sessionId = sessionId; + if (summary) meta.extra = { summary }; + return Object.keys(meta).length ? meta : undefined; } export const claudeSpec: AgentSpec = { @@ -60,7 +69,7 @@ export const claudeSpec: AgentSpec = { const parsed = parseClaudeJson(rawStdout); const text = parsed?.text ?? 
rawStdout.trim(); return { - text: text?.trim(), + texts: text ? [text.trim()] : undefined, meta: toMeta(parsed), }; }, diff --git a/src/agents/codex.ts b/src/agents/codex.ts index da1cd29a2..94b17dd11 100644 --- a/src/agents/codex.ts +++ b/src/agents/codex.ts @@ -4,7 +4,7 @@ import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js"; function parseCodexJson(raw: string): AgentParseResult { const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); - let text: string | undefined; + const texts: string[] = []; let meta: AgentMeta | undefined; for (const line of lines) { @@ -21,7 +21,7 @@ function parseCodexJson(raw: string): AgentParseResult { ev.item?.type === "agent_message" && typeof ev.item.text === "string" ) { - text = ev.item.text; + texts.push(ev.item.text); } if ( ev.type === "turn.completed" && @@ -50,7 +50,8 @@ function parseCodexJson(raw: string): AgentParseResult { } } - return { text: text?.trim(), meta }; + const finalTexts = texts.length ? texts.map((t) => t.trim()) : undefined; + return { texts: finalTexts, meta }; } export const codexSpec: AgentSpec = { diff --git a/src/agents/gemini.ts b/src/agents/gemini.ts new file mode 100644 index 000000000..ce0d68165 --- /dev/null +++ b/src/agents/gemini.ts @@ -0,0 +1,54 @@ +import path from "node:path"; + +import type { AgentParseResult, AgentSpec } from "./types.js"; + +const GEMINI_BIN = "gemini"; +export const GEMINI_IDENTITY_PREFIX = + "You are Gemini responding for warelay. Keep WhatsApp replies concise (<1500 chars). If the prompt contains media paths or a Transcript block, use them. If this was a heartbeat probe and nothing needs attention, reply with exactly HEARTBEAT_OK."; + +// Gemini CLI currently prints plain text; `--output-format json` is flaky across versions, so we +// keep parsing minimal and let MEDIA token stripping happen later in the pipeline. 
+function parseGeminiOutput(raw: string): AgentParseResult { + const trimmed = raw.trim(); + const text = trimmed || undefined; + return { + texts: text ? [text] : undefined, + meta: undefined, + } satisfies AgentParseResult; +} + +export const geminiSpec: AgentSpec = { + kind: "gemini", + isInvocation: (argv) => + argv.length > 0 && path.basename(argv[0]) === GEMINI_BIN, + buildArgs: (ctx) => { + const argv = [...ctx.argv]; + const body = argv[ctx.bodyIndex] ?? ""; + const beforeBody = argv.slice(0, ctx.bodyIndex); + const afterBody = argv.slice(ctx.bodyIndex + 1); + + if (ctx.format) { + const hasOutput = + beforeBody.some( + (p) => p === "--output-format" || p.startsWith("--output-format="), + ) || + afterBody.some( + (p) => p === "--output-format" || p.startsWith("--output-format="), + ); + if (!hasOutput) { + beforeBody.push("--output-format", ctx.format); + } + } + + const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent); + const bodyWithIdentity = + shouldPrependIdentity && body + ? [ctx.identityPrefix ?? 
GEMINI_IDENTITY_PREFIX, body] + .filter(Boolean) + .join("\n\n") + : body; + + return [...beforeBody, bodyWithIdentity, ...afterBody]; + }, + parseOutput: parseGeminiOutput, +}; diff --git a/src/agents/index.ts b/src/agents/index.ts index 231d8a3eb..9e475cf59 100644 --- a/src/agents/index.ts +++ b/src/agents/index.ts @@ -1,5 +1,6 @@ import { claudeSpec } from "./claude.js"; import { codexSpec } from "./codex.js"; +import { geminiSpec } from "./gemini.js"; import { opencodeSpec } from "./opencode.js"; import { piSpec } from "./pi.js"; import type { AgentKind, AgentSpec } from "./types.js"; @@ -7,6 +8,7 @@ import type { AgentKind, AgentSpec } from "./types.js"; const specs: Record = { claude: claudeSpec, codex: codexSpec, + gemini: geminiSpec, opencode: opencodeSpec, pi: piSpec, }; @@ -15,4 +17,4 @@ export function getAgentSpec(kind: AgentKind): AgentSpec { return specs[kind]; } -export { AgentKind, AgentMeta, AgentParseResult } from "./types.js"; +export type { AgentKind, AgentMeta, AgentParseResult } from "./types.js"; diff --git a/src/agents/opencode.ts b/src/agents/opencode.ts index c458d94c1..836b7c4e0 100644 --- a/src/agents/opencode.ts +++ b/src/agents/opencode.ts @@ -55,7 +55,7 @@ export const opencodeSpec: AgentSpec = { const parsed = parseOpencodeJson(rawStdout); const text = parsed.text ?? rawStdout.trim(); return { - text: text?.trim(), + texts: text ? [text.trim()] : undefined, meta: toMeta(parsed), }; }, diff --git a/src/agents/pi.ts b/src/agents/pi.ts index c7359b98e..a9a75ff07 100644 --- a/src/agents/pi.ts +++ b/src/agents/pi.ts @@ -13,41 +13,63 @@ type PiAssistantMessage = { function parsePiJson(raw: string): AgentParseResult { const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{")); - let lastMessage: PiAssistantMessage | undefined; + + // Collect only completed assistant messages (skip streaming updates/toolcalls). 
+ const texts: string[] = []; + let lastAssistant: PiAssistantMessage | undefined; + let lastPushed: string | undefined; + for (const line of lines) { try { const ev = JSON.parse(line) as { type?: string; message?: PiAssistantMessage; }; - // Pi emits a stream; we only care about the terminal assistant message_end. - if (ev.type === "message_end" && ev.message?.role === "assistant") { - lastMessage = ev.message; + + const isAssistantMessage = + (ev.type === "message" || ev.type === "message_end") && + ev.message?.role === "assistant" && + Array.isArray(ev.message.content); + + if (!isAssistantMessage) continue; + + const msg = ev.message as PiAssistantMessage; + const msgText = msg.content + ?.filter((c) => c?.type === "text" && typeof c.text === "string") + .map((c) => c.text) + .join("\n") + .trim(); + + if (msgText && msgText !== lastPushed) { + texts.push(msgText); + lastPushed = msgText; + lastAssistant = msg; } } catch { - // ignore + // ignore malformed lines } } - const text = - lastMessage?.content - ?.filter((c) => c?.type === "text" && typeof c.text === "string") - .map((c) => c.text) - .join("\n") - ?.trim() ?? undefined; - const meta: AgentMeta | undefined = lastMessage - ? { - model: lastMessage.model, - provider: lastMessage.provider, - stopReason: lastMessage.stopReason, - usage: lastMessage.usage, - } - : undefined; - return { text, meta }; + + const meta: AgentMeta | undefined = + lastAssistant && texts.length + ? 
{ + model: lastAssistant.model, + provider: lastAssistant.provider, + stopReason: lastAssistant.stopReason, + usage: lastAssistant.usage, + } + : undefined; + + return { texts, meta }; } export const piSpec: AgentSpec = { kind: "pi", - isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi", + isInvocation: (argv) => { + if (argv.length === 0) return false; + const base = path.basename(argv[0]).replace(/\.(m?js)$/i, ""); + return base === "pi" || base === "tau"; + }, buildArgs: (ctx) => { const argv = [...ctx.argv]; // Non-interactive print + JSON diff --git a/src/agents/types.ts b/src/agents/types.ts index d430fb296..2c868847f 100644 --- a/src/agents/types.ts +++ b/src/agents/types.ts @@ -1,9 +1,10 @@ -export type AgentKind = "claude" | "opencode" | "pi" | "codex"; +export type AgentKind = "claude" | "opencode" | "pi" | "codex" | "gemini"; export type AgentMeta = { model?: string; provider?: string; stopReason?: string; + sessionId?: string; usage?: { input?: number; output?: number; @@ -15,7 +16,8 @@ export type AgentMeta = { }; export type AgentParseResult = { - text?: string; + // Plural to support agents that emit multiple assistant turns per prompt. 
+ texts?: string[]; mediaUrls?: string[]; meta?: AgentMeta; }; diff --git a/src/auto-reply/chunk.test.ts b/src/auto-reply/chunk.test.ts new file mode 100644 index 000000000..065edc672 --- /dev/null +++ b/src/auto-reply/chunk.test.ts @@ -0,0 +1,47 @@ +import { describe, expect, it } from "vitest"; + +import { chunkText } from "./chunk.js"; + +describe("chunkText", () => { + it("keeps multi-line text in one chunk when under limit", () => { + const text = "Line one\n\nLine two\n\nLine three"; + const chunks = chunkText(text, 1600); + expect(chunks).toEqual([text]); + }); + + it("splits only when text exceeds the limit", () => { + const part = "a".repeat(20); + const text = part.repeat(5); // 100 chars + const chunks = chunkText(text, 60); + expect(chunks.length).toBe(2); + expect(chunks[0].length).toBe(60); + expect(chunks[1].length).toBe(40); + expect(chunks.join("")).toBe(text); + }); + + it("prefers breaking at a newline before the limit", () => { + const text = `paragraph one line\n\nparagraph two starts here and continues`; + const chunks = chunkText(text, 40); + expect(chunks).toEqual([ + "paragraph one line", + "paragraph two starts here and continues", + ]); + }); + + it("otherwise breaks at the last whitespace under the limit", () => { + const text = + "This is a message that should break nicely near a word boundary."; + const chunks = chunkText(text, 30); + expect(chunks[0].length).toBeLessThanOrEqual(30); + expect(chunks[1].length).toBeLessThanOrEqual(30); + expect(chunks.join(" ").replace(/\s+/g, " ").trim()).toBe( + text.replace(/\s+/g, " ").trim(), + ); + }); + + it("falls back to a hard break when no whitespace is present", () => { + const text = "Supercalifragilisticexpialidocious"; // 34 chars + const chunks = chunkText(text, 10); + expect(chunks).toEqual(["Supercalif", "ragilistic", "expialidoc", "ious"]); + }); +}); diff --git a/src/auto-reply/chunk.ts b/src/auto-reply/chunk.ts new file mode 100644 index 000000000..a9470db87 --- /dev/null +++ 
b/src/auto-reply/chunk.ts @@ -0,0 +1,51 @@ +// Utilities for splitting outbound text into platform-sized chunks without +// unintentionally breaking on newlines. Text within the limit is returned whole +// (newlines included), so messages are only split when they truly exceed the limit. + +export function chunkText(text: string, limit: number): string[] { + if (!text) return []; + if (limit <= 0) return [text]; + if (text.length <= limit) return [text]; + + const chunks: string[] = []; + let remaining = text; + + while (remaining.length > limit) { + const window = remaining.slice(0, limit); + + // 1) Prefer a newline break inside the window. + let breakIdx = window.lastIndexOf("\n"); + + // 2) Otherwise prefer the last whitespace (word boundary) inside the window. + if (breakIdx <= 0) { + for (let i = window.length - 1; i >= 0; i--) { + if (/\s/.test(window[i])) { + breakIdx = i; + break; + } + } + } + + // 3) Fallback: hard break exactly at the limit. + if (breakIdx <= 0) breakIdx = limit; + + const rawChunk = remaining.slice(0, breakIdx); + const chunk = rawChunk.trimEnd(); + if (chunk.length > 0) { + chunks.push(chunk); + } + + // If we broke on whitespace/newline, skip that separator; for hard breaks keep it. + const brokeOnSeparator = + breakIdx < remaining.length && /\s/.test(remaining[breakIdx]); + const nextStart = Math.min( + remaining.length, + breakIdx + (brokeOnSeparator ? 1 : 0), + ); + remaining = remaining.slice(nextStart).trimStart(); + } + + if (remaining.length) chunks.push(remaining); + + return chunks; +} diff --git a/src/auto-reply/claude.ts b/src/auto-reply/claude.ts index ca3cfa9df..bd7841dad 100644 --- a/src/auto-reply/claude.ts +++ b/src/auto-reply/claude.ts @@ -4,7 +4,7 @@ import { z } from "zod"; // Preferred binary name for Claude CLI invocations. export const CLAUDE_BIN = "claude"; export const CLAUDE_IDENTITY_PREFIX = - "You are Clawd (Claude) running on the user's Mac via warelay. 
Your scratchpad is /Users/steipete/clawd; this is your folder and you can add what you like in markdown files and/or images. You don't need to be concise, but WhatsApp replies must stay under ~1500 characters. Media you can send: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK."; + "You are Clawd (Claude) running on the user's Mac via warelay. Keep WhatsApp replies under ~1500 characters. Your scratchpad is ~/clawd; this is your folder and you can add what you like in markdown files and/or images. You can send media by including MEDIA:/path/to/file.jpg on its own line (no spaces in path). Media limits: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK."; function extractClaudeText(payload: unknown): string | undefined { // Best-effort walker to find the primary text field in Claude JSON outputs. 
diff --git a/src/auto-reply/command-reply.test.ts b/src/auto-reply/command-reply.test.ts index 01a32ca0e..2c6d09267 100644 --- a/src/auto-reply/command-reply.test.ts +++ b/src/auto-reply/command-reply.test.ts @@ -66,7 +66,7 @@ describe("runCommandReply", () => { it("injects claude flags and identity prefix", async () => { const captures: ReplyPayload[] = []; const runner = makeRunner({ stdout: "ok" }, captures); - const { payload } = await runCommandReply({ + const { payloads } = await runCommandReply({ reply: { mode: "command", command: ["claude", "{{Body}}"], @@ -83,6 +83,7 @@ describe("runCommandReply", () => { enqueue: enqueueImmediate, }); + const payload = payloads?.[0]; expect(payload?.text).toBe("ok"); const finalArgv = captures[0].argv as string[]; expect(finalArgv).toContain("--output-format"); @@ -192,7 +193,7 @@ describe("runCommandReply", () => { const runner = vi.fn(async () => { throw { stdout: "partial output here", killed: true, signal: "SIGKILL" }; }); - const { payload, meta } = await runCommandReply({ + const { payloads, meta } = await runCommandReply({ reply: { mode: "command", command: ["echo", "hi"], @@ -208,6 +209,7 @@ describe("runCommandReply", () => { commandRunner: runner, enqueue: enqueueImmediate, }); + const payload = payloads?.[0]; expect(payload?.text).toContain("Command timed out after 1s"); expect(payload?.text).toContain("partial output"); expect(meta.killed).toBe(true); @@ -217,7 +219,7 @@ describe("runCommandReply", () => { const runner = vi.fn(async () => { throw { stdout: "", killed: true, signal: "SIGKILL" }; }); - const { payload } = await runCommandReply({ + const { payloads } = await runCommandReply({ reply: { mode: "command", command: ["echo", "hi"], @@ -234,6 +236,7 @@ describe("runCommandReply", () => { commandRunner: runner, enqueue: enqueueImmediate, }); + const payload = payloads?.[0]; expect(payload?.text).toContain("(cwd: /tmp/work)"); }); @@ -244,7 +247,7 @@ describe("runCommandReply", () => { const runner = 
makeRunner({ stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`, }); - const { payload } = await runCommandReply({ + const { payloads } = await runCommandReply({ reply: { mode: "command", command: ["echo", "hi"], @@ -261,6 +264,7 @@ describe("runCommandReply", () => { commandRunner: runner, enqueue: enqueueImmediate, }); + const payload = payloads?.[0]; expect(payload?.mediaUrls).toEqual(["https://example.com/img.jpg"]); await fs.unlink(tmp); }); @@ -318,7 +322,7 @@ describe("runCommandReply", () => { const runner = makeRunner({ stdout: '{"result":"","duration_ms":50,"total_cost_usd":0.001}', }); - const { payload } = await runCommandReply({ + const { payloads } = await runCommandReply({ reply: { mode: "command", command: ["claude", "{{Body}}"], @@ -335,6 +339,7 @@ describe("runCommandReply", () => { enqueue: enqueueImmediate, }); // Should NOT contain raw JSON - empty result should produce fallback message + const payload = payloads?.[0]; expect(payload?.text).not.toContain('{"result"'); expect(payload?.text).toContain("command produced no output"); }); @@ -343,7 +348,7 @@ describe("runCommandReply", () => { const runner = makeRunner({ stdout: '{"text":"","duration_ms":50}', }); - const { payload } = await runCommandReply({ + const { payloads } = await runCommandReply({ reply: { mode: "command", command: ["claude", "{{Body}}"], @@ -360,6 +365,7 @@ describe("runCommandReply", () => { enqueue: enqueueImmediate, }); // Empty text should produce fallback message, not raw JSON + const payload = payloads?.[0]; expect(payload?.text).not.toContain('{"text"'); expect(payload?.text).toContain("command produced no output"); }); @@ -368,7 +374,7 @@ describe("runCommandReply", () => { const runner = makeRunner({ stdout: '{"result":"hello world","duration_ms":50}', }); - const { payload } = await runCommandReply({ + const { payloads } = await runCommandReply({ reply: { mode: "command", command: ["claude", "{{Body}}"], @@ -384,6 +390,7 @@ describe("runCommandReply", 
() => { commandRunner: runner, enqueue: enqueueImmediate, }); + const payload = payloads?.[0]; expect(payload?.text).toBe("hello world"); }); }); diff --git a/src/auto-reply/command-reply.ts b/src/auto-reply/command-reply.ts index eb1838be0..c89d5f4d6 100644 --- a/src/auto-reply/command-reply.ts +++ b/src/auto-reply/command-reply.ts @@ -6,9 +6,11 @@ import type { AgentMeta } from "../agents/types.js"; import type { WarelayConfig } from "../config/config.js"; import { isVerbose, logVerbose } from "../globals.js"; import { logError } from "../logger.js"; +import { getChildLogger } from "../logging.js"; import { splitMediaFromOutput } from "../media/parse.js"; import { enqueueCommand } from "../process/command-queue.js"; import type { runCommandWithTimeout } from "../process/exec.js"; +import { runPiRpc } from "../process/tau-rpc.js"; import { applyTemplate, type TemplateContext } from "./templating.js"; import type { ReplyPayload } from "./types.js"; @@ -18,6 +20,8 @@ type CommandReplyConfig = NonNullable["reply"] & { type EnqueueRunner = typeof enqueueCommand; +type ThinkLevel = "off" | "minimal" | "low" | "medium" | "high"; + type CommandReplyParams = { reply: CommandReplyConfig; templatingCtx: TemplateContext; @@ -29,6 +33,7 @@ type CommandReplyParams = { timeoutSeconds: number; commandRunner: typeof runCommandWithTimeout; enqueue?: EnqueueRunner; + thinkLevel?: ThinkLevel; }; export type CommandReplyMeta = { @@ -42,7 +47,7 @@ export type CommandReplyMeta = { }; export type CommandReplyResult = { - payload?: ReplyPayload; + payloads?: ReplyPayload[]; meta: CommandReplyMeta; }; @@ -96,9 +101,34 @@ export function summarizeClaudeMetadata(payload: unknown): string | undefined { return parts.length ? 
parts.join(", ") : undefined; } +function appendThinkingCue(body: string, level?: ThinkLevel): string { + if (!level || level === "off") return body; + const cue = (() => { + switch (level) { + case "high": + return "ultrathink"; + case "medium": + return "think harder"; + case "low": + return "think hard"; + case "minimal": + return "think"; + default: + return ""; + } + })(); + return [body.trim(), cue].filter(Boolean).join(" "); +} + export async function runCommandReply( params: CommandReplyParams, ): Promise { + const logger = getChildLogger({ module: "command-reply" }); + const verboseLog = (msg: string) => { + logger.debug(msg); + if (isVerbose()) logVerbose(msg); + }; + const { reply, templatingCtx, @@ -110,6 +140,7 @@ export async function runCommandReply( timeoutSeconds, commandRunner, enqueue = enqueueCommand, + thinkLevel, } = params; if (!reply.command?.length) { @@ -133,14 +164,25 @@ export async function runCommandReply( // Session args prepared (templated) and injected generically if (reply.session) { - const defaultNew = - agentCfg.kind === "claude" - ? ["--session-id", "{{SessionId}}"] - : ["--session", "{{SessionId}}"]; - const defaultResume = - agentCfg.kind === "claude" - ? ["--resume", "{{SessionId}}"] - : ["--session", "{{SessionId}}"]; + const defaultSessionArgs = (() => { + switch (agentCfg.kind) { + case "claude": + return { + newArgs: ["--session-id", "{{SessionId}}"], + resumeArgs: ["--resume", "{{SessionId}}"], + }; + case "gemini": + // Gemini CLI resumes via --resume followed by the session id; starting a new session needs no flag. + return { newArgs: [], resumeArgs: ["--resume", "{{SessionId}}"] }; + default: + return { + newArgs: ["--session", "{{SessionId}}"], + resumeArgs: ["--session", "{{SessionId}}"], + }; + } + })(); + const defaultNew = defaultSessionArgs.newArgs; + const defaultResume = defaultSessionArgs.resumeArgs; const sessionArgList = ( isNewSession ? (reply.session.sessionArgNew ?? 
defaultNew) @@ -159,6 +201,23 @@ export async function runCommandReply( } } + if (thinkLevel && thinkLevel !== "off") { + if (agentKind === "pi") { + const hasThinkingFlag = argv.some( + (p, i) => + p === "--thinking" || + (i > 0 && argv[i - 1] === "--thinking") || + p.startsWith("--thinking="), + ); + if (!hasThinkingFlag) { + argv.splice(bodyIndex, 0, "--thinking", thinkLevel); + bodyIndex += 2; + } + } else if (argv[bodyIndex]) { + argv[bodyIndex] = appendThinkingCue(argv[bodyIndex] ?? "", thinkLevel); + } + } + const shouldApplyAgent = agent.isInvocation(argv); const finalArgv = shouldApplyAgent ? agent.buildArgs({ @@ -176,55 +235,115 @@ export async function runCommandReply( logVerbose( `Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`, ); + logger.info( + { + agent: agentKind, + sessionId: templatingCtx.SessionId, + newSession: isNewSession, + cwd: reply.cwd, + command: finalArgv.slice(0, -1), // omit body to reduce noise + }, + "command auto-reply start", + ); const started = Date.now(); let queuedMs: number | undefined; let queuedAhead: number | undefined; try { - const { stdout, stderr, code, signal, killed } = await enqueue( - () => commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd }), - { - onWait: (waitMs, ahead) => { - queuedMs = waitMs; - queuedAhead = ahead; - if (isVerbose()) { - logVerbose( - `Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`, - ); + const run = async () => { + // Prefer long-lived tau RPC for pi agent to avoid cold starts. + if (agentKind === "pi") { + const promptIndex = finalArgv.length - 1; + const body = finalArgv[promptIndex] ?? ""; + // Build rpc args without the prompt body; force --mode rpc. 
+ const rpcArgv = (() => { + const copy = [...finalArgv]; + copy.splice(promptIndex, 1); + const modeIdx = copy.indexOf("--mode"); + if (modeIdx >= 0 && copy[modeIdx + 1]) { + copy.splice(modeIdx, 2, "--mode", "rpc"); + } else if (!copy.includes("--mode")) { + copy.splice(copy.length - 1, 0, "--mode", "rpc"); } - }, + return copy; + })(); + return await runPiRpc({ + argv: rpcArgv, + cwd: reply.cwd, + prompt: body, + timeoutMs, + }); + } + return await commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd }); + }; + + const { stdout, stderr, code, signal, killed } = await enqueue(run, { + onWait: (waitMs, ahead) => { + queuedMs = waitMs; + queuedAhead = ahead; + if (isVerbose()) { + logVerbose( + `Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`, + ); + } }, - ); + }); const rawStdout = stdout.trim(); let mediaFromCommand: string[] | undefined; - let trimmed = rawStdout; + const trimmed = rawStdout; if (stderr?.trim()) { logVerbose(`Command auto-reply stderr: ${stderr.trim()}`); } const parsed = trimmed ? agent.parseOutput(trimmed) : undefined; - // Treat empty string as "no content" so we can fall back to the friendly - // "(command produced no output)" message instead of echoing raw JSON. - if (parsed && parsed.text !== undefined) { - trimmed = parsed.text.trim(); + const parserProvided = !!parsed; + + // Collect one message per assistant text from parseOutput (tau RPC can emit many). + const parsedTexts = + parsed?.texts?.map((t) => t.trim()).filter(Boolean) ?? []; + + type ReplyItem = { text: string; media?: string[] }; + const replyItems: ReplyItem[] = []; + + for (const t of parsedTexts) { + const { text: cleanedText, mediaUrls: mediaFound } = + splitMediaFromOutput(t); + replyItems.push({ + text: cleanedText, + media: mediaFound?.length ? 
mediaFound : undefined, + }); } - const { text: cleanedText, mediaUrls: mediaFound } = - splitMediaFromOutput(trimmed); - trimmed = cleanedText; - if (mediaFound?.length) { - mediaFromCommand = mediaFound; - if (isVerbose()) logVerbose(`MEDIA token extracted: ${mediaFound}`); - } else if (isVerbose()) { - logVerbose("No MEDIA token extracted from final text"); + // If parser gave nothing, fall back to raw stdout as a single message. + if (replyItems.length === 0 && trimmed && !parserProvided) { + const { text: cleanedText, mediaUrls: mediaFound } = + splitMediaFromOutput(trimmed); + if (cleanedText || mediaFound?.length) { + replyItems.push({ + text: cleanedText, + media: mediaFound?.length ? mediaFound : undefined, + }); + } } - if (!trimmed && !mediaFromCommand) { + + // No content at all → fallback notice. + if (replyItems.length === 0) { const meta = parsed?.meta?.extra?.summary ?? undefined; - trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`; - logVerbose("No text/media produced; injecting fallback notice to user"); + replyItems.push({ + text: `(command produced no output${meta ? `; ${meta}` : ""})`, + }); + verboseLog("No text/media produced; injecting fallback notice to user"); } - logVerbose(`Command auto-reply stdout (trimmed): ${trimmed || ""}`); - logVerbose(`Command auto-reply finished in ${Date.now() - started}ms`); + + verboseLog( + `Command auto-reply stdout produced ${replyItems.length} message(s)`, + ); + const elapsed = Date.now() - started; + verboseLog(`Command auto-reply finished in ${elapsed}ms`); + logger.info( + { durationMs: elapsed, agent: agentKind, cwd: reply.cwd }, + "command auto-reply finished", + ); if ((code ?? 0) !== 0) { console.error( `Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`, @@ -235,7 +354,7 @@ export async function runCommandReply( : ""; const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? 
` (${signal})` : ""}${partialOut}`; return { - payload: { text: errorText }, + payloads: [{ text: errorText }], meta: { durationMs: Date.now() - started, queuedMs, @@ -253,7 +372,7 @@ export async function runCommandReply( ); const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`; return { - payload: { text: errorText }, + payloads: [{ text: errorText }], meta: { durationMs: Date.now() - started, queuedMs, @@ -265,43 +384,6 @@ export async function runCommandReply( }, }; } - let mediaUrls = - mediaFromCommand ?? (reply.mediaUrl ? [reply.mediaUrl] : undefined); - - // If mediaMaxMb is set, skip local media paths larger than the cap. - if (mediaUrls?.length && reply.mediaMaxMb) { - const maxBytes = reply.mediaMaxMb * 1024 * 1024; - const filtered: string[] = []; - for (const url of mediaUrls) { - if (/^https?:\/\//i.test(url)) { - filtered.push(url); - continue; - } - const abs = path.isAbsolute(url) ? url : path.resolve(url); - try { - const stats = await fs.stat(abs); - if (stats.size <= maxBytes) { - filtered.push(url); - } else if (isVerbose()) { - logVerbose( - `Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`, - ); - } - } catch { - filtered.push(url); - } - } - mediaUrls = filtered; - } - - const payload = - trimmed || mediaUrls?.length - ? { - text: trimmed || undefined, - mediaUrl: mediaUrls?.[0], - mediaUrls, - } - : undefined; const meta: CommandReplyMeta = { durationMs: Date.now() - started, queuedMs, @@ -311,17 +393,67 @@ export async function runCommandReply( killed, agentMeta: parsed?.meta, }; - if (isVerbose()) { - logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`); + + const payloads: ReplyPayload[] = []; + + // Build each reply item sequentially (delivery handled by caller). + for (const item of replyItems) { + let mediaUrls = + item.media ?? + mediaFromCommand ?? + (reply.mediaUrl ? 
[reply.mediaUrl] : undefined); + + // If mediaMaxMb is set, skip local media paths larger than the cap. + if (mediaUrls?.length && reply.mediaMaxMb) { + const maxBytes = reply.mediaMaxMb * 1024 * 1024; + const filtered: string[] = []; + for (const url of mediaUrls) { + if (/^https?:\/\//i.test(url)) { + filtered.push(url); + continue; + } + const abs = path.isAbsolute(url) ? url : path.resolve(url); + try { + const stats = await fs.stat(abs); + if (stats.size <= maxBytes) { + filtered.push(url); + } else if (isVerbose()) { + logVerbose( + `Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`, + ); + } + } catch { + filtered.push(url); + } + } + mediaUrls = filtered; + } + + const payload = + item.text || mediaUrls?.length + ? { + text: item.text || undefined, + mediaUrl: mediaUrls?.[0], + mediaUrls, + } + : undefined; + + if (payload) payloads.push(payload); } - return { payload, meta }; + + verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`); + return { payloads, meta }; } catch (err) { const elapsed = Date.now() - started; + logger.info( + { durationMs: elapsed, agent: agentKind, cwd: reply.cwd }, + "command auto-reply failed", + ); const anyErr = err as { killed?: boolean; signal?: string }; const timeoutHit = anyErr.killed === true || anyErr.signal === "SIGKILL"; const errorObj = err as { stdout?: string; stderr?: string }; if (errorObj.stderr?.trim()) { - logVerbose(`Command auto-reply stderr: ${errorObj.stderr.trim()}`); + verboseLog(`Command auto-reply stderr: ${errorObj.stderr.trim()}`); } if (timeoutHit) { console.error( @@ -339,7 +471,7 @@ export async function runCommandReply( ? `${baseMsg}\n\nPartial output before timeout:\n${partialSnippet}` : baseMsg; return { - payload: { text }, + payloads: [{ text }], meta: { durationMs: elapsed, queuedMs, @@ -355,7 +487,7 @@ export async function runCommandReply( const errMsg = err instanceof Error ? 
err.message : String(err); const errorText = `⚠️ Command failed: ${errMsg}`; return { - payload: { text: errorText }, + payloads: [{ text: errorText }], meta: { durationMs: elapsed, queuedMs, diff --git a/src/auto-reply/heartbeat-prehook.test.ts b/src/auto-reply/heartbeat-prehook.test.ts deleted file mode 100644 index 06b0a07b1..000000000 --- a/src/auto-reply/heartbeat-prehook.test.ts +++ /dev/null @@ -1,261 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; -import type { WarelayConfig } from "../config/config.js"; -import type { SpawnResult } from "../process/exec.js"; -import { - buildHeartbeatPrompt, - runHeartbeatPreHook, -} from "./heartbeat-prehook.js"; - -describe("buildHeartbeatPrompt", () => { - it("returns base prompt when no context", () => { - expect(buildHeartbeatPrompt("HEARTBEAT ultrathink")).toBe( - "HEARTBEAT ultrathink", - ); - expect(buildHeartbeatPrompt("HEARTBEAT ultrathink", "")).toBe( - "HEARTBEAT ultrathink", - ); - expect(buildHeartbeatPrompt("HEARTBEAT ultrathink", " ")).toBe( - "HEARTBEAT ultrathink", - ); - }); - - it("appends context when provided", () => { - const result = buildHeartbeatPrompt( - "HEARTBEAT ultrathink", - "You have 3 unread emails", - ); - expect(result).toBe( - "HEARTBEAT ultrathink\n\n---\nContext from pre-hook:\nYou have 3 unread emails", - ); - }); - - it("trims context whitespace", () => { - const result = buildHeartbeatPrompt("HEARTBEAT", " context with spaces "); - expect(result).toContain("context with spaces"); - expect(result).not.toContain(" context"); - }); -}); - -describe("runHeartbeatPreHook", () => { - it("returns empty result when no pre-hook configured", async () => { - const cfg: WarelayConfig = {}; - const result = await runHeartbeatPreHook(cfg); - expect(result.durationMs).toBe(0); - expect(result.context).toBeUndefined(); - expect(result.error).toBeUndefined(); - }); - - it("returns empty result when pre-hook is empty array", async () => { - const cfg: WarelayConfig = { - inbound: { - 
reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: [], - }, - }, - }, - }; - const result = await runHeartbeatPreHook(cfg); - expect(result.durationMs).toBe(0); - expect(result.context).toBeUndefined(); - }); - - it("returns stdout as context on success", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["echo", "email summary"], - }, - }, - }, - }; - const mockRunner = vi.fn().mockResolvedValue({ - stdout: "email summary\n", - stderr: "", - code: 0, - signal: null, - killed: false, - } satisfies SpawnResult); - - const result = await runHeartbeatPreHook(cfg, mockRunner); - expect(result.context).toBe("email summary"); - expect(result.error).toBeUndefined(); - expect(mockRunner).toHaveBeenCalledWith(["echo", "email summary"], { - timeoutMs: 30000, - }); - }); - - it("returns error on non-zero exit", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["failing-script"], - }, - }, - }, - }; - const mockRunner = vi.fn().mockResolvedValue({ - stdout: "", - stderr: "error output", - code: 1, - signal: null, - killed: false, - } satisfies SpawnResult); - - const result = await runHeartbeatPreHook(cfg, mockRunner); - expect(result.context).toBeUndefined(); - expect(result.error).toContain("exited with code 1"); - }); - - it("handles timeout gracefully", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["slow-script"], - heartbeatPreHookTimeoutSeconds: 5, - }, - }, - }, - }; - const mockRunner = vi.fn().mockResolvedValue({ - stdout: "", - stderr: "", - code: null, - signal: "SIGKILL", - killed: true, - } satisfies SpawnResult); - - const result = await runHeartbeatPreHook(cfg, mockRunner); - expect(result.timedOut).toBe(true); - expect(result.error).toContain("timed 
out"); - expect(result.context).toBeUndefined(); - }); - - it("uses custom timeout from config", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["script"], - heartbeatPreHookTimeoutSeconds: 60, - }, - }, - }, - }; - const mockRunner = vi.fn().mockResolvedValue({ - stdout: "ok", - stderr: "", - code: 0, - signal: null, - killed: false, - } satisfies SpawnResult); - - await runHeartbeatPreHook(cfg, mockRunner); - expect(mockRunner).toHaveBeenCalledWith(["script"], { timeoutMs: 60000 }); - }); - - it("returns empty context for whitespace-only stdout", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["script"], - }, - }, - }, - }; - const mockRunner = vi.fn().mockResolvedValue({ - stdout: " \n\n ", - stderr: "", - code: 0, - signal: null, - killed: false, - } satisfies SpawnResult); - - const result = await runHeartbeatPreHook(cfg, mockRunner); - expect(result.context).toBeUndefined(); - expect(result.error).toBeUndefined(); - }); - - it("handles thrown error from command runner", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["script"], - }, - }, - }, - }; - const mockRunner = vi.fn().mockRejectedValue(new Error("spawn ENOENT")); - - const result = await runHeartbeatPreHook(cfg, mockRunner); - expect(result.error).toBe("spawn ENOENT"); - expect(result.context).toBeUndefined(); - }); - - it("handles thrown timeout error (killed property)", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["script"], - }, - }, - }, - }; - const timeoutError = new Error("Command timed out"); - (timeoutError as unknown as { killed: boolean }).killed = true; - const mockRunner = vi.fn().mockRejectedValue(timeoutError); 
- - const result = await runHeartbeatPreHook(cfg, mockRunner); - expect(result.timedOut).toBe(true); - expect(result.error).toContain("timed out"); - }); - - it("caps large stdout to max size", async () => { - const cfg: WarelayConfig = { - inbound: { - reply: { - mode: "command", - command: ["echo"], - session: { - heartbeatPreHook: ["script"], - }, - }, - }, - }; - const largeOutput = "x".repeat(10000); - const mockRunner = vi.fn().mockResolvedValue({ - stdout: largeOutput, - stderr: "", - code: 0, - signal: null, - killed: false, - } satisfies SpawnResult); - - const result = await runHeartbeatPreHook(cfg, mockRunner); - expect(result.context).toBeDefined(); - expect(result.context?.length).toBeLessThan(largeOutput.length); - expect(result.context).toContain("...[truncated]"); - }); -}); diff --git a/src/auto-reply/heartbeat-prehook.ts b/src/auto-reply/heartbeat-prehook.ts deleted file mode 100644 index 5ed44dbef..000000000 --- a/src/auto-reply/heartbeat-prehook.ts +++ /dev/null @@ -1,118 +0,0 @@ -import type { WarelayConfig } from "../config/config.js"; -import { danger, logVerbose } from "../globals.js"; -import { logDebug, logWarn } from "../logger.js"; -import { runCommandWithTimeout, type SpawnResult } from "../process/exec.js"; - -export type PreHookResult = { - context?: string; - durationMs: number; - error?: string; - timedOut?: boolean; -}; - -const DEFAULT_PREHOOK_TIMEOUT_SECONDS = 30; -const MAX_CONTEXT_CHARS = 8000; - -export function buildHeartbeatPrompt( - basePrompt: string, - preHookContext?: string, -): string { - if (!preHookContext?.trim()) { - return basePrompt; - } - return `${basePrompt}\n\n---\nContext from pre-hook:\n${preHookContext.trim()}`; -} - -function capContextSize(stdout: string): string { - const trimmed = stdout.trim(); - if (trimmed.length <= MAX_CONTEXT_CHARS) { - return trimmed; - } - return `${trimmed.slice(0, MAX_CONTEXT_CHARS)}...[truncated]`; -} - -export async function runHeartbeatPreHook( - cfg: WarelayConfig, - 
commandRunner: typeof runCommandWithTimeout = runCommandWithTimeout, -): Promise { - const sessionCfg = cfg.inbound?.reply?.session; - const preHookCommand = sessionCfg?.heartbeatPreHook; - - if (!preHookCommand?.length) { - return { durationMs: 0 }; - } - - const timeoutSeconds = - sessionCfg?.heartbeatPreHookTimeoutSeconds ?? - DEFAULT_PREHOOK_TIMEOUT_SECONDS; - const timeoutMs = timeoutSeconds * 1000; - const started = Date.now(); - - logVerbose(`Running heartbeat pre-hook: ${preHookCommand.join(" ")}`); - - try { - const result: SpawnResult = await commandRunner(preHookCommand, { - timeoutMs, - }); - const durationMs = Date.now() - started; - - if (result.killed || result.signal === "SIGKILL") { - const stderrPreview = result.stderr?.trim().slice(0, 200) || "(empty)"; - logWarn(`Heartbeat pre-hook timed out after ${timeoutSeconds}s`); - logDebug(`Pre-hook stderr preview: ${stderrPreview}`); - return { - durationMs, - timedOut: true, - error: `Pre-hook timed out after ${timeoutSeconds}s`, - }; - } - - if ((result.code ?? 0) !== 0) { - const stderrPreview = result.stderr?.trim().slice(0, 200) || "(empty)"; - const errorMsg = `Pre-hook exited with code ${result.code}`; - logWarn(errorMsg); - logDebug(`Pre-hook stderr preview: ${stderrPreview}`); - return { - durationMs, - error: errorMsg, - }; - } - - const stdout = result.stdout?.trim(); - logVerbose( - `Pre-hook completed in ${durationMs}ms, output length: ${stdout?.length ?? 0}`, - ); - - if (stdout) { - logDebug( - `Pre-hook output: ${stdout.slice(0, 200)}${stdout.length > 200 ? "..." : ""}`, - ); - } - - const cappedContext = stdout ? 
capContextSize(stdout) : undefined; - - return { - context: cappedContext || undefined, - durationMs, - }; - } catch (err) { - const durationMs = Date.now() - started; - const anyErr = err as { killed?: boolean; signal?: string }; - - if (anyErr.killed || anyErr.signal === "SIGKILL") { - return { - durationMs, - timedOut: true, - error: `Pre-hook timed out after ${timeoutSeconds}s`, - }; - } - - const errorMsg = err instanceof Error ? err.message : String(err); - console.error(danger(`Heartbeat pre-hook failed: ${errorMsg}`)); - - return { - durationMs, - error: errorMsg, - }; - } -} diff --git a/src/auto-reply/reply.chunking.test.ts b/src/auto-reply/reply.chunking.test.ts new file mode 100644 index 000000000..4b30c8c56 --- /dev/null +++ b/src/auto-reply/reply.chunking.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it, vi } from "vitest"; + +import type { WarelayConfig } from "../config/config.js"; +import { autoReplyIfConfigured } from "./reply.js"; + +describe("autoReplyIfConfigured chunking", () => { + it("sends a single Twilio message for multi-line text under limit", async () => { + const body = [ + "Oh! Hi Peter! 🦞", + "", + "Sorry, I got a bit trigger-happy with the heartbeat response there. 
What's up?", + "", + "Everything working on your end?", + ].join("\n"); + + const config: WarelayConfig = { + inbound: { + reply: { + mode: "text", + text: body, + }, + }, + }; + + const create = vi.fn().mockResolvedValue({}); + const client = { messages: { create } } as unknown as Parameters< + typeof autoReplyIfConfigured + >[0]; + + const message = { + body: "ping", + from: "+15551234567", + to: "+15557654321", + sid: "SM123", + } as Parameters[1]; + + await autoReplyIfConfigured(client, message, config); + + expect(create).toHaveBeenCalledTimes(1); + expect(create).toHaveBeenCalledWith( + expect.objectContaining({ + body, + from: message.to, + to: message.from, + }), + ); + }); +}); diff --git a/src/auto-reply/reply.ts b/src/auto-reply/reply.ts index cdf1eef5f..e760b01e5 100644 --- a/src/auto-reply/reply.ts +++ b/src/auto-reply/reply.ts @@ -1,5 +1,4 @@ import crypto from "node:crypto"; - import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js"; import { loadConfig, type WarelayConfig } from "../config/config.js"; import { @@ -8,6 +7,7 @@ import { deriveSessionKey, loadSessionStore, resolveStorePath, + type SessionEntry, saveSessionStore, } from "../config/sessions.js"; import { info, isVerbose, logVerbose } from "../globals.js"; @@ -17,6 +17,7 @@ import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { splitMessage } from "../twilio/send.js"; import type { TwilioRequester } from "../twilio/types.js"; import { sendTypingIndicator } from "../twilio/typing.js"; +import { chunkText } from "./chunk.js"; import { runCommandReply } from "./command-reply.js"; import { applyTemplate, @@ -28,12 +29,54 @@ import type { GetReplyOptions, ReplyPayload } from "./types.js"; export type { GetReplyOptions, ReplyPayload } from "./types.js"; +const TWILIO_TEXT_LIMIT = 1600; + +const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]); +const ABORT_MEMORY = new Map(); + +type ThinkLevel = "off" | "minimal" | "low" | 
"medium" | "high"; + +function normalizeThinkLevel(raw?: string | null): ThinkLevel | undefined { + if (!raw) return undefined; + const key = raw.toLowerCase(); + if (["off"].includes(key)) return "off"; + if (["min", "minimal"].includes(key)) return "minimal"; + if (["low", "thinkhard", "think-hard", "think_hard"].includes(key)) + return "low"; + if (["med", "medium", "thinkharder", "think-harder", "harder"].includes(key)) + return "medium"; + if ( + ["high", "ultra", "ultrathink", "think-hard", "thinkhardest"].includes(key) + ) + return "high"; + if (["think"].includes(key)) return "minimal"; + return undefined; +} + +function extractThinkDirective(body?: string): { + cleaned: string; + thinkLevel?: ThinkLevel; +} { + if (!body) return { cleaned: "" }; + const re = /\/think:([a-zA-Z-]+)/i; + const match = body.match(re); + const thinkLevel = normalizeThinkLevel(match?.[1]); + const cleaned = match ? body.replace(match[0], "").trim() : body; + return { cleaned, thinkLevel }; +} + +function isAbortTrigger(text?: string): boolean { + if (!text) return false; + const normalized = text.trim().toLowerCase(); + return ABORT_TRIGGERS.has(normalized); +} + export async function getReplyFromConfig( ctx: MsgContext, opts?: GetReplyOptions, configOverride?: WarelayConfig, commandRunner: typeof runCommandWithTimeout = runCommandWithTimeout, -): Promise { +): Promise { // Choose reply from config: static text or external command stdout. const cfg = configOverride ?? loadConfig(); const reply = cfg.inbound?.reply; @@ -110,11 +153,13 @@ export async function getReplyFromConfig( const storePath = resolveStorePath(sessionCfg?.store); let sessionStore: ReturnType | undefined; let sessionKey: string | undefined; + let sessionEntry: SessionEntry | undefined; let sessionId: string | undefined; let isNewSession = false; let bodyStripped: string | undefined; let systemSent = false; + let abortedLastRun = false; if (sessionCfg) { const trimmedBody = (ctx.Body ?? 
"").trim(); @@ -142,13 +187,21 @@ export async function getReplyFromConfig( if (!isNewSession && freshEntry) { sessionId = entry.sessionId; systemSent = entry.systemSent ?? false; + abortedLastRun = entry.abortedLastRun ?? false; } else { sessionId = crypto.randomUUID(); isNewSession = true; systemSent = false; + abortedLastRun = false; } - sessionStore[sessionKey] = { sessionId, updatedAt: Date.now(), systemSent }; + sessionEntry = { + sessionId, + updatedAt: Date.now(), + systemSent, + abortedLastRun, + }; + sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); } @@ -159,11 +212,22 @@ export async function getReplyFromConfig( IsNewSession: isNewSession ? "true" : "false", }; + const { cleaned: thinkCleaned, thinkLevel } = extractThinkDirective( + sessionCtx.BodyStripped ?? sessionCtx.Body ?? "", + ); + sessionCtx.Body = thinkCleaned; + sessionCtx.BodyStripped = thinkCleaned; + // Optional allowlist by origin number (E.164 without whatsapp: prefix) const allowFrom = cfg.inbound?.allowFrom; const from = (ctx.From ?? "").replace(/^whatsapp:/, ""); const to = (ctx.To ?? "").replace(/^whatsapp:/, ""); const isSamePhone = from && to && from === to; + const abortKey = sessionKey ?? (from || undefined) ?? (to || undefined); + + if (!sessionEntry && abortKey) { + abortedLastRun = ABORT_MEMORY.get(abortKey) ?? false; + } // Same-phone mode (self-messaging) is always allowed if (isSamePhone) { @@ -179,6 +243,23 @@ export async function getReplyFromConfig( } } + const abortRequested = + reply?.mode === "command" && + isAbortTrigger((sessionCtx.BodyStripped ?? sessionCtx.Body ?? 
"").trim()); + + if (abortRequested) { + if (sessionEntry && sessionStore && sessionKey) { + sessionEntry.abortedLastRun = true; + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + } else if (abortKey) { + ABORT_MEMORY.set(abortKey, true); + } + cleanupTyping(); + return { text: "Agent was aborted." }; + } + await startTypingLoop(); // Optional prefix injected before Body for templating/command prompts. @@ -192,16 +273,30 @@ export async function getReplyFromConfig( ? applyTemplate(reply.bodyPrefix, sessionCtx) : ""; const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? ""; - const prefixedBodyBase = (() => { - let body = baseBody; - if (!sendSystemOnce || isFirstTurnInSession) { - body = bodyPrefix ? `${bodyPrefix}${body}` : body; + const abortedHint = + reply?.mode === "command" && abortedLastRun + ? "Note: The previous agent run was aborted by the user. Resume carefully or ask for clarification." + : ""; + let prefixedBodyBase = baseBody; + if (!sendSystemOnce || isFirstTurnInSession) { + prefixedBodyBase = bodyPrefix + ? `${bodyPrefix}${prefixedBodyBase}` + : prefixedBodyBase; + } + if (sessionIntro) { + prefixedBodyBase = `${sessionIntro}\n\n${prefixedBodyBase}`; + } + if (abortedHint) { + prefixedBodyBase = `${abortedHint}\n\n${prefixedBodyBase}`; + if (sessionEntry && sessionStore && sessionKey) { + sessionEntry.abortedLastRun = false; + sessionEntry.updatedAt = Date.now(); + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + } else if (abortKey) { + ABORT_MEMORY.set(abortKey, false); } - if (sessionIntro) { - body = `${sessionIntro}\n\n${body}`; - } - return body; - })(); + } if ( sessionCfg && sendSystemOnce && @@ -209,12 +304,18 @@ export async function getReplyFromConfig( sessionStore && sessionKey ) { - sessionStore[sessionKey] = { - ...(sessionStore[sessionKey] ?? {}), - sessionId: sessionId ?? 
crypto.randomUUID(), + const current = sessionEntry ?? + sessionStore[sessionKey] ?? { + sessionId: sessionId ?? crypto.randomUUID(), + updatedAt: Date.now(), + }; + sessionEntry = { + ...current, + sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(), updatedAt: Date.now(), systemSent: true, }; + sessionStore[sessionKey] = sessionEntry; await saveSessionStore(storePath, sessionStore); systemSent = true; } @@ -261,15 +362,29 @@ export async function getReplyFromConfig( return result; } - if (reply && reply.mode === "command" && reply.command?.length) { + const isHeartbeat = opts?.isHeartbeat === true; + + if (reply && reply.mode === "command") { + const heartbeatCommand = isHeartbeat + ? (reply as { heartbeatCommand?: string[] }).heartbeatCommand + : undefined; + const commandArgs = heartbeatCommand?.length + ? heartbeatCommand + : reply.command; + + if (!commandArgs?.length) { + cleanupTyping(); + return undefined; + } + await onReplyStart(); const commandReply = { ...reply, - command: reply.command, + command: commandArgs, mode: "command" as const, }; try { - const { payload, meta } = await runCommandReply({ + const runResult = await runCommandReply({ reply: commandReply, templatingCtx, sendSystemOnce, @@ -279,11 +394,47 @@ export async function getReplyFromConfig( timeoutMs, timeoutSeconds, commandRunner, + thinkLevel, }); + const payloadArray = runResult.payloads ?? []; + const meta = runResult.meta; + const normalizedPayloads = + payloadArray.length === 1 ? payloadArray[0] : payloadArray; + if ( + !normalizedPayloads || + (Array.isArray(normalizedPayloads) && normalizedPayloads.length === 0) + ) { + return undefined; + } + if (sessionCfg && sessionStore && sessionKey) { + const returnedSessionId = meta.agentMeta?.sessionId; + if (returnedSessionId && returnedSessionId !== sessionId) { + const entry = sessionEntry ?? + sessionStore[sessionKey] ?? 
{ + sessionId: returnedSessionId, + updatedAt: Date.now(), + systemSent, + abortedLastRun, + }; + sessionEntry = { + ...entry, + sessionId: returnedSessionId, + updatedAt: Date.now(), + }; + sessionStore[sessionKey] = sessionEntry; + await saveSessionStore(storePath, sessionStore); + sessionId = returnedSessionId; + if (isVerbose()) { + logVerbose( + `Session id updated from agent meta: ${returnedSessionId} (store: ${storePath})`, + ); + } + } + } if (meta.agentMeta && isVerbose()) { logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`); } - return payload; + return normalizedPayloads; } finally { cleanupTyping(); } @@ -340,17 +491,18 @@ export async function autoReplyIfConfigured( const replyResult = await getReplyFromConfig( ctx, { - onReplyStart: () => sendTypingIndicator(client, runtime, message.sid), + onReplyStart: async () => { + await sendTypingIndicator(client, runtime, message.sid); + }, }, cfg, ); - if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) - ) - return; + const replies = replyResult + ? Array.isArray(replyResult) + ? replyResult + : [replyResult] + : []; + if (replies.length === 0) return; const replyFrom = message.to; const replyTo = message.from; @@ -363,23 +515,7 @@ export async function autoReplyIfConfigured( return; } - if (replyResult.text) { - logVerbose( - `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyResult.text.length}`, - ); - } else { - logVerbose( - `Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media)`, - ); - } - try { - const mediaList = replyResult.mediaUrls?.length - ? replyResult.mediaUrls - : replyResult.mediaUrl - ? 
[replyResult.mediaUrl] - : []; - const sendTwilio = async (body: string, media?: string) => { let resolvedMedia = media; if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) { @@ -419,22 +555,48 @@ export async function autoReplyIfConfigured( } }; - if (mediaList.length === 0) { - await sendTwilio(replyResult.text ?? ""); - } else { - // First media with body (if any), then remaining as separate media-only sends. - await sendTwilio(replyResult.text ?? "", mediaList[0]); - for (const extra of mediaList.slice(1)) { - await sendTwilio("", extra); + for (const replyPayload of replies) { + const mediaList = replyPayload.mediaUrls?.length + ? replyPayload.mediaUrls + : replyPayload.mediaUrl + ? [replyPayload.mediaUrl] + : []; + + const text = replyPayload.text ?? ""; + const chunks = chunkText(text, TWILIO_TEXT_LIMIT); + if (chunks.length === 0) chunks.push(""); + + for (let i = 0; i < chunks.length; i++) { + const body = chunks[i]; + const attachMedia = i === 0 ? mediaList[0] : undefined; + + if (body) { + logVerbose( + `Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${body.length}`, + ); + } else if (attachMedia) { + logVerbose( + `Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media only)`, + ); + } + + await sendTwilio(body, attachMedia); + + if (i === 0 && mediaList.length > 1) { + for (const extra of mediaList.slice(1)) { + await sendTwilio("", extra); + } + } + + if (isVerbose()) { + console.log( + info( + `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${attachMedia ? ", media" : ""})`, + ), + ); + } } } - if (isVerbose()) { - console.log( - info( - `↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${replyResult.mediaUrl ? 
", media" : ""})`, - ), - ); - } } catch (err) { const anyErr = err as { code?: string | number; diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 65d54518a..211d1f932 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -1,5 +1,6 @@ export type GetReplyOptions = { - onReplyStart?: () => Promise | void; + onReplyStart?: () => Promise | void; + isHeartbeat?: boolean; }; export type ReplyPayload = { diff --git a/src/cli/deps.ts b/src/cli/deps.ts index 5e8f3f677..51c428175 100644 --- a/src/cli/deps.ts +++ b/src/cli/deps.ts @@ -1,12 +1,10 @@ import { autoReplyIfConfigured } from "../auto-reply/reply.js"; -import { loadConfig } from "../config/config.js"; import { readEnv } from "../env.js"; import { info } from "../globals.js"; import { ensureBinary } from "../infra/binaries.js"; import { ensurePortAvailable, handlePortError } from "../infra/ports.js"; import { ensureFunnel, getTailnetHostname } from "../infra/tailscale.js"; import { ensureMediaHosted } from "../media/host.js"; -import { getQueueSize } from "../process/command-queue.js"; import { logWebSelfId, monitorWebProvider, @@ -14,7 +12,6 @@ import { } from "../providers/web/index.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { createClient } from "../twilio/client.js"; -import { runTwilioHeartbeatOnce } from "../twilio/heartbeat.js"; import { listRecentMessages } from "../twilio/messages.js"; import { monitorTwilio as monitorTwilioImpl } from "../twilio/monitor.js"; import { sendMessage, waitForFinalStatus } from "../twilio/send.js"; @@ -54,7 +51,6 @@ export async function monitorTwilio( lookbackMinutes: number, clientOverride?: ReturnType, maxIterations = Infinity, - opts?: { heartbeatNow?: boolean; heartbeatMinutes?: number }, ) { // Adapter that wires default deps/runtime for the Twilio monitor loop. 
return monitorTwilioImpl(intervalSeconds, lookbackMinutes, { @@ -66,13 +62,8 @@ export async function monitorTwilio( readEnv, createClient, sleep, - loadConfig, - runTwilioHeartbeatOnce, - getQueueSize, }, runtime: defaultRuntime, - heartbeatNow: opts?.heartbeatNow, - heartbeatMinutes: opts?.heartbeatMinutes, }); } diff --git a/src/cli/program.ts b/src/cli/program.ts index 3ae8606ac..de003ff52 100644 --- a/src/cli/program.ts +++ b/src/cli/program.ts @@ -451,15 +451,7 @@ Examples: ensureTwilioEnv(); logTwilioFrom(); - await monitorTwilio( - intervalSeconds, - lookbackMinutes, - undefined, - Infinity, - { - heartbeatNow, - }, - ); + await monitorTwilio(intervalSeconds, lookbackMinutes); }); program diff --git a/src/config/config.ts b/src/config/config.ts index 75c4bc1d9..86702dfe0 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -121,6 +121,7 @@ const ReplySchema = z z.literal("opencode"), z.literal("pi"), z.literal("codex"), + z.literal("gemini"), ]), format: z.union([z.literal("text"), z.literal("json")]).optional(), identityPrefix: z.string().optional(), diff --git a/src/config/sessions.ts b/src/config/sessions.ts index 9c5d10096..eba690e69 100644 --- a/src/config/sessions.ts +++ b/src/config/sessions.ts @@ -12,6 +12,7 @@ export type SessionEntry = { sessionId: string; updatedAt: number; systemSent?: boolean; + abortedLastRun?: boolean; }; export const SESSION_STORE_DEFAULT = path.join(CONFIG_DIR, "sessions.json"); diff --git a/src/index.core.test.ts b/src/index.core.test.ts index f213719d4..ffe4c748a 100644 --- a/src/index.core.test.ts +++ b/src/index.core.test.ts @@ -671,6 +671,124 @@ describe("config and templating", () => { expect(secondArgv[secondArgv.length - 1]).toBe("[sys] next"); }); + it("stores session id returned by agent meta when it differs", async () => { + const tmpStore = path.join( + os.tmpdir(), + `warelay-store-${Date.now()}-sessionid.json`, + ); + vi.spyOn(crypto, "randomUUID").mockReturnValue("initial-sid"); + const runSpy = 
vi.spyOn(index, "runCommandWithTimeout").mockResolvedValue({ + stdout: '{"text":"hi","session_id":"agent-sid-123"}\n', + stderr: "", + code: 0, + signal: null, + killed: false, + }); + const cfg = { + inbound: { + reply: { + mode: "command" as const, + command: ["claude", "{{Body}}"], + agent: { kind: "claude", format: "json" as const }, + session: { store: tmpStore }, + }, + }, + }; + + await index.getReplyFromConfig( + { Body: "/new hi", From: "+1", To: "+2" }, + undefined, + cfg, + runSpy, + ); + + const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8")); + const entry = Object.values(persisted)[0] as { sessionId?: string }; + expect(entry.sessionId).toBe("agent-sid-123"); + }); + + it("aborts command when stop word is received and skips command runner", async () => { + const tmpStore = path.join( + os.tmpdir(), + `warelay-store-${Date.now()}-abort.json`, + ); + const runSpy = vi.fn().mockResolvedValue({ + stdout: "should-not-run", + stderr: "", + code: 0, + signal: null, + killed: false, + }); + const cfg = { + inbound: { + reply: { + mode: "command" as const, + command: ["echo", "{{Body}}"], + session: { store: tmpStore }, + }, + }, + }; + + const result = await index.getReplyFromConfig( + { Body: "stop", From: "+1", To: "+2" }, + undefined, + cfg, + runSpy, + ); + + expect(result?.text).toMatch(/aborted/i); + expect(runSpy).not.toHaveBeenCalled(); + const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8")); + const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean }; + expect(entry.abortedLastRun).toBe(true); + }); + + it("adds an abort hint to the next prompt and then clears the flag", async () => { + const tmpStore = path.join( + os.tmpdir(), + `warelay-store-${Date.now()}-aborthint.json`, + ); + const runSpy = vi.fn().mockResolvedValue({ + stdout: "ok\n", + stderr: "", + code: 0, + signal: null, + killed: false, + }); + const cfg = { + inbound: { + reply: { + mode: "command" as const, + command: ["echo", "{{Body}}"], + 
session: { store: tmpStore }, + }, + }, + }; + + await index.getReplyFromConfig( + { Body: "abort", From: "+1555", To: "+2666" }, + undefined, + cfg, + runSpy, + ); + + const result = await index.getReplyFromConfig( + { Body: "continue", From: "+1555", To: "+2666" }, + undefined, + cfg, + runSpy, + ); + + const argv = runSpy.mock.calls[0][0]; + const prompt = argv.at(-1) as string; + expect(prompt).toMatch(/previous agent run was aborted/i); + expect(prompt).toMatch(/continue/); + const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8")); + const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean }; + expect(entry.abortedLastRun).toBe(false); + expect(result?.text).toBe("ok"); + }); + it("refreshes typing indicator while command runs", async () => { const onReplyStart = vi.fn(); const runSpy = vi.spyOn(index, "runCommandWithTimeout").mockImplementation( @@ -777,7 +895,7 @@ describe("config and templating", () => { const argv = runSpy.mock.calls[0][0]; expect(argv[0]).toBe("claude"); expect(argv.at(-1)).toContain("You are Clawd (Claude)"); - expect(argv.at(-1)).toContain("/Users/steipete/clawd"); + expect(argv.at(-1)).toContain("scratchpad"); expect(argv.at(-1)).toMatch(/hi$/); // The helper should auto-add print and output format flags without disturbing the prompt position. expect(argv.includes("-p") || argv.includes("--print")).toBe(true); @@ -845,7 +963,7 @@ describe("config and templating", () => { expect(result?.text).toBe("Sure! 
What's up?"); const argv = runSpy.mock.calls[0][0]; expect(argv.at(-1)).toContain("You are Clawd (Claude)"); - expect(argv.at(-1)).toContain("/Users/steipete/clawd"); + expect(argv.at(-1)).toContain("scratchpad"); }); it("serializes command auto-replies via the queue", async () => { diff --git a/src/logger.test.ts b/src/logger.test.ts index 709fcca35..4fa8f38b5 100644 --- a/src/logger.test.ts +++ b/src/logger.test.ts @@ -7,11 +7,7 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { setVerbose } from "./globals.js"; import { logDebug, logError, logInfo, logSuccess, logWarn } from "./logger.js"; -import { - DEFAULT_LOG_DIR, - resetLogger, - setLoggerOverride, -} from "./logging.js"; +import { DEFAULT_LOG_DIR, resetLogger, setLoggerOverride } from "./logging.js"; import type { RuntimeEnv } from "./runtime.js"; describe("logger helpers", () => { diff --git a/src/logging.ts b/src/logging.ts index cabe9b277..85a8c26ae 100644 --- a/src/logging.ts +++ b/src/logging.ts @@ -133,7 +133,11 @@ function pruneOldRollingLogs(dir: string): void { const cutoff = Date.now() - MAX_LOG_AGE_MS; for (const entry of entries) { if (!entry.isFile()) continue; - if (!entry.name.startsWith(`${LOG_PREFIX}-`) || !entry.name.endsWith(LOG_SUFFIX)) continue; + if ( + !entry.name.startsWith(`${LOG_PREFIX}-`) || + !entry.name.endsWith(LOG_SUFFIX) + ) + continue; const fullPath = path.join(dir, entry.name); try { const stat = fs.statSync(fullPath); diff --git a/src/media/server.test.ts b/src/media/server.test.ts index 875088cbf..46ddc0b2c 100644 --- a/src/media/server.test.ts +++ b/src/media/server.test.ts @@ -54,7 +54,9 @@ describe("media server", () => { const server = await startMediaServer(0, 5_000); const port = (server.address() as AddressInfo).port; // URL-encoded "../" to bypass client-side path normalization - const res = await fetch(`http://localhost:${port}/media/%2e%2e%2fpackage.json`); + const res = await fetch( + 
`http://localhost:${port}/media/%2e%2e%2fpackage.json`, + ); expect(res.status).toBe(400); expect(await res.text()).toBe("invalid path"); await new Promise((r) => server.close(r)); diff --git a/src/media/server.ts b/src/media/server.ts index 1c37c2a33..d56af9ea9 100644 --- a/src/media/server.ts +++ b/src/media/server.ts @@ -4,6 +4,7 @@ import path from "node:path"; import express, { type Express } from "express"; import { danger } from "../globals.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; +import { detectMime } from "./mime.js"; import { cleanOldMedia, getMediaDir } from "./store.js"; const DEFAULT_TTL_MS = 2 * 60 * 1000; @@ -19,7 +20,6 @@ export function attachMediaRoutes( const id = req.params.id; const mediaRoot = (await fs.realpath(mediaDir)) + path.sep; const file = path.resolve(mediaRoot, id); - try { const lstat = await fs.lstat(file); if (lstat.isSymbolicLink()) { @@ -37,12 +37,15 @@ export function attachMediaRoutes( res.status(410).send("expired"); return; } - res.sendFile(realPath); + const data = await fs.readFile(realPath); + const mime = detectMime({ buffer: data, filePath: realPath }); + if (mime) res.type(mime); + res.send(data); // best-effort single-use cleanup after response ends res.on("finish", () => { setTimeout(() => { fs.rm(realPath).catch(() => {}); - }, 500); + }, 50); }); } catch { res.status(404).send("not found"); diff --git a/src/media/store.redirect.test.ts b/src/media/store.redirect.test.ts new file mode 100644 index 000000000..e933665ba --- /dev/null +++ b/src/media/store.redirect.test.ts @@ -0,0 +1,73 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { PassThrough } from "node:stream"; + +import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; + +const realOs = await vi.importActual("node:os"); +const HOME = path.join(realOs.tmpdir(), "warelay-home-redirect"); +const mockRequest = vi.fn(); + +vi.doMock("node:os", () => ({ + default: { homedir: () => HOME }, + 
homedir: () => HOME, +})); + +vi.doMock("node:https", () => ({ + request: (...args: unknown[]) => mockRequest(...args), +})); + +const { saveMediaSource } = await import("./store.js"); + +describe("media store redirects", () => { + beforeAll(async () => { + await fs.rm(HOME, { recursive: true, force: true }); + }); + + afterAll(async () => { + await fs.rm(HOME, { recursive: true, force: true }); + vi.clearAllMocks(); + }); + + it("follows redirects and keeps detected mime/extension", async () => { + let call = 0; + mockRequest.mockImplementation((_url, _opts, cb) => { + call += 1; + const res = new PassThrough(); + const req = { + on: (event: string, handler: (...args: unknown[]) => void) => { + if (event === "error") res.on("error", handler); + return req; + }, + end: () => undefined, + destroy: () => res.destroy(), + } as const; + + if (call === 1) { + res.statusCode = 302; + res.headers = { location: "https://example.com/final" }; + setImmediate(() => { + cb(res as unknown as Parameters[0]); + res.end(); + }); + } else { + res.statusCode = 200; + res.headers = { "content-type": "text/plain" }; + setImmediate(() => { + cb(res as unknown as Parameters[0]); + res.write("redirected"); + res.end(); + }); + } + + return req; + }); + + const saved = await saveMediaSource("https://example.com/start"); + + expect(mockRequest).toHaveBeenCalledTimes(2); + expect(saved.contentType).toBe("text/plain"); + expect(path.extname(saved.path)).toBe(".txt"); + expect(await fs.readFile(saved.path, "utf8")).toBe("redirected"); + }); +}); diff --git a/src/media/store.ts b/src/media/store.ts index 291d048e8..31664aab9 100644 --- a/src/media/store.ts +++ b/src/media/store.ts @@ -48,9 +48,21 @@ async function downloadToFile( url: string, dest: string, headers?: Record, + maxRedirects = 5, ): Promise<{ headerMime?: string; sniffBuffer: Buffer; size: number }> { return await new Promise((resolve, reject) => { const req = request(url, { headers }, (res) => { + // Follow redirects + if 
(res.statusCode && res.statusCode >= 300 && res.statusCode < 400) { + const location = res.headers.location; + if (!location || maxRedirects <= 0) { + reject(new Error(`Redirect loop or missing Location header`)); + return; + } + const redirectUrl = new URL(location, url).href; + resolve(downloadToFile(redirectUrl, dest, headers, maxRedirects - 1)); + return; + } if (!res.statusCode || res.statusCode >= 400) { reject(new Error(`HTTP ${res.statusCode ?? "?"} downloading media`)); return; @@ -107,9 +119,9 @@ export async function saveMediaSource( const dir = subdir ? path.join(MEDIA_DIR, subdir) : MEDIA_DIR; await fs.mkdir(dir, { recursive: true }); await cleanOldMedia(); - const id = crypto.randomUUID(); + const baseId = crypto.randomUUID(); if (looksLikeUrl(source)) { - const tempDest = path.join(dir, `${id}.tmp`); + const tempDest = path.join(dir, `${baseId}.tmp`); const { headerMime, sniffBuffer, size } = await downloadToFile( source, tempDest, @@ -122,7 +134,8 @@ export async function saveMediaSource( }); const ext = extensionForMime(mime) ?? path.extname(new URL(source).pathname); - const finalDest = path.join(dir, ext ? `${id}${ext}` : id); + const id = ext ? `${baseId}${ext}` : baseId; + const finalDest = path.join(dir, id); await fs.rename(tempDest, finalDest); return { id, path: finalDest, size, contentType: mime }; } @@ -137,7 +150,8 @@ export async function saveMediaSource( const buffer = await fs.readFile(source); const mime = detectMime({ buffer, filePath: source }); const ext = extensionForMime(mime) ?? path.extname(source); - const dest = path.join(dir, ext ? `${id}${ext}` : id); + const id = ext ? 
`${baseId}${ext}` : baseId; + const dest = path.join(dir, id); await fs.writeFile(dest, buffer); return { id, path: dest, size: stat.size, contentType: mime }; } @@ -152,10 +166,11 @@ export async function saveMediaBuffer( } const dir = path.join(MEDIA_DIR, subdir); await fs.mkdir(dir, { recursive: true }); - const id = crypto.randomUUID(); + const baseId = crypto.randomUUID(); const mime = detectMime({ buffer, headerMime: contentType }); const ext = extensionForMime(mime); - const dest = path.join(dir, ext ? `${id}${ext}` : id); + const id = ext ? `${baseId}${ext}` : baseId; + const dest = path.join(dir, id); await fs.writeFile(dest, buffer); return { id, path: dest, size: buffer.byteLength, contentType: mime }; } diff --git a/src/process/tau-rpc.ts b/src/process/tau-rpc.ts new file mode 100644 index 000000000..83b44c416 --- /dev/null +++ b/src/process/tau-rpc.ts @@ -0,0 +1,153 @@ +import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; +import readline from "node:readline"; + +import { piSpec } from "../agents/pi.js"; + +type TauRpcOptions = { + argv: string[]; + cwd?: string; + timeoutMs: number; +}; + +type TauRpcResult = { + stdout: string; + stderr: string; + code: number; + signal?: NodeJS.Signals | null; + killed?: boolean; +}; + +class TauRpcClient { + private child: ChildProcessWithoutNullStreams | null = null; + private rl: readline.Interface | null = null; + private stderr = ""; + private buffer: string[] = []; + private idleTimer: NodeJS.Timeout | null = null; + private seenAssistantEnd = false; + private readonly idleMs = 120; + private pending: + | { + resolve: (r: TauRpcResult) => void; + reject: (err: unknown) => void; + timer: NodeJS.Timeout; + } + | undefined; + + constructor( + private readonly argv: string[], + private readonly cwd: string | undefined, + ) {} + + private ensureChild() { + if (this.child) return; + this.child = spawn(this.argv[0], this.argv.slice(1), { + cwd: this.cwd, + stdio: ["pipe", "pipe", "pipe"], 
+ }); + this.rl = readline.createInterface({ input: this.child.stdout }); + this.rl.on("line", (line) => this.handleLine(line)); + this.child.stderr.on("data", (d) => { + this.stderr += d.toString(); + }); + this.child.on("exit", (code, signal) => { + if (this.pending) { + this.pending.reject( + new Error(`tau rpc exited (code=${code}, signal=${signal})`), + ); + clearTimeout(this.pending.timer); + this.pending = undefined; + } + this.dispose(); + }); + } + + private handleLine(line: string) { + if (!this.pending) return; + this.buffer.push(line); + // Streamed JSON arrives line-by-line; mark when an assistant message finishes + // and resolve after a short idle to capture any follow-up events (e.g. tools) + // that belong to the same turn. + if ( + line.includes('"type":"message_end"') && + line.includes('"role":"assistant"') + ) { + this.seenAssistantEnd = true; + } + + if (this.seenAssistantEnd) { + if (this.idleTimer) clearTimeout(this.idleTimer); + this.idleTimer = setTimeout(() => { + if (!this.pending) return; + const out = this.buffer.join("\n"); + // Only resolve once we have at least one assistant text payload; otherwise keep waiting. + const parsed = piSpec.parseOutput(out); + if (parsed.texts && parsed.texts.length > 0) { + const pending = this.pending; + this.pending = undefined; + this.buffer = []; + this.seenAssistantEnd = false; + clearTimeout(pending.timer); + pending.resolve({ stdout: out, stderr: this.stderr, code: 0 }); + return; + } + // No assistant text yet; wait for more lines. 
+ }, this.idleMs); // small idle window to group streaming blocks + } + } + + async prompt(prompt: string, timeoutMs: number): Promise { + this.ensureChild(); + if (this.pending) { + throw new Error("tau rpc already handling a request"); + } + const child = this.child; + if (!child) throw new Error("tau rpc child not initialized"); + await new Promise((resolve, reject) => { + const ok = child.stdin.write( + `${JSON.stringify({ + type: "prompt", + message: { role: "user", content: [{ type: "text", text: prompt }] }, + })}\n`, + (err) => (err ? reject(err) : resolve()), + ); + if (!ok) child.stdin.once("drain", () => resolve()); + }); + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending = undefined; + reject(new Error(`tau rpc timed out after ${timeoutMs}ms`)); + child.kill("SIGKILL"); + }, timeoutMs); + this.pending = { resolve, reject, timer }; + }); + } + + dispose() { + this.rl?.close(); + this.rl = null; + if (this.child && !this.child.killed) { + this.child.kill("SIGKILL"); + } + this.child = null; + this.buffer = []; + this.stderr = ""; + } +} + +let singleton: { key: string; client: TauRpcClient } | undefined; + +export async function runPiRpc( + opts: TauRpcOptions & { prompt: string }, +): Promise { + const key = `${opts.cwd ?? 
""}|${opts.argv.join(" ")}`; + if (!singleton || singleton.key !== key) { + singleton?.client.dispose(); + singleton = { key, client: new TauRpcClient(opts.argv, opts.cwd) }; + } + return singleton.client.prompt(opts.prompt, opts.timeoutMs); +} + +export function resetPiRpc() { + singleton?.client.dispose(); + singleton = undefined; +} diff --git a/src/twilio/heartbeat.test.ts b/src/twilio/heartbeat.test.ts index 4abf51fc7..216f759da 100644 --- a/src/twilio/heartbeat.test.ts +++ b/src/twilio/heartbeat.test.ts @@ -1,6 +1,6 @@ -import { beforeEach, describe, expect, it, type Mock, vi } from "vitest"; +import { describe, expect, it, vi } from "vitest"; -import { HEARTBEAT_PROMPT, HEARTBEAT_TOKEN } from "../web/auto-reply.js"; +import { HEARTBEAT_TOKEN } from "../web/auto-reply.js"; import { runTwilioHeartbeatOnce } from "./heartbeat.js"; vi.mock("./send.js", () => ({ @@ -11,34 +11,15 @@ vi.mock("../auto-reply/reply.js", () => ({ getReplyFromConfig: vi.fn(), })); -vi.mock("../auto-reply/heartbeat-prehook.js", () => ({ - runHeartbeatPreHook: vi.fn(), - buildHeartbeatPrompt: vi.fn((base: string, ctx?: string) => - ctx ? 
`${base}\n\n---\nContext from pre-hook:\n${ctx}` : base, - ), -})); - -vi.mock("../config/config.js", () => ({ - loadConfig: vi.fn(() => ({})), -})); - -// eslint-disable-next-line import/first -import { runHeartbeatPreHook } from "../auto-reply/heartbeat-prehook.js"; // eslint-disable-next-line import/first import { getReplyFromConfig } from "../auto-reply/reply.js"; // eslint-disable-next-line import/first import { sendMessage } from "./send.js"; -const sendMessageMock = sendMessage as unknown as Mock; -const replyResolverMock = getReplyFromConfig as unknown as Mock; -const runHeartbeatPreHookMock = runHeartbeatPreHook as unknown as Mock; +const sendMessageMock = sendMessage as unknown as vi.Mock; +const replyResolverMock = getReplyFromConfig as unknown as vi.Mock; describe("runTwilioHeartbeatOnce", () => { - beforeEach(() => { - vi.clearAllMocks(); - runHeartbeatPreHookMock.mockResolvedValue({ durationMs: 0 }); - }); - it("sends manual override body and skips resolver", async () => { sendMessageMock.mockResolvedValue({}); await runTwilioHeartbeatOnce({ @@ -91,80 +72,4 @@ describe("runTwilioHeartbeatOnce", () => { expect.anything(), ); }); - - describe("pre-hook integration", () => { - it("runs pre-hook and includes context in prompt", async () => { - runHeartbeatPreHookMock.mockResolvedValue({ - context: "You have 3 unread emails", - durationMs: 100, - }); - replyResolverMock.mockResolvedValue({ text: "ALERT!" }); - sendMessageMock.mockResolvedValue({}); - - await runTwilioHeartbeatOnce({ to: "+1555" }); - - expect(runHeartbeatPreHookMock).toHaveBeenCalled(); - expect(replyResolverMock).toHaveBeenCalledWith( - expect.objectContaining({ - Body: expect.stringContaining("Context from pre-hook"), - }), - undefined, - ); - }); - - it("skips pre-hook when skipPreHook is true", async () => { - replyResolverMock.mockResolvedValue({ text: "ALERT!" 
}); - sendMessageMock.mockResolvedValue({}); - - await runTwilioHeartbeatOnce({ to: "+1555", skipPreHook: true }); - - expect(runHeartbeatPreHookMock).not.toHaveBeenCalled(); - expect(replyResolverMock).toHaveBeenCalledWith( - expect.objectContaining({ - Body: HEARTBEAT_PROMPT, - }), - undefined, - ); - }); - - it("continues with basic heartbeat on pre-hook failure", async () => { - runHeartbeatPreHookMock.mockResolvedValue({ - error: "Pre-hook failed", - durationMs: 50, - }); - replyResolverMock.mockResolvedValue({ text: "ALERT!" }); - sendMessageMock.mockResolvedValue({}); - - await runTwilioHeartbeatOnce({ to: "+1555" }); - - expect(replyResolverMock).toHaveBeenCalledWith( - expect.objectContaining({ - Body: HEARTBEAT_PROMPT, - }), - undefined, - ); - expect(sendMessage).toHaveBeenCalled(); - }); - - it("uses injected config to avoid loading real config in tests", async () => { - const testConfig = { - inbound: { - reply: { - mode: "command" as const, - command: ["echo"], - session: { - heartbeatPreHook: ["test-script"], - }, - }, - }, - }; - runHeartbeatPreHookMock.mockResolvedValue({ durationMs: 0 }); - replyResolverMock.mockResolvedValue({ text: "OK" }); - sendMessageMock.mockResolvedValue({}); - - await runTwilioHeartbeatOnce({ to: "+1555", cfg: testConfig }); - - expect(runHeartbeatPreHookMock).toHaveBeenCalledWith(testConfig); - }); - }); }); diff --git a/src/twilio/heartbeat.ts b/src/twilio/heartbeat.ts index 6391e0824..c8129834e 100644 --- a/src/twilio/heartbeat.ts +++ b/src/twilio/heartbeat.ts @@ -1,9 +1,4 @@ -import { - buildHeartbeatPrompt, - runHeartbeatPreHook, -} from "../auto-reply/heartbeat-prehook.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; -import { loadConfig, type WarelayConfig } from "../config/config.js"; import { danger, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; @@ -19,8 +14,6 @@ export async function 
runTwilioHeartbeatOnce(opts: { replyResolver?: ReplyResolver; overrideBody?: string; dryRun?: boolean; - skipPreHook?: boolean; - cfg?: WarelayConfig; }) { const { to, @@ -28,10 +21,8 @@ export async function runTwilioHeartbeatOnce(opts: { runtime = defaultRuntime, overrideBody, dryRun = false, - skipPreHook = false, } = opts; const replyResolver = opts.replyResolver ?? getReplyFromConfig; - const cfg = opts.cfg ?? loadConfig(); if (overrideBody && overrideBody.trim().length === 0) { throw new Error("Override body must be non-empty when provided."); @@ -51,52 +42,40 @@ export async function runTwilioHeartbeatOnce(opts: { return; } - // Run pre-hook unless skipped - let heartbeatPrompt = HEARTBEAT_PROMPT; - if (!skipPreHook) { - const preHookResult = await runHeartbeatPreHook(cfg); - if (preHookResult.error) { - logInfo( - `Pre-hook failed: ${preHookResult.error} (continuing)`, - runtime, - ); - } - heartbeatPrompt = buildHeartbeatPrompt( - HEARTBEAT_PROMPT, - preHookResult.context, - ); - } - const replyResult = await replyResolver( { - Body: heartbeatPrompt, + Body: HEARTBEAT_PROMPT, From: to, To: to, MessageSid: undefined, }, - undefined, + { isHeartbeat: true }, ); + const replyPayload = Array.isArray(replyResult) + ? replyResult[0] + : replyResult; + if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) + !replyPayload || + (!replyPayload.text && + !replyPayload.mediaUrl && + !replyPayload.mediaUrls?.length) ) { logInfo("heartbeat skipped: empty reply", runtime); return; } const hasMedia = Boolean( - replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0, + replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 
0) > 0, ); - const stripped = stripHeartbeatToken(replyResult.text); + const stripped = stripHeartbeatToken(replyPayload.text); if (stripped.shouldSkip && !hasMedia) { logInfo(success("heartbeat: ok (HEARTBEAT_OK)"), runtime); return; } - const finalText = stripped.text || replyResult.text || ""; + const finalText = stripped.text || replyPayload.text || ""; if (dryRun) { logInfo( `[dry-run] heartbeat -> ${to}: ${finalText.slice(0, 200)}`, diff --git a/src/twilio/monitor.test.ts b/src/twilio/monitor.test.ts index f94339214..76b6a8527 100644 --- a/src/twilio/monitor.test.ts +++ b/src/twilio/monitor.test.ts @@ -1,41 +1,8 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { describe, expect, it, vi } from "vitest"; -import { - monitorTwilio, - resetSeenMessageSids, - resolveHeartbeatRecipient, -} from "./monitor.js"; - -// Base mock deps factory -function createMockDeps(overrides: Record = {}) { - return { - autoReplyIfConfigured: vi.fn().mockResolvedValue(undefined), - listRecentMessages: vi.fn().mockResolvedValue([]), - readEnv: vi.fn(() => ({ - accountSid: "AC", - whatsappFrom: "whatsapp:+1", - auth: { accountSid: "AC", authToken: "t" }, - })), - createClient: vi.fn(() => ({ messages: { create: vi.fn() } }) as never), - sleep: vi.fn().mockResolvedValue(undefined), - loadConfig: vi.fn(() => ({})), - runTwilioHeartbeatOnce: vi.fn().mockResolvedValue(undefined), - getQueueSize: vi.fn(() => 0), - ...overrides, - }; -} +import { monitorTwilio } from "./monitor.js"; describe("monitorTwilio", () => { - beforeEach(() => { - resetSeenMessageSids(); - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - vi.clearAllMocks(); - }); - it("processes inbound messages once with injected deps", async () => { const listRecentMessages = vi.fn().mockResolvedValue([ { @@ -50,300 +17,29 @@ describe("monitorTwilio", () => { status: null, }, ]); + const autoReplyIfConfigured = vi.fn().mockResolvedValue(undefined); + const readEnv = 
vi.fn(() => ({ + accountSid: "AC", + whatsappFrom: "whatsapp:+1", + auth: { accountSid: "AC", authToken: "t" }, + })); + const createClient = vi.fn( + () => ({ messages: { create: vi.fn() } }) as never, + ); + const sleep = vi.fn().mockResolvedValue(undefined); - const deps = createMockDeps({ listRecentMessages }); - - const monitorPromise = monitorTwilio(0, 0, { - deps, + await monitorTwilio(0, 0, { + deps: { + autoReplyIfConfigured, + listRecentMessages, + readEnv, + createClient, + sleep, + }, maxIterations: 1, }); - // Advance timers to complete the iteration - await vi.runAllTimersAsync(); - await monitorPromise; - expect(listRecentMessages).toHaveBeenCalledTimes(1); - expect(deps.autoReplyIfConfigured).toHaveBeenCalledTimes(1); - }); - - describe("heartbeat timer setup", () => { - it("sets up heartbeat timer when heartbeatMinutes is configured", async () => { - const setIntervalSpy = vi.spyOn(global, "setInterval"); - const deps = createMockDeps({ - loadConfig: vi.fn(() => ({ - inbound: { - allowFrom: ["+15551234567"], // Provide a recipient - reply: { - mode: "command" as const, - command: ["echo", "test"], - heartbeatMinutes: 1, - }, - }, - })), - }); - - const monitorPromise = monitorTwilio(5, 5, { - deps, - maxIterations: 1, - }); - - await vi.runAllTimersAsync(); - await monitorPromise; - - // Heartbeat timer should have been set up with 60000ms (1 minute) interval - expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 60_000); - }); - - it("does not set up heartbeat timer when heartbeatMinutes is 0", async () => { - const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined); - const deps = createMockDeps({ - loadConfig: vi.fn(() => ({ - inbound: { - reply: { - mode: "command" as const, - command: ["echo", "test"], - heartbeatMinutes: 0, - }, - }, - })), - runTwilioHeartbeatOnce, - }); - - const monitorPromise = monitorTwilio(5, 5, { - deps, - maxIterations: 1, - }); - - // Advance timer past what would be the heartbeat interval - 
await vi.advanceTimersByTimeAsync(60_000); - await vi.runAllTimersAsync(); - await monitorPromise; - - // Heartbeat should not have been triggered - expect(runTwilioHeartbeatOnce).not.toHaveBeenCalled(); - }); - }); - - describe("heartbeat immediate (heartbeatNow)", () => { - it("runs immediate heartbeat when heartbeatNow is true", async () => { - const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined); - const deps = createMockDeps({ - loadConfig: vi.fn(() => ({ - inbound: { - allowFrom: ["+15551234567"], - reply: { - mode: "command" as const, - command: ["echo", "test"], - heartbeatMinutes: 10, - }, - }, - })), - runTwilioHeartbeatOnce, - }); - - const monitorPromise = monitorTwilio(5, 5, { - deps, - maxIterations: 1, - heartbeatNow: true, - }); - - // Run immediate timer callbacks (for the immediate heartbeat) - await vi.runAllTimersAsync(); - await monitorPromise; - - // Heartbeat should have been called immediately - expect(runTwilioHeartbeatOnce).toHaveBeenCalled(); - }); - }); - - describe("heartbeat skips when busy", () => { - it("skips heartbeat when command queue is busy", async () => { - const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined); - const deps = createMockDeps({ - loadConfig: vi.fn(() => ({ - inbound: { - allowFrom: ["+15551234567"], - reply: { - mode: "command" as const, - command: ["echo", "test"], - heartbeatMinutes: 1, - }, - }, - })), - runTwilioHeartbeatOnce, - getQueueSize: vi.fn(() => 1), // Queue is busy - }); - - const monitorPromise = monitorTwilio(5, 5, { - deps, - maxIterations: 1, - heartbeatNow: true, - }); - - await vi.runAllTimersAsync(); - await monitorPromise; - - // Heartbeat should NOT have been called because queue is busy - expect(runTwilioHeartbeatOnce).not.toHaveBeenCalled(); - }); - }); - - describe("heartbeat error handling", () => { - it("catches heartbeat errors without crashing", async () => { - const runtimeError = vi.fn(); - const runTwilioHeartbeatOnce = vi - .fn() - 
.mockRejectedValue(new Error("Heartbeat failed")); - const deps = createMockDeps({ - loadConfig: vi.fn(() => ({ - inbound: { - allowFrom: ["+15551234567"], - reply: { - mode: "command" as const, - command: ["echo", "test"], - heartbeatMinutes: 1, - }, - }, - })), - runTwilioHeartbeatOnce, - }); - - const monitorPromise = monitorTwilio(5, 5, { - deps, - maxIterations: 1, - heartbeatNow: true, - runtime: { - log: vi.fn(), - error: runtimeError, - exit: vi.fn() as unknown as (code: number) => never, - }, - }); - - await vi.runAllTimersAsync(); - await monitorPromise; - - // Should have logged error but not crashed - expect(runtimeError).toHaveBeenCalledWith( - expect.stringContaining("Heartbeat failed"), - ); - }); - }); - - describe("heartbeat idle time check", () => { - it("skips heartbeat when not idle long enough", async () => { - const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined); - const nowMs = Date.now(); - const recentInboundTime = nowMs - 2 * 60_000; // 2 minutes ago - - const deps = createMockDeps({ - listRecentMessages: vi.fn().mockResolvedValue([ - { - sid: "m1", - direction: "inbound", - dateCreated: new Date(recentInboundTime), - from: "+15559999999", - to: "+15551234567", - body: "hi", - errorCode: null, - errorMessage: null, - status: null, - }, - ]), - loadConfig: vi.fn(() => ({ - inbound: { - reply: { - mode: "command" as const, - command: ["echo", "test"], - heartbeatMinutes: 1, - session: { - heartbeatIdleMinutes: 5, // Require 5 min idle - }, - }, - }, - })), - runTwilioHeartbeatOnce, - }); - - const monitorPromise = monitorTwilio(5, 5, { - deps, - maxIterations: 2, - }); - - // Advance to trigger heartbeat (60s) - await vi.advanceTimersByTimeAsync(60_000); - await vi.runAllTimersAsync(); - await monitorPromise; - - // Heartbeat should be skipped because last inbound was only 2 min ago (< 5 min) - expect(runTwilioHeartbeatOnce).not.toHaveBeenCalled(); - }); - }); - - describe("timer cleanup", () => { - it("clears heartbeat timer 
on exit", async () => { - const clearIntervalSpy = vi.spyOn(global, "clearInterval"); - const deps = createMockDeps({ - loadConfig: vi.fn(() => ({ - inbound: { - reply: { - mode: "command" as const, - command: ["echo", "test"], - heartbeatMinutes: 1, - }, - }, - })), - }); - - const monitorPromise = monitorTwilio(5, 5, { - deps, - maxIterations: 1, - }); - - await vi.runAllTimersAsync(); - await monitorPromise; - - // clearInterval should have been called for cleanup - expect(clearIntervalSpy).toHaveBeenCalled(); - }); - }); -}); - -describe("resolveHeartbeatRecipient", () => { - it("returns lastInboundFrom when available", () => { - const cfg = { inbound: { allowFrom: ["+15551234567"] } }; - const result = resolveHeartbeatRecipient(cfg, "whatsapp:+15559999999"); - expect(result).toBe("+15559999999"); - }); - - it("strips whatsapp: prefix from lastInboundFrom", () => { - const cfg = {}; - const result = resolveHeartbeatRecipient(cfg, "whatsapp:+15559999999"); - expect(result).toBe("+15559999999"); - }); - - it("falls back to first non-wildcard allowFrom entry", () => { - const cfg = { - inbound: { allowFrom: ["*", "+15551234567", "+15552222222"] }, - }; - const result = resolveHeartbeatRecipient(cfg, undefined); - expect(result).toBe("+15551234567"); - }); - - it("returns null when allowFrom is empty", () => { - const cfg = { inbound: { allowFrom: [] } }; - const result = resolveHeartbeatRecipient(cfg, undefined); - expect(result).toBeNull(); - }); - - it("returns null when allowFrom contains only wildcards", () => { - const cfg = { inbound: { allowFrom: ["*"] } }; - const result = resolveHeartbeatRecipient(cfg, undefined); - expect(result).toBeNull(); - }); - - it("returns null when no allowFrom and no lastInboundFrom", () => { - const cfg = {}; - const result = resolveHeartbeatRecipient(cfg, undefined); - expect(result).toBeNull(); + expect(autoReplyIfConfigured).toHaveBeenCalledTimes(1); }); }); diff --git a/src/twilio/monitor.ts b/src/twilio/monitor.ts index 
579ae62b8..58bae951f 100644 --- a/src/twilio/monitor.ts +++ b/src/twilio/monitor.ts @@ -1,15 +1,11 @@ import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js"; import { autoReplyIfConfigured } from "../auto-reply/reply.js"; -import { loadConfig, type WarelayConfig } from "../config/config.js"; import { readEnv } from "../env.js"; import { danger } from "../globals.js"; import { logDebug, logInfo, logWarn } from "../logger.js"; -import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; -import { normalizeE164, sleep, withWhatsAppPrefix } from "../utils.js"; -import { resolveReplyHeartbeatMinutes } from "../web/auto-reply.js"; +import { sleep, withWhatsAppPrefix } from "../utils.js"; import { createClient } from "./client.js"; -import { runTwilioHeartbeatOnce } from "./heartbeat.js"; type MonitorDeps = { autoReplyIfConfigured: typeof autoReplyIfConfigured; @@ -21,10 +17,6 @@ type MonitorDeps = { readEnv: typeof readEnv; createClient: typeof createClient; sleep: typeof sleep; - // Heartbeat dependencies - loadConfig: typeof loadConfig; - runTwilioHeartbeatOnce: typeof runTwilioHeartbeatOnce; - getQueueSize: typeof getQueueSize; }; const DEFAULT_POLL_INTERVAL_SECONDS = 5; @@ -46,9 +38,6 @@ type MonitorOptions = { maxIterations?: number; deps?: MonitorDeps; runtime?: RuntimeEnv; - // Heartbeat options - heartbeatNow?: boolean; // Run heartbeat immediately on start - heartbeatMinutes?: number; // Override config value }; const defaultDeps: MonitorDeps = { @@ -57,104 +46,8 @@ const defaultDeps: MonitorDeps = { readEnv, createClient, sleep, - // Heartbeat deps - loadConfig, - runTwilioHeartbeatOnce, - getQueueSize, }; -// Lightweight mutex for serializing heartbeat and auto-reply -let inFlightLock: Promise = Promise.resolve(); - -function acquireLock(): Promise<() => void> { - let release: (() => void) | undefined; - const prev = inFlightLock; - inFlightLock = new 
Promise((resolve) => { - release = resolve; - }); - return prev.then(() => { - if (!release) throw new Error("Lock release function not set"); - return release; - }); -} - -// Resolve recipient for heartbeat: uses lastInboundFrom or first non-wildcard allowFrom entry -function resolveHeartbeatRecipient( - cfg: WarelayConfig, - lastInboundFrom: string | undefined, -): string | null { - // Prefer last inbound sender - if (lastInboundFrom) { - // Strip whatsapp: prefix if present - const cleaned = lastInboundFrom.replace(/^whatsapp:/, ""); - return normalizeE164(cleaned); - } - // Fall back to first non-wildcard allowFrom entry - const allowFrom = cfg.inbound?.allowFrom ?? []; - const nonWildcard = allowFrom.filter((v) => v !== "*"); - if (nonWildcard.length > 0 && nonWildcard[0]) { - return normalizeE164(nonWildcard[0]); - } - return null; -} - -// State tracking for heartbeat -type HeartbeatState = { - lastInboundFrom: string | undefined; - lastInboundAt: number | undefined; -}; - -// Run heartbeat once with serialization -async function runTwilioHeartbeatLoop(params: { - deps: MonitorDeps; - runtime: RuntimeEnv; - cfg: WarelayConfig; - state: HeartbeatState; -}) { - const { deps, runtime, cfg, state } = params; - - const release = await acquireLock(); - try { - // Check if command queue is busy - if (deps.getQueueSize() > 0) { - logInfo("heartbeat: skipped (requests in flight)", runtime); - return; - } - - const recipient = resolveHeartbeatRecipient(cfg, state.lastInboundFrom); - if (!recipient) { - logInfo( - "heartbeat: skipped (no recipient - configure allowFrom or wait for inbound)", - runtime, - ); - return; - } - - // Check idle time threshold if configured - const idleMinutes = - cfg.inbound?.reply?.session?.heartbeatIdleMinutes ?? 
- cfg.inbound?.reply?.session?.idleMinutes; - if (idleMinutes && state.lastInboundAt) { - const idleMs = Date.now() - state.lastInboundAt; - if (idleMs < idleMinutes * 60_000) { - logInfo( - `heartbeat: skipped (idle ${Math.floor(idleMs / 60_000)}m < ${idleMinutes}m)`, - runtime, - ); - return; - } - } - - await deps.runTwilioHeartbeatOnce({ - to: recipient, - runtime, - cfg, - }); - } finally { - release(); - } -} - // Poll Twilio for inbound messages and auto-reply when configured. export async function monitorTwilio( pollSeconds: number, @@ -169,121 +62,44 @@ export async function monitorTwilio( const env = deps.readEnv(runtime); const from = withWhatsAppPrefix(env.whatsappFrom); const client = opts?.client ?? deps.createClient(env); - - // Load config and resolve heartbeat minutes - const cfg = deps.loadConfig(); - const heartbeatMinutes = resolveReplyHeartbeatMinutes( - cfg, - opts?.heartbeatMinutes, - ); - - // Heartbeat state tracking - const heartbeatState: HeartbeatState = { - lastInboundFrom: undefined, - lastInboundAt: undefined, - }; - let heartbeatTimer: NodeJS.Timeout | null = null; - - // Cleanup function for the heartbeat timer - const clearHeartbeatTimer = () => { - if (heartbeatTimer) { - clearInterval(heartbeatTimer); - heartbeatTimer = null; - } - }; - - // Log startup info - const heartbeatInfo = heartbeatMinutes - ? 
`Heartbeat: every ${heartbeatMinutes}m` - : "Heartbeat: disabled"; logInfo( - `📡 Monitoring inbound messages to ${from} (poll ${pollSeconds}s, lookback ${lookbackMinutes}m) | ${heartbeatInfo}`, + `📡 Monitoring inbound messages to ${from} (poll ${pollSeconds}s, lookback ${lookbackMinutes}m)`, runtime, ); - // Set up heartbeat timer if enabled - if (heartbeatMinutes) { - const intervalMs = heartbeatMinutes * 60_000; - heartbeatTimer = setInterval(() => { - void runTwilioHeartbeatLoop({ - deps, - runtime, - cfg, - state: heartbeatState, - }).catch((err) => { - runtime.error(danger(`Heartbeat error: ${String(err)}`)); - }); - }, intervalMs); - - // Run immediate heartbeat if requested - if (opts?.heartbeatNow) { - void runTwilioHeartbeatLoop({ - deps, - runtime, - cfg, - state: heartbeatState, - }).catch((err) => { - runtime.error(danger(`Immediate heartbeat error: ${String(err)}`)); - }); - } - } - let lastSeenSid: string | undefined; let iterations = 0; - - try { - while (iterations < maxIterations) { - let messages: ListedMessage[] = []; - try { - messages = - (await deps.listRecentMessages(lookbackMinutes, 50, client)) ?? []; - backoffMs = 1_000; // reset after success - } catch (err) { - logWarn( - `Twilio polling failed (will retry in ${backoffMs}ms): ${String(err)}`, - runtime, - ); - await deps.sleep(backoffMs); - backoffMs = Math.min(backoffMs * 2, 10_000); - continue; - } - const inboundOnly = messages.filter((m) => m.direction === "inbound"); - // Sort newest -> oldest without relying on external helpers (avoids test mocks clobbering imports). - const newestFirst = [...inboundOnly].sort( - (a, b) => - (b.dateCreated?.getTime() ?? 0) - (a.dateCreated?.getTime() ?? 
0), - ); - - // Update heartbeat state from newest inbound message - if (newestFirst.length > 0 && newestFirst[0].from) { - heartbeatState.lastInboundFrom = newestFirst[0].from; - heartbeatState.lastInboundAt = newestFirst[0].dateCreated?.getTime(); - } - - await handleMessages(messages, client, lastSeenSid, deps, runtime); - lastSeenSid = newestFirst.length ? newestFirst[0].sid : lastSeenSid; - iterations += 1; - if (iterations >= maxIterations) break; - await deps.sleep( - Math.max(pollSeconds, DEFAULT_POLL_INTERVAL_SECONDS) * 1000, + while (iterations < maxIterations) { + let messages: ListedMessage[] = []; + try { + messages = + (await deps.listRecentMessages(lookbackMinutes, 50, client)) ?? []; + backoffMs = 1_000; // reset after success + } catch (err) { + logWarn( + `Twilio polling failed (will retry in ${backoffMs}ms): ${String(err)}`, + runtime, ); + await deps.sleep(backoffMs); + backoffMs = Math.min(backoffMs * 2, 10_000); + continue; } - } finally { - clearHeartbeatTimer(); + const inboundOnly = messages.filter((m) => m.direction === "inbound"); + // Sort newest -> oldest without relying on external helpers (avoids test mocks clobbering imports). + const newestFirst = [...inboundOnly].sort( + (a, b) => + (b.dateCreated?.getTime() ?? 0) - (a.dateCreated?.getTime() ?? 0), + ); + await handleMessages(messages, client, lastSeenSid, deps, runtime); + lastSeenSid = newestFirst.length ? 
newestFirst[0].sid : lastSeenSid; + iterations += 1; + if (iterations >= maxIterations) break; + await deps.sleep( + Math.max(pollSeconds, DEFAULT_POLL_INTERVAL_SECONDS) * 1000, + ); } } -// Track all seen message SIDs to avoid re-processing -const seenMessageSids = new Set(); - -// Export for testing - reset seen message SIDs between test runs -export function resetSeenMessageSids() { - seenMessageSids.clear(); -} - -// Export for testing -export { resolveHeartbeatRecipient }; - async function handleMessages( messages: ListedMessage[], client: ReturnType, @@ -293,16 +109,6 @@ async function handleMessages( ) { for (const m of messages) { if (!m.sid) continue; - // Skip messages we've already seen/logged - if (seenMessageSids.has(m.sid)) continue; - seenMessageSids.add(m.sid); - // Limit set size to prevent memory leak - if (seenMessageSids.size > 1000) { - const oldestSids = Array.from(seenMessageSids).slice(0, 500); - for (const sid of oldestSids) { - seenMessageSids.delete(sid); - } - } if (lastSeenSid && m.sid === lastSeenSid) break; // stop at previously seen logDebug(`[${m.sid}] ${m.from ?? "?"} -> ${m.to ?? "?"}: ${m.body ?? ""}`); if (m.direction !== "inbound") continue; diff --git a/src/twilio/webhook.ts b/src/twilio/webhook.ts index 51a1b90f4..585c0c289 100644 --- a/src/twilio/webhook.ts +++ b/src/twilio/webhook.ts @@ -69,7 +69,7 @@ export async function startWebhook( } const client = createClient(env); - let replyResult: ReplyPayload | undefined = + let replyResult: ReplyPayload | ReplyPayload[] | undefined = autoReply !== undefined ? 
{ text: autoReply } : undefined; if (!replyResult) { replyResult = await getReplyFromConfig( @@ -83,14 +83,20 @@ export async function startWebhook( MediaType: mediaType, }, { - onReplyStart: () => sendTypingIndicator(client, runtime, MessageSid), + onReplyStart: async () => { + await sendTypingIndicator(client, runtime, MessageSid); + }, }, ); } - if (replyResult && (replyResult.text || replyResult.mediaUrl)) { + const replyPayload = Array.isArray(replyResult) + ? replyResult[0] + : replyResult; + + if (replyPayload && (replyPayload.text || replyPayload.mediaUrl)) { try { - let mediaUrl = replyResult.mediaUrl; + let mediaUrl = replyPayload.mediaUrl; if (mediaUrl && !/^https?:\/\//i.test(mediaUrl)) { const hosted = await mediaHost.ensureMediaHosted(mediaUrl); mediaUrl = hosted.url; @@ -98,7 +104,7 @@ export async function startWebhook( await client.messages.create({ from: To, to: From, - body: replyResult.text ?? "", + body: replyPayload.text ?? "", ...(mediaUrl ? { mediaUrl: [mediaUrl] } : {}), }); if (verbose) diff --git a/src/web/auto-reply.ts b/src/web/auto-reply.ts index 75eeb7578..24de637b4 100644 --- a/src/web/auto-reply.ts +++ b/src/web/auto-reply.ts @@ -1,7 +1,4 @@ -import { - buildHeartbeatPrompt, - runHeartbeatPreHook, -} from "../auto-reply/heartbeat-prehook.js"; +import { chunkText } from "../auto-reply/chunk.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { waitForever } from "../cli/wait.js"; @@ -16,7 +13,7 @@ import { import { danger, info, isVerbose, logVerbose, success } from "../globals.js"; import { logInfo } from "../logger.js"; import { getChildLogger } from "../logging.js"; -import { enqueueCommand, getQueueSize } from "../process/command-queue.js"; +import { getQueueSize } from "../process/command-queue.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeE164 } from "../utils.js"; import { monitorWebInbox } from 
"./inbound.js"; @@ -33,6 +30,8 @@ import { } from "./reconnect.js"; import { getWebAuthAgeMs } from "./session.js"; +const WEB_TEXT_LIMIT = 4000; + /** * Send a message via IPC if relay is running, otherwise fall back to direct. * This avoids Signal session corruption from multiple Baileys connections. @@ -73,7 +72,7 @@ const formatDuration = (ms: number) => const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30; export const HEARTBEAT_TOKEN = "HEARTBEAT_OK"; -export const HEARTBEAT_PROMPT = "HEARTBEAT ultrathink"; +export const HEARTBEAT_PROMPT = "HEARTBEAT /think:high"; export function resolveReplyHeartbeatMinutes( cfg: ReturnType, @@ -109,7 +108,6 @@ export async function runWebHeartbeatOnce(opts: { sessionId?: string; overrideBody?: string; dryRun?: boolean; - skipPreHook?: boolean; }) { const { cfg: cfgOverride, @@ -118,7 +116,6 @@ export async function runWebHeartbeatOnce(opts: { sessionId, overrideBody, dryRun = false, - skipPreHook = false, } = opts; const _runtime = opts.runtime ?? defaultRuntime; const replyResolver = opts.replyResolver ?? 
getReplyFromConfig; @@ -187,51 +184,25 @@ export async function runWebHeartbeatOnce(opts: { return; } - // Run pre-hook unless skipped or overrideBody provided - let heartbeatPrompt = HEARTBEAT_PROMPT; - if (!skipPreHook) { - const preHookResult = await runHeartbeatPreHook(cfg); - if (preHookResult.error) { - heartbeatLogger.warn( - { - to, - error: preHookResult.error, - durationMs: preHookResult.durationMs, - timedOut: preHookResult.timedOut, - }, - "heartbeat pre-hook failed (continuing with basic heartbeat)", - ); - } else if (preHookResult.context) { - heartbeatLogger.info( - { - to, - contextLength: preHookResult.context.length, - durationMs: preHookResult.durationMs, - }, - "heartbeat pre-hook succeeded", - ); - } - heartbeatPrompt = buildHeartbeatPrompt( - HEARTBEAT_PROMPT, - preHookResult.context, - ); - } - const replyResult = await replyResolver( { - Body: heartbeatPrompt, + Body: HEARTBEAT_PROMPT, From: to, To: to, MessageSid: sessionId ?? sessionSnapshot.entry?.sessionId, }, - undefined, + { isHeartbeat: true }, cfg, ); + const replyPayload = Array.isArray(replyResult) + ? replyResult[0] + : replyResult; + if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) + !replyPayload || + (!replyPayload.text && + !replyPayload.mediaUrl && + !replyPayload.mediaUrls?.length) ) { heartbeatLogger.info( { @@ -246,9 +217,9 @@ export async function runWebHeartbeatOnce(opts: { } const hasMedia = Boolean( - replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0, + replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0, ); - const stripped = stripHeartbeatToken(replyResult.text); + const stripped = stripHeartbeatToken(replyPayload.text); if (stripped.shouldSkip && !hasMedia) { // Don't let heartbeats keep sessions alive: restore previous updatedAt so idle expiry still works. 
const sessionCfg = cfg.inbound?.reply?.session; @@ -260,7 +231,7 @@ export async function runWebHeartbeatOnce(opts: { } heartbeatLogger.info( - { to, reason: "heartbeat-token", rawLength: replyResult.text?.length }, + { to, reason: "heartbeat-token", rawLength: replyPayload.text?.length }, "heartbeat skipped", ); console.log(success("heartbeat: ok (HEARTBEAT_OK)")); @@ -274,7 +245,7 @@ export async function runWebHeartbeatOnce(opts: { ); } - const finalText = stripped.text || replyResult.text || ""; + const finalText = stripped.text || replyPayload.text || ""; if (dryRun) { heartbeatLogger.info( { to, reason: "dry-run", chars: finalText.length }, @@ -407,14 +378,18 @@ async function deliverWebReply(params: { skipLog, } = params; const replyStarted = Date.now(); + const textChunks = chunkText(replyResult.text || "", WEB_TEXT_LIMIT); const mediaList = replyResult.mediaUrls?.length ? replyResult.mediaUrls : replyResult.mediaUrl ? [replyResult.mediaUrl] : []; - if (mediaList.length === 0 && replyResult.text) { - await msg.reply(replyResult.text || ""); + // Text-only replies + if (mediaList.length === 0 && textChunks.length) { + for (const chunk of textChunks) { + await msg.reply(chunk); + } if (!skipLog) { logInfo( `✅ Sent web reply to ${msg.from} (${(Date.now() - replyStarted).toFixed(0)}ms)`, @@ -438,8 +413,12 @@ async function deliverWebReply(params: { return; } - const cleanText = replyResult.text ?? undefined; + const remainingText = [...textChunks]; + + // Media (with optional caption on first item) for (const [index, mediaUrl] of mediaList.entries()) { + const caption = + index === 0 ? remainingText.shift() || undefined : undefined; try { const media = await loadWebMedia(mediaUrl, maxMediaBytes); if (isVerbose()) { @@ -450,7 +429,6 @@ async function deliverWebReply(params: { `Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`, ); } - const caption = index === 0 ? 
cleanText || undefined : undefined; if (media.kind === "image") { await msg.sendMedia({ image: media.buffer, @@ -490,7 +468,7 @@ async function deliverWebReply(params: { connectionId: connectionId ?? null, to: msg.from, from: msg.to, - text: index === 0 ? (cleanText ?? null) : null, + text: caption ?? null, mediaUrl, mediaSizeBytes: media.buffer.length, mediaKind: media.kind, @@ -502,12 +480,21 @@ async function deliverWebReply(params: { console.error( danger(`Failed sending web media to ${msg.from}: ${String(err)}`), ); - if (index === 0 && cleanText) { - console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`); - await msg.reply(cleanText || ""); + replyLogger.warn({ err, mediaUrl }, "failed to send web media reply"); + if (index === 0) { + const fallbackText = remainingText.shift() ?? caption ?? ""; + if (fallbackText) { + console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`); + await msg.reply(fallbackText); + } } } } + + // Remaining text chunks after media + for (const chunk of remainingText) { + await msg.reply(chunk); + } } export async function monitorWebProvider( @@ -657,84 +644,85 @@ export async function monitorWebProvider( : new Date().toISOString(); console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`); - const replyResult = await enqueueCommand(() => - (replyResolver ?? getReplyFromConfig)( - { - Body: combinedBody, - From: latest.from, - To: latest.to, - MessageSid: latest.id, - MediaPath: latest.mediaPath, - MediaUrl: latest.mediaUrl, - MediaType: latest.mediaType, - }, - { - onReplyStart: latest.sendComposing, - }, - ), + const replyResult = await (replyResolver ?? 
getReplyFromConfig)( + { + Body: combinedBody, + From: latest.from, + To: latest.to, + MessageSid: latest.id, + MediaPath: latest.mediaPath, + MediaUrl: latest.mediaUrl, + MediaType: latest.mediaType, + }, + { + onReplyStart: latest.sendComposing, + }, ); - if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) - ) { + const replyList = replyResult + ? Array.isArray(replyResult) + ? replyResult + : [replyResult] + : []; + + if (replyList.length === 0) { logVerbose("Skipping auto-reply: no text/media returned from resolver"); return; } // Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match) const responsePrefix = cfg.inbound?.responsePrefix; - if ( - responsePrefix && - replyResult.text && - replyResult.text.trim() !== HEARTBEAT_TOKEN - ) { - if (!replyResult.text.startsWith(responsePrefix)) { - replyResult.text = `${responsePrefix} ${replyResult.text}`; + + for (const replyPayload of replyList) { + if ( + responsePrefix && + replyPayload.text && + replyPayload.text.trim() !== HEARTBEAT_TOKEN && + !replyPayload.text.startsWith(responsePrefix) + ) { + replyPayload.text = `${responsePrefix} ${replyPayload.text}`; } - } - try { - await deliverWebReply({ - replyResult, - msg: latest, - maxMediaBytes, - replyLogger, - runtime, - connectionId, - }); + try { + await deliverWebReply({ + replyResult: replyPayload, + msg: latest, + maxMediaBytes, + replyLogger, + runtime, + connectionId, + }); - if (replyResult.text) { - recentlySent.add(replyResult.text); - recentlySent.add(combinedBody); // Prevent echo on the batch text itself - logVerbose( - `Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`, - ); - if (recentlySent.size > MAX_RECENT_MESSAGES) { - const firstKey = recentlySent.values().next().value; - if (firstKey) recentlySent.delete(firstKey); + if (replyPayload.text) { + recentlySent.add(replyPayload.text); + 
recentlySent.add(combinedBody); // Prevent echo on the batch text itself + logVerbose( + `Added to echo detection set (size now: ${recentlySent.size}): ${replyPayload.text.substring(0, 50)}...`, + ); + if (recentlySent.size > MAX_RECENT_MESSAGES) { + const firstKey = recentlySent.values().next().value; + if (firstKey) recentlySent.delete(firstKey); + } } - } - if (isVerbose()) { - console.log( - success( - `↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`, - ), - ); - } else { - console.log( - success( - `↩️ ${replyResult.text ?? ""}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`, - ), + if (isVerbose()) { + console.log( + success( + `↩️ Auto-replied to ${from} (web${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`, + ), + ); + } else { + console.log( + success( + `↩️ ${replyPayload.text ?? ""}${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? 
" (media)" : ""}`, + ), + ); + } + } catch (err) { + console.error( + danger(`Failed sending web auto-reply to ${from}: ${String(err)}`), ); } - } catch (err) { - console.error( - danger(`Failed sending web auto-reply to ${from}: ${String(err)}`), - ); } }; @@ -967,59 +955,31 @@ export async function monitorWebProvider( "reply heartbeat start", ); } - - // Run pre-hook to gather context - const preHookResult = await runHeartbeatPreHook(cfg); - if (preHookResult.error) { - heartbeatLogger.warn( - { - connectionId, - error: preHookResult.error, - durationMs: preHookResult.durationMs, - timedOut: preHookResult.timedOut, - }, - "heartbeat pre-hook failed (continuing with basic heartbeat)", - ); - } else if (preHookResult.context) { - heartbeatLogger.info( - { - connectionId, - contextLength: preHookResult.context.length, - durationMs: preHookResult.durationMs, - }, - "heartbeat pre-hook succeeded", - ); - } - const heartbeatPrompt = buildHeartbeatPrompt( - HEARTBEAT_PROMPT, - preHookResult.context, + const replyResult = await (replyResolver ?? getReplyFromConfig)( + { + Body: HEARTBEAT_PROMPT, + From: lastInboundMsg.from, + To: lastInboundMsg.to, + MessageSid: snapshot.entry?.sessionId, + MediaPath: undefined, + MediaUrl: undefined, + MediaType: undefined, + }, + { + onReplyStart: lastInboundMsg.sendComposing, + isHeartbeat: true, + }, ); - const hbFrom = lastInboundMsg.from; - const hbTo = lastInboundMsg.to; - const hbComposing = lastInboundMsg.sendComposing; - const replyResult = await enqueueCommand(() => - (replyResolver ?? getReplyFromConfig)( - { - Body: heartbeatPrompt, - From: hbFrom, - To: hbTo, - MessageSid: snapshot.entry?.sessionId, - MediaPath: undefined, - MediaUrl: undefined, - MediaType: undefined, - }, - { - onReplyStart: hbComposing, - }, - ), - ); + const replyPayload = Array.isArray(replyResult) + ? 
replyResult[0] + : replyResult; if ( - !replyResult || - (!replyResult.text && - !replyResult.mediaUrl && - !replyResult.mediaUrls?.length) + !replyPayload || + (!replyPayload.text && + !replyPayload.mediaUrl && + !replyPayload.mediaUrls?.length) ) { heartbeatLogger.info( { @@ -1033,9 +993,9 @@ export async function monitorWebProvider( return; } - const stripped = stripHeartbeatToken(replyResult.text); + const stripped = stripHeartbeatToken(replyPayload.text); const hasMedia = Boolean( - replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0, + replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0, ); if (stripped.shouldSkip && !hasMedia) { heartbeatLogger.info( @@ -1043,7 +1003,7 @@ export async function monitorWebProvider( connectionId, durationMs: Date.now() - tickStart, reason: "heartbeat-token", - rawLength: replyResult.text?.length ?? 0, + rawLength: replyPayload.text?.length ?? 0, }, "reply heartbeat skipped", ); @@ -1063,7 +1023,7 @@ export async function monitorWebProvider( } const cleanedReply: ReplyPayload = { - ...replyResult, + ...replyPayload, text: finalText, }; diff --git a/src/web/inbound.media.test.ts b/src/web/inbound.media.test.ts index f0205b27e..b8d60f0af 100644 --- a/src/web/inbound.media.test.ts +++ b/src/web/inbound.media.test.ts @@ -100,7 +100,12 @@ describe("web inbound media saves with extension", () => { }; realSock.ev.emit("messages.upsert", upsert); - await new Promise((resolve) => setTimeout(resolve, 5)); + + // Allow a brief window for the async handler to fire on slower runners. 
+ for (let i = 0; i < 10; i++) { + if (onMessage.mock.calls.length > 0) break; + await new Promise((resolve) => setTimeout(resolve, 5)); + } expect(onMessage).toHaveBeenCalledTimes(1); const msg = onMessage.mock.calls[0][0]; diff --git a/src/web/media.ts b/src/web/media.ts index 76c0e8605..83515704d 100644 --- a/src/web/media.ts +++ b/src/web/media.ts @@ -1,4 +1,5 @@ import fs from "node:fs/promises"; +import path from "node:path"; import sharp from "sharp"; import { isVerbose, logVerbose } from "../globals.js"; @@ -12,7 +13,12 @@ import { detectMime } from "../media/mime.js"; export async function loadWebMedia( mediaUrl: string, maxBytes?: number, -): Promise<{ buffer: Buffer; contentType?: string; kind: MediaKind }> { +): Promise<{ + buffer: Buffer; + contentType?: string; + kind: MediaKind; + fileName?: string; +}> { if (mediaUrl.startsWith("file://")) { mediaUrl = mediaUrl.replace("file://", ""); } @@ -40,6 +46,14 @@ export async function loadWebMedia( }; if (/^https?:\/\//i.test(mediaUrl)) { + let fileName: string | undefined; + try { + const url = new URL(mediaUrl); + const base = path.basename(url.pathname); + fileName = base || undefined; + } catch { + // ignore parse errors; leave undefined + } const res = await fetch(mediaUrl); if (!res.ok || !res.body) { throw new Error(`Failed to fetch media: HTTP ${res.status}`); @@ -56,7 +70,7 @@ export async function loadWebMedia( maxBytesForKind(kind), ); if (kind === "image") { - return optimizeAndClampImage(array, cap); + return { ...(await optimizeAndClampImage(array, cap)), fileName }; } if (array.length > cap) { throw new Error( @@ -65,19 +79,25 @@ export async function loadWebMedia( ).toFixed(2)}MB)`, ); } - return { buffer: array, contentType: contentType ?? undefined, kind }; + return { + buffer: array, + contentType: contentType ?? 
undefined, + kind, + fileName, + }; } // Local path const data = await fs.readFile(mediaUrl); const mime = detectMime({ buffer: data, filePath: mediaUrl }); const kind = mediaKindFromMime(mime); + const fileName = path.basename(mediaUrl) || undefined; const cap = Math.min( maxBytes ?? maxBytesForKind(kind), maxBytesForKind(kind), ); if (kind === "image") { - return optimizeAndClampImage(data, cap); + return { ...(await optimizeAndClampImage(data, cap)), fileName }; } if (data.length > cap) { throw new Error( @@ -86,7 +106,7 @@ export async function loadWebMedia( ).toFixed(2)}MB)`, ); } - return { buffer: data, contentType: mime, kind }; + return { buffer: data, contentType: mime, kind, fileName }; } export async function optimizeImageToJpeg( diff --git a/src/web/outbound.test.ts b/src/web/outbound.test.ts index 2e4abecd2..6b8132358 100644 --- a/src/web/outbound.test.ts +++ b/src/web/outbound.test.ts @@ -1,3 +1,4 @@ +import type { AnyMessageContent } from "@whiskeysockets/baileys"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { resetLogger, setLoggerOverride } from "../logging.js"; @@ -17,6 +18,11 @@ vi.mock("./session.js", () => { }; }); +const loadWebMediaMock = vi.fn(); +vi.mock("./media.js", () => ({ + loadWebMedia: (...args: unknown[]) => loadWebMediaMock(...args), +})); + import { sendMessageWeb } from "./outbound.js"; const { createWaSocket } = await import("./session.js"); @@ -37,4 +43,98 @@ describe("web outbound", () => { expect(sock.sendMessage).toHaveBeenCalled(); expect(sock.ws.close).toHaveBeenCalled(); }); + + it("maps audio to PTT with opus mime when ogg", async () => { + const buf = Buffer.from("audio"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "audio/ogg", + kind: "audio", + }); + await sendMessageWeb("+1555", "voice note", { + verbose: false, + mediaUrl: "/tmp/voice.ogg", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + 
string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + audio: buf, + ptt: true, + mimetype: "audio/ogg; codecs=opus", + }); + }); + + it("maps video with caption", async () => { + const buf = Buffer.from("video"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "video/mp4", + kind: "video", + }); + await sendMessageWeb("+1555", "clip", { + verbose: false, + mediaUrl: "/tmp/video.mp4", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + video: buf, + caption: "clip", + mimetype: "video/mp4", + }); + }); + + it("maps image with caption", async () => { + const buf = Buffer.from("img"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "image/jpeg", + kind: "image", + }); + await sendMessageWeb("+1555", "pic", { + verbose: false, + mediaUrl: "/tmp/pic.jpg", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + image: buf, + caption: "pic", + mimetype: "image/jpeg", + }); + }); + + it("maps other kinds to document with filename", async () => { + const buf = Buffer.from("pdf"); + loadWebMediaMock.mockResolvedValueOnce({ + buffer: buf, + contentType: "application/pdf", + kind: "document", + fileName: "file.pdf", + }); + await sendMessageWeb("+1555", "doc", { + verbose: false, + mediaUrl: "/tmp/file.pdf", + }); + const sock = await createWaSocket(); + const [, payload] = sock.sendMessage.mock.calls.at(-1) as [ + string, + AnyMessageContent, + ]; + expect(payload).toMatchObject({ + document: buf, + fileName: "file.pdf", + caption: "doc", + mimetype: "application/pdf", + }); + }); }); diff --git a/src/web/outbound.ts b/src/web/outbound.ts index ac17ea5b9..6e1b3cf0c 100644 --- a/src/web/outbound.ts +++ b/src/web/outbound.ts @@ -35,11 +35,39 @@ export async function 
sendMessageWeb( let payload: AnyMessageContent = { text: body }; if (options.mediaUrl) { const media = await loadWebMedia(options.mediaUrl); - payload = { - image: media.buffer, - caption: body || undefined, - mimetype: media.contentType, - }; + const caption = body || undefined; + if (media.kind === "audio") { + // WhatsApp expects explicit opus codec for PTT voice notes. + const mimetype = + media.contentType === "audio/ogg" + ? "audio/ogg; codecs=opus" + : (media.contentType ?? "application/octet-stream"); + payload = { audio: media.buffer, ptt: true, mimetype }; + } else if (media.kind === "video") { + const mimetype = media.contentType ?? "application/octet-stream"; + payload = { + video: media.buffer, + caption, + mimetype, + }; + } else if (media.kind === "image") { + const mimetype = media.contentType ?? "application/octet-stream"; + payload = { + image: media.buffer, + caption, + mimetype, + }; + } else { + // Fallback to document for anything else (pdf, etc.). + const fileName = media.fileName ?? "file"; + const mimetype = media.contentType ?? "application/octet-stream"; + payload = { + document: media.buffer, + fileName, + caption, + mimetype, + }; + } } logInfo( `📤 Sending via web session -> ${jid}${options.mediaUrl ? " (media)" : ""}`,