Merge upstream/main: heartbeat refactoring, chunking, tau-rpc

Merged 30 commits from upstream (steipete/warelay) including:
- Heartbeat array handling and normalization
- Text chunking for Twilio/web replies
- Tau RPC process management
- Agent/session metadata logging
- Media cleanup and redirect handling
- Gemini agent support
- Simplified Twilio monitor (removed inline heartbeat)

Conflict resolutions:
- AGENTS.md: Kept fork-specific agent notes (tmux, --provider twilio)
- src/auto-reply/types.ts: Accepted upstream isHeartbeat flag
- src/web/auto-reply.ts: Accepted upstream chunking, removed heartbeat-prehook
- src/twilio/heartbeat.ts: Accepted upstream simplified version
- Fixed typing indicator return type (Promise<void> wrapper)

Deleted files (removed in upstream):
- src/auto-reply/heartbeat-prehook.ts
- src/auto-reply/heartbeat-prehook.test.ts
This commit is contained in:
Kyle Crommett 2025-12-02 23:08:54 -08:00
commit 1c64d5061f
42 changed files with 1540 additions and 1469 deletions

View File

@ -1,6 +1,18 @@
# Changelog
## Unreleased
## 1.3.2 — 2025-12-03
### Bug Fixes
- Tau/Pi RPC replies are now buffered until the assistant turn finishes and only completed assistant `message_end` events are emitted, preventing duplicate or partial WhatsApp messages.
- Command auto-replies return the parsed assistant texts array only (no deprecated `text` field), while preserving single-payload callers and keeping multi-message replies intact.
- WhatsApp Web auto-replies now fall back to sending the caption text if media delivery fails, so users still see a reply instead of silence.
- Outbound chunking now prefers newlines and word boundaries and only splits when exceeding platform limits, keeping multi-paragraph replies in a single message unless necessary.
- Heartbeat replies now normalize array payloads for both web and Twilio paths and safely handle optional `heartbeatCommand`, preventing TS build failures and runtime crashes when agents return multiple messages.
### Testing
- Updated agent and auto-reply parsers plus web media send fallbacks; test suite adjusted and now passing after the RPC/message handling refactors.
## 1.3.1 — 2025-12-02
### Security
- Hardened the relay IPC socket: now lives under `~/.warelay/ipc`, enforces 0700 dir / 0600 socket perms, rejects symlink or foreign-owned paths, and includes unit tests to lock in the behavior.
@ -8,6 +20,16 @@
- Logging now rolls daily to `/tmp/warelay/warelay-YYYY-MM-DD.log` (or custom dir) and prunes files older than 24h to reduce data retention.
- Media server now rejects symlinked files and ensures resolved paths stay inside the media directory, closing traversal via symlinks; added regression test. (Thanks @joaohlisboa)
### Performance
- Web auto-replies using the Pi agent now keep a single long-lived `tau` process in RPC mode instead of spawning per message, eliminating cold-start latency while preserving session/cwd handling.
### Bug Fixes
- Media downloads now follow up to 5 redirects and still derive MIME/extension from sniffed content or headers; added regression test for redirected downloads.
- Hosted media responses set `Content-Type` from sniffed MIME (not the file name) and still clean up single-use files after send.
- Claude system prompt is guaranteed to be included on the first session turn even when `sendSystemOnce` is enabled, while later turns stay system-free.
- Media server deletes served files right after the response finishes (instead of a fixed timeout) while keeping MIME sniffing and single-use semantics.
- Tau RPC result typing now exposes `signal`/`killed` fields so TS builds align with runtime data.
## 1.3.0 — 2025-12-02
### Highlights
@ -42,6 +64,11 @@
## Unreleased
### Fixed
- Support multiple assistant text replies when using Tau RPC: agents now emit `texts` arrays and command auto-replies deliver each message separately without leaking raw JSON.
- Normalized agent parsers (pi/claude/opencode/codex/gemini) to the new plural output shape.
- Enforce outbound text size caps: WhatsApp/Twilio messages chunked at 1600 chars; web replies chunked at 4000 chars.
### Changes
- **Heartbeat backpressure:** Web reply heartbeats now check the shared command queue and skip while any command/Claude runs are in flight, preventing concurrent prompts during long-running requests.
- **Isolated session fixtures in web tests:** Heartbeat/auto-reply tests now create temporary session stores instead of using the default `~/.warelay/sessions.json`, preventing local config pollution during test runs.

View File

@ -1,6 +1,6 @@
# Agent Abstraction Refactor Plan
Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact.
Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode, Gemini) cleanly, without legacy flags, and make parsing/injection per-agent. Keep WhatsApp/Twilio plumbing intact.
## Overview
- Introduce a pluggable agent layer (`src/agents/*`), selected by config.
@ -15,7 +15,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without
reply: {
mode: "command",
agent: {
kind: "claude" | "opencode" | "pi" | "codex",
kind: "claude" | "opencode" | "pi" | "codex" | "gemini",
format?: "text" | "json",
identityPrefix?: string
},
@ -42,6 +42,7 @@ Goal: support multiple agent CLIs (Claude, Codex, Pi, Opencode) cleanly, without
- `src/agents/opencode.ts` reuse `parseOpencodeJson` (from PR #5), inject `--format json`, session flag `--session` defaults, identity prefix.
- `src/agents/pi.ts` parse NDJSON `AssistantMessageEvent` (final `message_end.message.content[text]`), inject `--mode json`/`-p` defaults, session flags.
- `src/agents/codex.ts` parse Codex JSONL (last `item` with `type:"agent_message"`; usage from `turn.completed`), inject `codex exec --json --skip-git-repo-check`, sandbox default read-only.
- `src/agents/gemini.ts` minimal parsing (plain text), identity prepend, honors `--output-format` when `format` is set, and defaults to `--resume {{SessionId}}` for session resume (new sessions need no flag). Override `sessionArgNew/sessionArgResume` if you use a different session strategy.
- Shared MEDIA extraction stays in `media/parse.ts`.
## Command runner changes

View File

@ -4,6 +4,7 @@ import { CLAUDE_IDENTITY_PREFIX } from "../auto-reply/claude.js";
import { OPENCODE_IDENTITY_PREFIX } from "../auto-reply/opencode.js";
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { GEMINI_IDENTITY_PREFIX, geminiSpec } from "./gemini.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
@ -61,7 +62,7 @@ describe("agent buildArgs + parseOutput helpers", () => {
'{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"hello world"}],"usage":{"input":10,"output":5},"model":"pi-1","provider":"inflection","stopReason":"end"}}',
].join("\n");
const parsed = piSpec.parseOutput(stdout);
expect(parsed.text).toBe("hello world");
expect(parsed.texts?.[0]).toBe("hello world");
expect(parsed.meta?.provider).toBe("inflection");
expect((parsed.meta?.usage as { output?: number })?.output).toBe(5);
});
@ -72,7 +73,7 @@ describe("agent buildArgs + parseOutput helpers", () => {
'{"type":"turn.completed","usage":{"input_tokens":50,"output_tokens":10,"cached_input_tokens":5}}',
].join("\n");
const parsed = codexSpec.parseOutput(stdout);
expect(parsed.text).toBe("hi there");
expect(parsed.texts?.[0]).toBe("hi there");
const usage = parsed.meta?.usage as {
input?: number;
output?: number;
@ -92,7 +93,7 @@ describe("agent buildArgs + parseOutput helpers", () => {
'{"type":"step_finish","timestamp":1200,"part":{"cost":0.002,"tokens":{"input":100,"output":20}}}',
].join("\n");
const parsed = opencodeSpec.parseOutput(stdout);
expect(parsed.text).toBe("hi");
expect(parsed.texts?.[0]).toBe("hi");
expect(parsed.meta?.extra?.summary).toContain("duration=1200ms");
expect(parsed.meta?.extra?.summary).toContain("cost=$0.0020");
expect(parsed.meta?.extra?.summary).toContain("tokens=100+20");
@ -115,4 +116,33 @@ describe("agent buildArgs + parseOutput helpers", () => {
expect(built).toContain("--skip-git-repo-check");
expect(built).toContain("read-only");
});
it("geminiSpec prepends identity unless already sent", () => {
const argv = ["gemini", "hi"];
const built = geminiSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: true,
sessionId: "sess",
sendSystemOnce: false,
systemSent: false,
identityPrefix: undefined,
format: "json",
});
expect(built.at(-1)).toContain(GEMINI_IDENTITY_PREFIX);
const builtOnce = geminiSpec.buildArgs({
argv,
bodyIndex: 1,
isNewSession: false,
sessionId: "sess",
sendSystemOnce: true,
systemSent: true,
identityPrefix: undefined,
format: "json",
});
expect(builtOnce.at(-1)).toBe("hi");
expect(builtOnce).toContain("--output-format");
expect(builtOnce).toContain("json");
});
});

View File

@ -12,7 +12,16 @@ import type { AgentMeta, AgentSpec } from "./types.js";
function toMeta(parsed?: ClaudeJsonParseResult): AgentMeta | undefined {
if (!parsed?.parsed) return undefined;
const summary = summarizeClaudeMetadata(parsed.parsed);
return summary ? { extra: { summary } } : undefined;
const sessionId =
parsed.parsed &&
typeof parsed.parsed === "object" &&
typeof (parsed.parsed as { session_id?: unknown }).session_id === "string"
? (parsed.parsed as { session_id: string }).session_id
: undefined;
const meta: AgentMeta = {};
if (sessionId) meta.sessionId = sessionId;
if (summary) meta.extra = { summary };
return Object.keys(meta).length ? meta : undefined;
}
export const claudeSpec: AgentSpec = {
@ -60,7 +69,7 @@ export const claudeSpec: AgentSpec = {
const parsed = parseClaudeJson(rawStdout);
const text = parsed?.text ?? rawStdout.trim();
return {
text: text?.trim(),
texts: text ? [text.trim()] : undefined,
meta: toMeta(parsed),
};
},

View File

@ -4,7 +4,7 @@ import type { AgentMeta, AgentParseResult, AgentSpec } from "./types.js";
function parseCodexJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let text: string | undefined;
const texts: string[] = [];
let meta: AgentMeta | undefined;
for (const line of lines) {
@ -21,7 +21,7 @@ function parseCodexJson(raw: string): AgentParseResult {
ev.item?.type === "agent_message" &&
typeof ev.item.text === "string"
) {
text = ev.item.text;
texts.push(ev.item.text);
}
if (
ev.type === "turn.completed" &&
@ -50,7 +50,8 @@ function parseCodexJson(raw: string): AgentParseResult {
}
}
return { text: text?.trim(), meta };
const finalTexts = texts.length ? texts.map((t) => t.trim()) : undefined;
return { texts: finalTexts, meta };
}
export const codexSpec: AgentSpec = {

54
src/agents/gemini.ts Normal file
View File

@ -0,0 +1,54 @@
import path from "node:path";
import type { AgentParseResult, AgentSpec } from "./types.js";
const GEMINI_BIN = "gemini";
export const GEMINI_IDENTITY_PREFIX =
"You are Gemini responding for warelay. Keep WhatsApp replies concise (<1500 chars). If the prompt contains media paths or a Transcript block, use them. If this was a heartbeat probe and nothing needs attention, reply with exactly HEARTBEAT_OK.";
// Gemini CLI currently prints plain text; --output json is flaky across versions, so we
// keep parsing minimal and let MEDIA token stripping happen later in the pipeline.
function parseGeminiOutput(raw: string): AgentParseResult {
const trimmed = raw.trim();
const text = trimmed || undefined;
return {
texts: text ? [text] : undefined,
meta: undefined,
} satisfies AgentParseResult;
}
export const geminiSpec: AgentSpec = {
kind: "gemini",
isInvocation: (argv) =>
argv.length > 0 && path.basename(argv[0]) === GEMINI_BIN,
buildArgs: (ctx) => {
const argv = [...ctx.argv];
const body = argv[ctx.bodyIndex] ?? "";
const beforeBody = argv.slice(0, ctx.bodyIndex);
const afterBody = argv.slice(ctx.bodyIndex + 1);
if (ctx.format) {
const hasOutput =
beforeBody.some(
(p) => p === "--output-format" || p.startsWith("--output-format="),
) ||
afterBody.some(
(p) => p === "--output-format" || p.startsWith("--output-format="),
);
if (!hasOutput) {
beforeBody.push("--output-format", ctx.format);
}
}
const shouldPrependIdentity = !(ctx.sendSystemOnce && ctx.systemSent);
const bodyWithIdentity =
shouldPrependIdentity && body
? [ctx.identityPrefix ?? GEMINI_IDENTITY_PREFIX, body]
.filter(Boolean)
.join("\n\n")
: body;
return [...beforeBody, bodyWithIdentity, ...afterBody];
},
parseOutput: parseGeminiOutput,
};

View File

@ -1,5 +1,6 @@
import { claudeSpec } from "./claude.js";
import { codexSpec } from "./codex.js";
import { geminiSpec } from "./gemini.js";
import { opencodeSpec } from "./opencode.js";
import { piSpec } from "./pi.js";
import type { AgentKind, AgentSpec } from "./types.js";
@ -7,6 +8,7 @@ import type { AgentKind, AgentSpec } from "./types.js";
const specs: Record<AgentKind, AgentSpec> = {
claude: claudeSpec,
codex: codexSpec,
gemini: geminiSpec,
opencode: opencodeSpec,
pi: piSpec,
};
@ -15,4 +17,4 @@ export function getAgentSpec(kind: AgentKind): AgentSpec {
return specs[kind];
}
export { AgentKind, AgentMeta, AgentParseResult } from "./types.js";
export type { AgentKind, AgentMeta, AgentParseResult } from "./types.js";

View File

@ -55,7 +55,7 @@ export const opencodeSpec: AgentSpec = {
const parsed = parseOpencodeJson(rawStdout);
const text = parsed.text ?? rawStdout.trim();
return {
text: text?.trim(),
texts: text ? [text.trim()] : undefined,
meta: toMeta(parsed),
};
},

View File

@ -13,41 +13,63 @@ type PiAssistantMessage = {
function parsePiJson(raw: string): AgentParseResult {
const lines = raw.split(/\n+/).filter((l) => l.trim().startsWith("{"));
let lastMessage: PiAssistantMessage | undefined;
// Collect only completed assistant messages (skip streaming updates/toolcalls).
const texts: string[] = [];
let lastAssistant: PiAssistantMessage | undefined;
let lastPushed: string | undefined;
for (const line of lines) {
try {
const ev = JSON.parse(line) as {
type?: string;
message?: PiAssistantMessage;
};
// Pi emits a stream; we only care about the terminal assistant message_end.
if (ev.type === "message_end" && ev.message?.role === "assistant") {
lastMessage = ev.message;
const isAssistantMessage =
(ev.type === "message" || ev.type === "message_end") &&
ev.message?.role === "assistant" &&
Array.isArray(ev.message.content);
if (!isAssistantMessage) continue;
const msg = ev.message as PiAssistantMessage;
const msgText = msg.content
?.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
.trim();
if (msgText && msgText !== lastPushed) {
texts.push(msgText);
lastPushed = msgText;
lastAssistant = msg;
}
} catch {
// ignore
// ignore malformed lines
}
}
const text =
lastMessage?.content
?.filter((c) => c?.type === "text" && typeof c.text === "string")
.map((c) => c.text)
.join("\n")
?.trim() ?? undefined;
const meta: AgentMeta | undefined = lastMessage
? {
model: lastMessage.model,
provider: lastMessage.provider,
stopReason: lastMessage.stopReason,
usage: lastMessage.usage,
}
: undefined;
return { text, meta };
const meta: AgentMeta | undefined =
lastAssistant && texts.length
? {
model: lastAssistant.model,
provider: lastAssistant.provider,
stopReason: lastAssistant.stopReason,
usage: lastAssistant.usage,
}
: undefined;
return { texts, meta };
}
export const piSpec: AgentSpec = {
kind: "pi",
isInvocation: (argv) => argv.length > 0 && path.basename(argv[0]) === "pi",
isInvocation: (argv) => {
if (argv.length === 0) return false;
const base = path.basename(argv[0]).replace(/\.(m?js)$/i, "");
return base === "pi" || base === "tau";
},
buildArgs: (ctx) => {
const argv = [...ctx.argv];
// Non-interactive print + JSON

View File

@ -1,9 +1,10 @@
export type AgentKind = "claude" | "opencode" | "pi" | "codex";
export type AgentKind = "claude" | "opencode" | "pi" | "codex" | "gemini";
export type AgentMeta = {
model?: string;
provider?: string;
stopReason?: string;
sessionId?: string;
usage?: {
input?: number;
output?: number;
@ -15,7 +16,8 @@ export type AgentMeta = {
};
export type AgentParseResult = {
text?: string;
// Plural to support agents that emit multiple assistant turns per prompt.
texts?: string[];
mediaUrls?: string[];
meta?: AgentMeta;
};

View File

@ -0,0 +1,47 @@
import { describe, expect, it } from "vitest";
import { chunkText } from "./chunk.js";
describe("chunkText", () => {
it("keeps multi-line text in one chunk when under limit", () => {
const text = "Line one\n\nLine two\n\nLine three";
const chunks = chunkText(text, 1600);
expect(chunks).toEqual([text]);
});
it("splits only when text exceeds the limit", () => {
const part = "a".repeat(20);
const text = part.repeat(5); // 100 chars
const chunks = chunkText(text, 60);
expect(chunks.length).toBe(2);
expect(chunks[0].length).toBe(60);
expect(chunks[1].length).toBe(40);
expect(chunks.join("")).toBe(text);
});
it("prefers breaking at a newline before the limit", () => {
const text = `paragraph one line\n\nparagraph two starts here and continues`;
const chunks = chunkText(text, 40);
expect(chunks).toEqual([
"paragraph one line",
"paragraph two starts here and continues",
]);
});
it("otherwise breaks at the last whitespace under the limit", () => {
const text =
"This is a message that should break nicely near a word boundary.";
const chunks = chunkText(text, 30);
expect(chunks[0].length).toBeLessThanOrEqual(30);
expect(chunks[1].length).toBeLessThanOrEqual(30);
expect(chunks.join(" ").replace(/\s+/g, " ").trim()).toBe(
text.replace(/\s+/g, " ").trim(),
);
});
it("falls back to a hard break when no whitespace is present", () => {
const text = "Supercalifragilisticexpialidocious"; // 34 chars
const chunks = chunkText(text, 10);
expect(chunks).toEqual(["Supercalif", "ragilistic", "expialidoc", "ious"]);
});
});

51
src/auto-reply/chunk.ts Normal file
View File

@ -0,0 +1,51 @@
// Utilities for splitting outbound text into platform-sized chunks without
// unintentionally breaking on newlines. Using [\s\S] keeps newlines inside
// the chunk so messages are only split when they truly exceed the limit.
export function chunkText(text: string, limit: number): string[] {
if (!text) return [];
if (limit <= 0) return [text];
if (text.length <= limit) return [text];
const chunks: string[] = [];
let remaining = text;
while (remaining.length > limit) {
const window = remaining.slice(0, limit);
// 1) Prefer a newline break inside the window.
let breakIdx = window.lastIndexOf("\n");
// 2) Otherwise prefer the last whitespace (word boundary) inside the window.
if (breakIdx <= 0) {
for (let i = window.length - 1; i >= 0; i--) {
if (/\s/.test(window[i])) {
breakIdx = i;
break;
}
}
}
// 3) Fallback: hard break exactly at the limit.
if (breakIdx <= 0) breakIdx = limit;
const rawChunk = remaining.slice(0, breakIdx);
const chunk = rawChunk.trimEnd();
if (chunk.length > 0) {
chunks.push(chunk);
}
// If we broke on whitespace/newline, skip that separator; for hard breaks keep it.
const brokeOnSeparator =
breakIdx < remaining.length && /\s/.test(remaining[breakIdx]);
const nextStart = Math.min(
remaining.length,
breakIdx + (brokeOnSeparator ? 1 : 0),
);
remaining = remaining.slice(nextStart).trimStart();
}
if (remaining.length) chunks.push(remaining);
return chunks;
}

View File

@ -4,7 +4,7 @@ import { z } from "zod";
// Preferred binary name for Claude CLI invocations.
export const CLAUDE_BIN = "claude";
export const CLAUDE_IDENTITY_PREFIX =
"You are Clawd (Claude) running on the user's Mac via warelay. Your scratchpad is /Users/steipete/clawd; this is your folder and you can add what you like in markdown files and/or images. You don't need to be concise, but WhatsApp replies must stay under ~1500 characters. Media you can send: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK.";
"You are Clawd (Claude) running on the user's Mac via warelay. Keep WhatsApp replies under ~1500 characters. Your scratchpad is ~/clawd; this is your folder and you can add what you like in markdown files and/or images. You can send media by including MEDIA:/path/to/file.jpg on its own line (no spaces in path). Media limits: images ≤6MB, audio/video ≤16MB, documents ≤100MB. The prompt may include a media path and an optional Transcript: section—use them when present. If a prompt is a heartbeat poll and nothing needs attention, reply with exactly HEARTBEAT_OK and nothing else; for any alert, do not include HEARTBEAT_OK.";
function extractClaudeText(payload: unknown): string | undefined {
// Best-effort walker to find the primary text field in Claude JSON outputs.

View File

@ -66,7 +66,7 @@ describe("runCommandReply", () => {
it("injects claude flags and identity prefix", async () => {
const captures: ReplyPayload[] = [];
const runner = makeRunner({ stdout: "ok" }, captures);
const { payload } = await runCommandReply({
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
@ -83,6 +83,7 @@ describe("runCommandReply", () => {
enqueue: enqueueImmediate,
});
const payload = payloads?.[0];
expect(payload?.text).toBe("ok");
const finalArgv = captures[0].argv as string[];
expect(finalArgv).toContain("--output-format");
@ -192,7 +193,7 @@ describe("runCommandReply", () => {
const runner = vi.fn(async () => {
throw { stdout: "partial output here", killed: true, signal: "SIGKILL" };
});
const { payload, meta } = await runCommandReply({
const { payloads, meta } = await runCommandReply({
reply: {
mode: "command",
command: ["echo", "hi"],
@ -208,6 +209,7 @@ describe("runCommandReply", () => {
commandRunner: runner,
enqueue: enqueueImmediate,
});
const payload = payloads?.[0];
expect(payload?.text).toContain("Command timed out after 1s");
expect(payload?.text).toContain("partial output");
expect(meta.killed).toBe(true);
@ -217,7 +219,7 @@ describe("runCommandReply", () => {
const runner = vi.fn(async () => {
throw { stdout: "", killed: true, signal: "SIGKILL" };
});
const { payload } = await runCommandReply({
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["echo", "hi"],
@ -234,6 +236,7 @@ describe("runCommandReply", () => {
commandRunner: runner,
enqueue: enqueueImmediate,
});
const payload = payloads?.[0];
expect(payload?.text).toContain("(cwd: /tmp/work)");
});
@ -244,7 +247,7 @@ describe("runCommandReply", () => {
const runner = makeRunner({
stdout: `hi\nMEDIA:${tmp}\nMEDIA:https://example.com/img.jpg`,
});
const { payload } = await runCommandReply({
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["echo", "hi"],
@ -261,6 +264,7 @@ describe("runCommandReply", () => {
commandRunner: runner,
enqueue: enqueueImmediate,
});
const payload = payloads?.[0];
expect(payload?.mediaUrls).toEqual(["https://example.com/img.jpg"]);
await fs.unlink(tmp);
});
@ -318,7 +322,7 @@ describe("runCommandReply", () => {
const runner = makeRunner({
stdout: '{"result":"","duration_ms":50,"total_cost_usd":0.001}',
});
const { payload } = await runCommandReply({
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
@ -335,6 +339,7 @@ describe("runCommandReply", () => {
enqueue: enqueueImmediate,
});
// Should NOT contain raw JSON - empty result should produce fallback message
const payload = payloads?.[0];
expect(payload?.text).not.toContain('{"result"');
expect(payload?.text).toContain("command produced no output");
});
@ -343,7 +348,7 @@ describe("runCommandReply", () => {
const runner = makeRunner({
stdout: '{"text":"","duration_ms":50}',
});
const { payload } = await runCommandReply({
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
@ -360,6 +365,7 @@ describe("runCommandReply", () => {
enqueue: enqueueImmediate,
});
// Empty text should produce fallback message, not raw JSON
const payload = payloads?.[0];
expect(payload?.text).not.toContain('{"text"');
expect(payload?.text).toContain("command produced no output");
});
@ -368,7 +374,7 @@ describe("runCommandReply", () => {
const runner = makeRunner({
stdout: '{"result":"hello world","duration_ms":50}',
});
const { payload } = await runCommandReply({
const { payloads } = await runCommandReply({
reply: {
mode: "command",
command: ["claude", "{{Body}}"],
@ -384,6 +390,7 @@ describe("runCommandReply", () => {
commandRunner: runner,
enqueue: enqueueImmediate,
});
const payload = payloads?.[0];
expect(payload?.text).toBe("hello world");
});
});

View File

@ -6,9 +6,11 @@ import type { AgentMeta } from "../agents/types.js";
import type { WarelayConfig } from "../config/config.js";
import { isVerbose, logVerbose } from "../globals.js";
import { logError } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { splitMediaFromOutput } from "../media/parse.js";
import { enqueueCommand } from "../process/command-queue.js";
import type { runCommandWithTimeout } from "../process/exec.js";
import { runPiRpc } from "../process/tau-rpc.js";
import { applyTemplate, type TemplateContext } from "./templating.js";
import type { ReplyPayload } from "./types.js";
@ -18,6 +20,8 @@ type CommandReplyConfig = NonNullable<WarelayConfig["inbound"]>["reply"] & {
type EnqueueRunner = typeof enqueueCommand;
type ThinkLevel = "off" | "minimal" | "low" | "medium" | "high";
type CommandReplyParams = {
reply: CommandReplyConfig;
templatingCtx: TemplateContext;
@ -29,6 +33,7 @@ type CommandReplyParams = {
timeoutSeconds: number;
commandRunner: typeof runCommandWithTimeout;
enqueue?: EnqueueRunner;
thinkLevel?: ThinkLevel;
};
export type CommandReplyMeta = {
@ -42,7 +47,7 @@ export type CommandReplyMeta = {
};
export type CommandReplyResult = {
payload?: ReplyPayload;
payloads?: ReplyPayload[];
meta: CommandReplyMeta;
};
@ -96,9 +101,34 @@ export function summarizeClaudeMetadata(payload: unknown): string | undefined {
return parts.length ? parts.join(", ") : undefined;
}
function appendThinkingCue(body: string, level?: ThinkLevel): string {
if (!level || level === "off") return body;
const cue = (() => {
switch (level) {
case "high":
return "ultrathink";
case "medium":
return "think harder";
case "low":
return "think hard";
case "minimal":
return "think";
default:
return "";
}
})();
return [body.trim(), cue].filter(Boolean).join(" ");
}
export async function runCommandReply(
params: CommandReplyParams,
): Promise<CommandReplyResult> {
const logger = getChildLogger({ module: "command-reply" });
const verboseLog = (msg: string) => {
logger.debug(msg);
if (isVerbose()) logVerbose(msg);
};
const {
reply,
templatingCtx,
@ -110,6 +140,7 @@ export async function runCommandReply(
timeoutSeconds,
commandRunner,
enqueue = enqueueCommand,
thinkLevel,
} = params;
if (!reply.command?.length) {
@ -133,14 +164,25 @@ export async function runCommandReply(
// Session args prepared (templated) and injected generically
if (reply.session) {
const defaultNew =
agentCfg.kind === "claude"
? ["--session-id", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const defaultResume =
agentCfg.kind === "claude"
? ["--resume", "{{SessionId}}"]
: ["--session", "{{SessionId}}"];
const defaultSessionArgs = (() => {
switch (agentCfg.kind) {
case "claude":
return {
newArgs: ["--session-id", "{{SessionId}}"],
resumeArgs: ["--resume", "{{SessionId}}"],
};
case "gemini":
// Gemini CLI supports --resume <id>; starting a new session needs no flag.
return { newArgs: [], resumeArgs: ["--resume", "{{SessionId}}"] };
default:
return {
newArgs: ["--session", "{{SessionId}}"],
resumeArgs: ["--session", "{{SessionId}}"],
};
}
})();
const defaultNew = defaultSessionArgs.newArgs;
const defaultResume = defaultSessionArgs.resumeArgs;
const sessionArgList = (
isNewSession
? (reply.session.sessionArgNew ?? defaultNew)
@ -159,6 +201,23 @@ export async function runCommandReply(
}
}
if (thinkLevel && thinkLevel !== "off") {
if (agentKind === "pi") {
const hasThinkingFlag = argv.some(
(p, i) =>
p === "--thinking" ||
(i > 0 && argv[i - 1] === "--thinking") ||
p.startsWith("--thinking="),
);
if (!hasThinkingFlag) {
argv.splice(bodyIndex, 0, "--thinking", thinkLevel);
bodyIndex += 2;
}
} else if (argv[bodyIndex]) {
argv[bodyIndex] = appendThinkingCue(argv[bodyIndex] ?? "", thinkLevel);
}
}
const shouldApplyAgent = agent.isInvocation(argv);
const finalArgv = shouldApplyAgent
? agent.buildArgs({
@ -176,55 +235,115 @@ export async function runCommandReply(
logVerbose(
`Running command auto-reply: ${finalArgv.join(" ")}${reply.cwd ? ` (cwd: ${reply.cwd})` : ""}`,
);
logger.info(
{
agent: agentKind,
sessionId: templatingCtx.SessionId,
newSession: isNewSession,
cwd: reply.cwd,
command: finalArgv.slice(0, -1), // omit body to reduce noise
},
"command auto-reply start",
);
const started = Date.now();
let queuedMs: number | undefined;
let queuedAhead: number | undefined;
try {
const { stdout, stderr, code, signal, killed } = await enqueue(
() => commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd }),
{
onWait: (waitMs, ahead) => {
queuedMs = waitMs;
queuedAhead = ahead;
if (isVerbose()) {
logVerbose(
`Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`,
);
const run = async () => {
// Prefer long-lived tau RPC for pi agent to avoid cold starts.
if (agentKind === "pi") {
const promptIndex = finalArgv.length - 1;
const body = finalArgv[promptIndex] ?? "";
// Build rpc args without the prompt body; force --mode rpc.
const rpcArgv = (() => {
const copy = [...finalArgv];
copy.splice(promptIndex, 1);
const modeIdx = copy.indexOf("--mode");
if (modeIdx >= 0 && copy[modeIdx + 1]) {
copy.splice(modeIdx, 2, "--mode", "rpc");
} else if (!copy.includes("--mode")) {
copy.splice(copy.length - 1, 0, "--mode", "rpc");
}
},
return copy;
})();
return await runPiRpc({
argv: rpcArgv,
cwd: reply.cwd,
prompt: body,
timeoutMs,
});
}
return await commandRunner(finalArgv, { timeoutMs, cwd: reply.cwd });
};
const { stdout, stderr, code, signal, killed } = await enqueue(run, {
onWait: (waitMs, ahead) => {
queuedMs = waitMs;
queuedAhead = ahead;
if (isVerbose()) {
logVerbose(
`Command auto-reply queued for ${waitMs}ms (${queuedAhead} ahead)`,
);
}
},
);
});
const rawStdout = stdout.trim();
let mediaFromCommand: string[] | undefined;
let trimmed = rawStdout;
const trimmed = rawStdout;
if (stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${stderr.trim()}`);
}
const parsed = trimmed ? agent.parseOutput(trimmed) : undefined;
// Treat empty string as "no content" so we can fall back to the friendly
// "(command produced no output)" message instead of echoing raw JSON.
if (parsed && parsed.text !== undefined) {
trimmed = parsed.text.trim();
const parserProvided = !!parsed;
// Collect one message per assistant text from parseOutput (tau RPC can emit many).
const parsedTexts =
parsed?.texts?.map((t) => t.trim()).filter(Boolean) ?? [];
type ReplyItem = { text: string; media?: string[] };
const replyItems: ReplyItem[] = [];
for (const t of parsedTexts) {
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(t);
replyItems.push({
text: cleanedText,
media: mediaFound?.length ? mediaFound : undefined,
});
}
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(trimmed);
trimmed = cleanedText;
if (mediaFound?.length) {
mediaFromCommand = mediaFound;
if (isVerbose()) logVerbose(`MEDIA token extracted: ${mediaFound}`);
} else if (isVerbose()) {
logVerbose("No MEDIA token extracted from final text");
// If parser gave nothing, fall back to raw stdout as a single message.
if (replyItems.length === 0 && trimmed && !parserProvided) {
const { text: cleanedText, mediaUrls: mediaFound } =
splitMediaFromOutput(trimmed);
if (cleanedText || mediaFound?.length) {
replyItems.push({
text: cleanedText,
media: mediaFound?.length ? mediaFound : undefined,
});
}
}
if (!trimmed && !mediaFromCommand) {
// No content at all → fallback notice.
if (replyItems.length === 0) {
const meta = parsed?.meta?.extra?.summary ?? undefined;
trimmed = `(command produced no output${meta ? `; ${meta}` : ""})`;
logVerbose("No text/media produced; injecting fallback notice to user");
replyItems.push({
text: `(command produced no output${meta ? `; ${meta}` : ""})`,
});
verboseLog("No text/media produced; injecting fallback notice to user");
}
logVerbose(`Command auto-reply stdout (trimmed): ${trimmed || "<empty>"}`);
logVerbose(`Command auto-reply finished in ${Date.now() - started}ms`);
verboseLog(
`Command auto-reply stdout produced ${replyItems.length} message(s)`,
);
const elapsed = Date.now() - started;
verboseLog(`Command auto-reply finished in ${elapsed}ms`);
logger.info(
{ durationMs: elapsed, agent: agentKind, cwd: reply.cwd },
"command auto-reply finished",
);
if ((code ?? 0) !== 0) {
console.error(
`Command auto-reply exited with code ${code ?? "unknown"} (signal: ${signal ?? "none"})`,
@ -235,7 +354,7 @@ export async function runCommandReply(
: "";
const errorText = `⚠️ Command exited with code ${code ?? "unknown"}${signal ? ` (${signal})` : ""}${partialOut}`;
return {
payload: { text: errorText },
payloads: [{ text: errorText }],
meta: {
durationMs: Date.now() - started,
queuedMs,
@ -253,7 +372,7 @@ export async function runCommandReply(
);
const errorText = `⚠️ Command was killed before completion (exit code ${code ?? "unknown"})`;
return {
payload: { text: errorText },
payloads: [{ text: errorText }],
meta: {
durationMs: Date.now() - started,
queuedMs,
@ -265,43 +384,6 @@ export async function runCommandReply(
},
};
}
let mediaUrls =
mediaFromCommand ?? (reply.mediaUrl ? [reply.mediaUrl] : undefined);
// If mediaMaxMb is set, skip local media paths larger than the cap.
if (mediaUrls?.length && reply.mediaMaxMb) {
const maxBytes = reply.mediaMaxMb * 1024 * 1024;
const filtered: string[] = [];
for (const url of mediaUrls) {
if (/^https?:\/\//i.test(url)) {
filtered.push(url);
continue;
}
const abs = path.isAbsolute(url) ? url : path.resolve(url);
try {
const stats = await fs.stat(abs);
if (stats.size <= maxBytes) {
filtered.push(url);
} else if (isVerbose()) {
logVerbose(
`Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`,
);
}
} catch {
filtered.push(url);
}
}
mediaUrls = filtered;
}
const payload =
trimmed || mediaUrls?.length
? {
text: trimmed || undefined,
mediaUrl: mediaUrls?.[0],
mediaUrls,
}
: undefined;
const meta: CommandReplyMeta = {
durationMs: Date.now() - started,
queuedMs,
@ -311,17 +393,67 @@ export async function runCommandReply(
killed,
agentMeta: parsed?.meta,
};
if (isVerbose()) {
logVerbose(`Command auto-reply meta: ${JSON.stringify(meta)}`);
const payloads: ReplyPayload[] = [];
// Build each reply item sequentially (delivery handled by caller).
for (const item of replyItems) {
let mediaUrls =
item.media ??
mediaFromCommand ??
(reply.mediaUrl ? [reply.mediaUrl] : undefined);
// If mediaMaxMb is set, skip local media paths larger than the cap.
if (mediaUrls?.length && reply.mediaMaxMb) {
const maxBytes = reply.mediaMaxMb * 1024 * 1024;
const filtered: string[] = [];
for (const url of mediaUrls) {
if (/^https?:\/\//i.test(url)) {
filtered.push(url);
continue;
}
const abs = path.isAbsolute(url) ? url : path.resolve(url);
try {
const stats = await fs.stat(abs);
if (stats.size <= maxBytes) {
filtered.push(url);
} else if (isVerbose()) {
logVerbose(
`Skipping media ${url} (${(stats.size / (1024 * 1024)).toFixed(2)}MB) over cap ${reply.mediaMaxMb}MB`,
);
}
} catch {
filtered.push(url);
}
}
mediaUrls = filtered;
}
const payload =
item.text || mediaUrls?.length
? {
text: item.text || undefined,
mediaUrl: mediaUrls?.[0],
mediaUrls,
}
: undefined;
if (payload) payloads.push(payload);
}
return { payload, meta };
verboseLog(`Command auto-reply meta: ${JSON.stringify(meta)}`);
return { payloads, meta };
} catch (err) {
const elapsed = Date.now() - started;
logger.info(
{ durationMs: elapsed, agent: agentKind, cwd: reply.cwd },
"command auto-reply failed",
);
const anyErr = err as { killed?: boolean; signal?: string };
const timeoutHit = anyErr.killed === true || anyErr.signal === "SIGKILL";
const errorObj = err as { stdout?: string; stderr?: string };
if (errorObj.stderr?.trim()) {
logVerbose(`Command auto-reply stderr: ${errorObj.stderr.trim()}`);
verboseLog(`Command auto-reply stderr: ${errorObj.stderr.trim()}`);
}
if (timeoutHit) {
console.error(
@ -339,7 +471,7 @@ export async function runCommandReply(
? `${baseMsg}\n\nPartial output before timeout:\n${partialSnippet}`
: baseMsg;
return {
payload: { text },
payloads: [{ text }],
meta: {
durationMs: elapsed,
queuedMs,
@ -355,7 +487,7 @@ export async function runCommandReply(
const errMsg = err instanceof Error ? err.message : String(err);
const errorText = `⚠️ Command failed: ${errMsg}`;
return {
payload: { text: errorText },
payloads: [{ text: errorText }],
meta: {
durationMs: elapsed,
queuedMs,

View File

@ -1,261 +0,0 @@
import { describe, expect, it, vi } from "vitest";
import type { WarelayConfig } from "../config/config.js";
import type { SpawnResult } from "../process/exec.js";
import {
buildHeartbeatPrompt,
runHeartbeatPreHook,
} from "./heartbeat-prehook.js";
describe("buildHeartbeatPrompt", () => {
it("returns base prompt when no context", () => {
expect(buildHeartbeatPrompt("HEARTBEAT ultrathink")).toBe(
"HEARTBEAT ultrathink",
);
expect(buildHeartbeatPrompt("HEARTBEAT ultrathink", "")).toBe(
"HEARTBEAT ultrathink",
);
expect(buildHeartbeatPrompt("HEARTBEAT ultrathink", " ")).toBe(
"HEARTBEAT ultrathink",
);
});
it("appends context when provided", () => {
const result = buildHeartbeatPrompt(
"HEARTBEAT ultrathink",
"You have 3 unread emails",
);
expect(result).toBe(
"HEARTBEAT ultrathink\n\n---\nContext from pre-hook:\nYou have 3 unread emails",
);
});
it("trims context whitespace", () => {
const result = buildHeartbeatPrompt("HEARTBEAT", " context with spaces ");
expect(result).toContain("context with spaces");
expect(result).not.toContain(" context");
});
});
describe("runHeartbeatPreHook", () => {
it("returns empty result when no pre-hook configured", async () => {
const cfg: WarelayConfig = {};
const result = await runHeartbeatPreHook(cfg);
expect(result.durationMs).toBe(0);
expect(result.context).toBeUndefined();
expect(result.error).toBeUndefined();
});
it("returns empty result when pre-hook is empty array", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: [],
},
},
},
};
const result = await runHeartbeatPreHook(cfg);
expect(result.durationMs).toBe(0);
expect(result.context).toBeUndefined();
});
it("returns stdout as context on success", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["echo", "email summary"],
},
},
},
};
const mockRunner = vi.fn().mockResolvedValue({
stdout: "email summary\n",
stderr: "",
code: 0,
signal: null,
killed: false,
} satisfies SpawnResult);
const result = await runHeartbeatPreHook(cfg, mockRunner);
expect(result.context).toBe("email summary");
expect(result.error).toBeUndefined();
expect(mockRunner).toHaveBeenCalledWith(["echo", "email summary"], {
timeoutMs: 30000,
});
});
it("returns error on non-zero exit", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["failing-script"],
},
},
},
};
const mockRunner = vi.fn().mockResolvedValue({
stdout: "",
stderr: "error output",
code: 1,
signal: null,
killed: false,
} satisfies SpawnResult);
const result = await runHeartbeatPreHook(cfg, mockRunner);
expect(result.context).toBeUndefined();
expect(result.error).toContain("exited with code 1");
});
it("handles timeout gracefully", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["slow-script"],
heartbeatPreHookTimeoutSeconds: 5,
},
},
},
};
const mockRunner = vi.fn().mockResolvedValue({
stdout: "",
stderr: "",
code: null,
signal: "SIGKILL",
killed: true,
} satisfies SpawnResult);
const result = await runHeartbeatPreHook(cfg, mockRunner);
expect(result.timedOut).toBe(true);
expect(result.error).toContain("timed out");
expect(result.context).toBeUndefined();
});
it("uses custom timeout from config", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["script"],
heartbeatPreHookTimeoutSeconds: 60,
},
},
},
};
const mockRunner = vi.fn().mockResolvedValue({
stdout: "ok",
stderr: "",
code: 0,
signal: null,
killed: false,
} satisfies SpawnResult);
await runHeartbeatPreHook(cfg, mockRunner);
expect(mockRunner).toHaveBeenCalledWith(["script"], { timeoutMs: 60000 });
});
it("returns empty context for whitespace-only stdout", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["script"],
},
},
},
};
const mockRunner = vi.fn().mockResolvedValue({
stdout: " \n\n ",
stderr: "",
code: 0,
signal: null,
killed: false,
} satisfies SpawnResult);
const result = await runHeartbeatPreHook(cfg, mockRunner);
expect(result.context).toBeUndefined();
expect(result.error).toBeUndefined();
});
it("handles thrown error from command runner", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["script"],
},
},
},
};
const mockRunner = vi.fn().mockRejectedValue(new Error("spawn ENOENT"));
const result = await runHeartbeatPreHook(cfg, mockRunner);
expect(result.error).toBe("spawn ENOENT");
expect(result.context).toBeUndefined();
});
it("handles thrown timeout error (killed property)", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["script"],
},
},
},
};
const timeoutError = new Error("Command timed out");
(timeoutError as unknown as { killed: boolean }).killed = true;
const mockRunner = vi.fn().mockRejectedValue(timeoutError);
const result = await runHeartbeatPreHook(cfg, mockRunner);
expect(result.timedOut).toBe(true);
expect(result.error).toContain("timed out");
});
it("caps large stdout to max size", async () => {
const cfg: WarelayConfig = {
inbound: {
reply: {
mode: "command",
command: ["echo"],
session: {
heartbeatPreHook: ["script"],
},
},
},
};
const largeOutput = "x".repeat(10000);
const mockRunner = vi.fn().mockResolvedValue({
stdout: largeOutput,
stderr: "",
code: 0,
signal: null,
killed: false,
} satisfies SpawnResult);
const result = await runHeartbeatPreHook(cfg, mockRunner);
expect(result.context).toBeDefined();
expect(result.context?.length).toBeLessThan(largeOutput.length);
expect(result.context).toContain("...[truncated]");
});
});

View File

@ -1,118 +0,0 @@
import type { WarelayConfig } from "../config/config.js";
import { danger, logVerbose } from "../globals.js";
import { logDebug, logWarn } from "../logger.js";
import { runCommandWithTimeout, type SpawnResult } from "../process/exec.js";
export type PreHookResult = {
context?: string;
durationMs: number;
error?: string;
timedOut?: boolean;
};
const DEFAULT_PREHOOK_TIMEOUT_SECONDS = 30;
const MAX_CONTEXT_CHARS = 8000;
export function buildHeartbeatPrompt(
basePrompt: string,
preHookContext?: string,
): string {
if (!preHookContext?.trim()) {
return basePrompt;
}
return `${basePrompt}\n\n---\nContext from pre-hook:\n${preHookContext.trim()}`;
}
function capContextSize(stdout: string): string {
const trimmed = stdout.trim();
if (trimmed.length <= MAX_CONTEXT_CHARS) {
return trimmed;
}
return `${trimmed.slice(0, MAX_CONTEXT_CHARS)}...[truncated]`;
}
export async function runHeartbeatPreHook(
cfg: WarelayConfig,
commandRunner: typeof runCommandWithTimeout = runCommandWithTimeout,
): Promise<PreHookResult> {
const sessionCfg = cfg.inbound?.reply?.session;
const preHookCommand = sessionCfg?.heartbeatPreHook;
if (!preHookCommand?.length) {
return { durationMs: 0 };
}
const timeoutSeconds =
sessionCfg?.heartbeatPreHookTimeoutSeconds ??
DEFAULT_PREHOOK_TIMEOUT_SECONDS;
const timeoutMs = timeoutSeconds * 1000;
const started = Date.now();
logVerbose(`Running heartbeat pre-hook: ${preHookCommand.join(" ")}`);
try {
const result: SpawnResult = await commandRunner(preHookCommand, {
timeoutMs,
});
const durationMs = Date.now() - started;
if (result.killed || result.signal === "SIGKILL") {
const stderrPreview = result.stderr?.trim().slice(0, 200) || "(empty)";
logWarn(`Heartbeat pre-hook timed out after ${timeoutSeconds}s`);
logDebug(`Pre-hook stderr preview: ${stderrPreview}`);
return {
durationMs,
timedOut: true,
error: `Pre-hook timed out after ${timeoutSeconds}s`,
};
}
if ((result.code ?? 0) !== 0) {
const stderrPreview = result.stderr?.trim().slice(0, 200) || "(empty)";
const errorMsg = `Pre-hook exited with code ${result.code}`;
logWarn(errorMsg);
logDebug(`Pre-hook stderr preview: ${stderrPreview}`);
return {
durationMs,
error: errorMsg,
};
}
const stdout = result.stdout?.trim();
logVerbose(
`Pre-hook completed in ${durationMs}ms, output length: ${stdout?.length ?? 0}`,
);
if (stdout) {
logDebug(
`Pre-hook output: ${stdout.slice(0, 200)}${stdout.length > 200 ? "..." : ""}`,
);
}
const cappedContext = stdout ? capContextSize(stdout) : undefined;
return {
context: cappedContext || undefined,
durationMs,
};
} catch (err) {
const durationMs = Date.now() - started;
const anyErr = err as { killed?: boolean; signal?: string };
if (anyErr.killed || anyErr.signal === "SIGKILL") {
return {
durationMs,
timedOut: true,
error: `Pre-hook timed out after ${timeoutSeconds}s`,
};
}
const errorMsg = err instanceof Error ? err.message : String(err);
console.error(danger(`Heartbeat pre-hook failed: ${errorMsg}`));
return {
durationMs,
error: errorMsg,
};
}
}

View File

@ -0,0 +1,48 @@
import { describe, expect, it, vi } from "vitest";
import type { WarelayConfig } from "../config/config.js";
import { autoReplyIfConfigured } from "./reply.js";
describe("autoReplyIfConfigured chunking", () => {
it("sends a single Twilio message for multi-line text under limit", async () => {
const body = [
"Oh! Hi Peter! 🦞",
"",
"Sorry, I got a bit trigger-happy with the heartbeat response there. What's up?",
"",
"Everything working on your end?",
].join("\n");
const config: WarelayConfig = {
inbound: {
reply: {
mode: "text",
text: body,
},
},
};
const create = vi.fn().mockResolvedValue({});
const client = { messages: { create } } as unknown as Parameters<
typeof autoReplyIfConfigured
>[0];
const message = {
body: "ping",
from: "+15551234567",
to: "+15557654321",
sid: "SM123",
} as Parameters<typeof autoReplyIfConfigured>[1];
await autoReplyIfConfigured(client, message, config);
expect(create).toHaveBeenCalledTimes(1);
expect(create).toHaveBeenCalledWith(
expect.objectContaining({
body,
from: message.to,
to: message.from,
}),
);
});
});

View File

@ -1,5 +1,4 @@
import crypto from "node:crypto";
import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js";
import { loadConfig, type WarelayConfig } from "../config/config.js";
import {
@ -8,6 +7,7 @@ import {
deriveSessionKey,
loadSessionStore,
resolveStorePath,
type SessionEntry,
saveSessionStore,
} from "../config/sessions.js";
import { info, isVerbose, logVerbose } from "../globals.js";
@ -17,6 +17,7 @@ import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { splitMessage } from "../twilio/send.js";
import type { TwilioRequester } from "../twilio/types.js";
import { sendTypingIndicator } from "../twilio/typing.js";
import { chunkText } from "./chunk.js";
import { runCommandReply } from "./command-reply.js";
import {
applyTemplate,
@ -28,12 +29,54 @@ import type { GetReplyOptions, ReplyPayload } from "./types.js";
export type { GetReplyOptions, ReplyPayload } from "./types.js";
const TWILIO_TEXT_LIMIT = 1600;
const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit"]);
const ABORT_MEMORY = new Map<string, boolean>();
type ThinkLevel = "off" | "minimal" | "low" | "medium" | "high";
function normalizeThinkLevel(raw?: string | null): ThinkLevel | undefined {
if (!raw) return undefined;
const key = raw.toLowerCase();
if (["off"].includes(key)) return "off";
if (["min", "minimal"].includes(key)) return "minimal";
if (["low", "thinkhard", "think-hard", "think_hard"].includes(key))
return "low";
if (["med", "medium", "thinkharder", "think-harder", "harder"].includes(key))
return "medium";
if (
["high", "ultra", "ultrathink", "think-hard", "thinkhardest"].includes(key)
)
return "high";
if (["think"].includes(key)) return "minimal";
return undefined;
}
function extractThinkDirective(body?: string): {
cleaned: string;
thinkLevel?: ThinkLevel;
} {
if (!body) return { cleaned: "" };
const re = /\/think:([a-zA-Z-]+)/i;
const match = body.match(re);
const thinkLevel = normalizeThinkLevel(match?.[1]);
const cleaned = match ? body.replace(match[0], "").trim() : body;
return { cleaned, thinkLevel };
}
function isAbortTrigger(text?: string): boolean {
if (!text) return false;
const normalized = text.trim().toLowerCase();
return ABORT_TRIGGERS.has(normalized);
}
export async function getReplyFromConfig(
ctx: MsgContext,
opts?: GetReplyOptions,
configOverride?: WarelayConfig,
commandRunner: typeof runCommandWithTimeout = runCommandWithTimeout,
): Promise<ReplyPayload | undefined> {
): Promise<ReplyPayload | ReplyPayload[] | undefined> {
// Choose reply from config: static text or external command stdout.
const cfg = configOverride ?? loadConfig();
const reply = cfg.inbound?.reply;
@ -110,11 +153,13 @@ export async function getReplyFromConfig(
const storePath = resolveStorePath(sessionCfg?.store);
let sessionStore: ReturnType<typeof loadSessionStore> | undefined;
let sessionKey: string | undefined;
let sessionEntry: SessionEntry | undefined;
let sessionId: string | undefined;
let isNewSession = false;
let bodyStripped: string | undefined;
let systemSent = false;
let abortedLastRun = false;
if (sessionCfg) {
const trimmedBody = (ctx.Body ?? "").trim();
@ -142,13 +187,21 @@ export async function getReplyFromConfig(
if (!isNewSession && freshEntry) {
sessionId = entry.sessionId;
systemSent = entry.systemSent ?? false;
abortedLastRun = entry.abortedLastRun ?? false;
} else {
sessionId = crypto.randomUUID();
isNewSession = true;
systemSent = false;
abortedLastRun = false;
}
sessionStore[sessionKey] = { sessionId, updatedAt: Date.now(), systemSent };
sessionEntry = {
sessionId,
updatedAt: Date.now(),
systemSent,
abortedLastRun,
};
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
}
@ -159,11 +212,22 @@ export async function getReplyFromConfig(
IsNewSession: isNewSession ? "true" : "false",
};
const { cleaned: thinkCleaned, thinkLevel } = extractThinkDirective(
sessionCtx.BodyStripped ?? sessionCtx.Body ?? "",
);
sessionCtx.Body = thinkCleaned;
sessionCtx.BodyStripped = thinkCleaned;
// Optional allowlist by origin number (E.164 without whatsapp: prefix)
const allowFrom = cfg.inbound?.allowFrom;
const from = (ctx.From ?? "").replace(/^whatsapp:/, "");
const to = (ctx.To ?? "").replace(/^whatsapp:/, "");
const isSamePhone = from && to && from === to;
const abortKey = sessionKey ?? (from || undefined) ?? (to || undefined);
if (!sessionEntry && abortKey) {
abortedLastRun = ABORT_MEMORY.get(abortKey) ?? false;
}
// Same-phone mode (self-messaging) is always allowed
if (isSamePhone) {
@ -179,6 +243,23 @@ export async function getReplyFromConfig(
}
}
const abortRequested =
reply?.mode === "command" &&
isAbortTrigger((sessionCtx.BodyStripped ?? sessionCtx.Body ?? "").trim());
if (abortRequested) {
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.abortedLastRun = true;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
} else if (abortKey) {
ABORT_MEMORY.set(abortKey, true);
}
cleanupTyping();
return { text: "Agent was aborted." };
}
await startTypingLoop();
// Optional prefix injected before Body for templating/command prompts.
@ -192,16 +273,30 @@ export async function getReplyFromConfig(
? applyTemplate(reply.bodyPrefix, sessionCtx)
: "";
const baseBody = sessionCtx.BodyStripped ?? sessionCtx.Body ?? "";
const prefixedBodyBase = (() => {
let body = baseBody;
if (!sendSystemOnce || isFirstTurnInSession) {
body = bodyPrefix ? `${bodyPrefix}${body}` : body;
const abortedHint =
reply?.mode === "command" && abortedLastRun
? "Note: The previous agent run was aborted by the user. Resume carefully or ask for clarification."
: "";
let prefixedBodyBase = baseBody;
if (!sendSystemOnce || isFirstTurnInSession) {
prefixedBodyBase = bodyPrefix
? `${bodyPrefix}${prefixedBodyBase}`
: prefixedBodyBase;
}
if (sessionIntro) {
prefixedBodyBase = `${sessionIntro}\n\n${prefixedBodyBase}`;
}
if (abortedHint) {
prefixedBodyBase = `${abortedHint}\n\n${prefixedBodyBase}`;
if (sessionEntry && sessionStore && sessionKey) {
sessionEntry.abortedLastRun = false;
sessionEntry.updatedAt = Date.now();
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
} else if (abortKey) {
ABORT_MEMORY.set(abortKey, false);
}
if (sessionIntro) {
body = `${sessionIntro}\n\n${body}`;
}
return body;
})();
}
if (
sessionCfg &&
sendSystemOnce &&
@ -209,12 +304,18 @@ export async function getReplyFromConfig(
sessionStore &&
sessionKey
) {
sessionStore[sessionKey] = {
...(sessionStore[sessionKey] ?? {}),
sessionId: sessionId ?? crypto.randomUUID(),
const current = sessionEntry ??
sessionStore[sessionKey] ?? {
sessionId: sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
};
sessionEntry = {
...current,
sessionId: sessionId ?? current.sessionId ?? crypto.randomUUID(),
updatedAt: Date.now(),
systemSent: true,
};
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
systemSent = true;
}
@ -261,15 +362,29 @@ export async function getReplyFromConfig(
return result;
}
if (reply && reply.mode === "command" && reply.command?.length) {
const isHeartbeat = opts?.isHeartbeat === true;
if (reply && reply.mode === "command") {
const heartbeatCommand = isHeartbeat
? (reply as { heartbeatCommand?: string[] }).heartbeatCommand
: undefined;
const commandArgs = heartbeatCommand?.length
? heartbeatCommand
: reply.command;
if (!commandArgs?.length) {
cleanupTyping();
return undefined;
}
await onReplyStart();
const commandReply = {
...reply,
command: reply.command,
command: commandArgs,
mode: "command" as const,
};
try {
const { payload, meta } = await runCommandReply({
const runResult = await runCommandReply({
reply: commandReply,
templatingCtx,
sendSystemOnce,
@ -279,11 +394,47 @@ export async function getReplyFromConfig(
timeoutMs,
timeoutSeconds,
commandRunner,
thinkLevel,
});
const payloadArray = runResult.payloads ?? [];
const meta = runResult.meta;
const normalizedPayloads =
payloadArray.length === 1 ? payloadArray[0] : payloadArray;
if (
!normalizedPayloads ||
(Array.isArray(normalizedPayloads) && normalizedPayloads.length === 0)
) {
return undefined;
}
if (sessionCfg && sessionStore && sessionKey) {
const returnedSessionId = meta.agentMeta?.sessionId;
if (returnedSessionId && returnedSessionId !== sessionId) {
const entry = sessionEntry ??
sessionStore[sessionKey] ?? {
sessionId: returnedSessionId,
updatedAt: Date.now(),
systemSent,
abortedLastRun,
};
sessionEntry = {
...entry,
sessionId: returnedSessionId,
updatedAt: Date.now(),
};
sessionStore[sessionKey] = sessionEntry;
await saveSessionStore(storePath, sessionStore);
sessionId = returnedSessionId;
if (isVerbose()) {
logVerbose(
`Session id updated from agent meta: ${returnedSessionId} (store: ${storePath})`,
);
}
}
}
if (meta.agentMeta && isVerbose()) {
logVerbose(`Agent meta: ${JSON.stringify(meta.agentMeta)}`);
}
return payload;
return normalizedPayloads;
} finally {
cleanupTyping();
}
@ -340,17 +491,18 @@ export async function autoReplyIfConfigured(
const replyResult = await getReplyFromConfig(
ctx,
{
onReplyStart: () => sendTypingIndicator(client, runtime, message.sid),
onReplyStart: async () => {
await sendTypingIndicator(client, runtime, message.sid);
},
},
cfg,
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
)
return;
const replies = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
if (replies.length === 0) return;
const replyFrom = message.to;
const replyTo = message.from;
@ -363,23 +515,7 @@ export async function autoReplyIfConfigured(
return;
}
if (replyResult.text) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${replyResult.text.length}`,
);
} else {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media)`,
);
}
try {
const mediaList = replyResult.mediaUrls?.length
? replyResult.mediaUrls
: replyResult.mediaUrl
? [replyResult.mediaUrl]
: [];
const sendTwilio = async (body: string, media?: string) => {
let resolvedMedia = media;
if (resolvedMedia && !/^https?:\/\//i.test(resolvedMedia)) {
@ -419,22 +555,48 @@ export async function autoReplyIfConfigured(
}
};
if (mediaList.length === 0) {
await sendTwilio(replyResult.text ?? "");
} else {
// First media with body (if any), then remaining as separate media-only sends.
await sendTwilio(replyResult.text ?? "", mediaList[0]);
for (const extra of mediaList.slice(1)) {
await sendTwilio("", extra);
for (const replyPayload of replies) {
const mediaList = replyPayload.mediaUrls?.length
? replyPayload.mediaUrls
: replyPayload.mediaUrl
? [replyPayload.mediaUrl]
: [];
const text = replyPayload.text ?? "";
const chunks = chunkText(text, TWILIO_TEXT_LIMIT);
if (chunks.length === 0) chunks.push("");
for (let i = 0; i < chunks.length; i++) {
const body = chunks[i];
const attachMedia = i === 0 ? mediaList[0] : undefined;
if (body) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo}, body length ${body.length}`,
);
} else if (attachMedia) {
logVerbose(
`Auto-replying via Twilio: from ${replyFrom} to ${replyTo} (media only)`,
);
}
await sendTwilio(body, attachMedia);
if (i === 0 && mediaList.length > 1) {
for (const extra of mediaList.slice(1)) {
await sendTwilio("", extra);
}
}
if (isVerbose()) {
console.log(
info(
`↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${attachMedia ? ", media" : ""})`,
),
);
}
}
}
if (isVerbose()) {
console.log(
info(
`↩️ Auto-replied to ${replyTo} (sid ${message.sid ?? "no-sid"}${replyResult.mediaUrl ? ", media" : ""})`,
),
);
}
} catch (err) {
const anyErr = err as {
code?: string | number;

View File

@ -1,5 +1,6 @@
export type GetReplyOptions = {
onReplyStart?: () => Promise<unknown> | void;
onReplyStart?: () => Promise<void> | void;
isHeartbeat?: boolean;
};
export type ReplyPayload = {

View File

@ -1,12 +1,10 @@
import { autoReplyIfConfigured } from "../auto-reply/reply.js";
import { loadConfig } from "../config/config.js";
import { readEnv } from "../env.js";
import { info } from "../globals.js";
import { ensureBinary } from "../infra/binaries.js";
import { ensurePortAvailable, handlePortError } from "../infra/ports.js";
import { ensureFunnel, getTailnetHostname } from "../infra/tailscale.js";
import { ensureMediaHosted } from "../media/host.js";
import { getQueueSize } from "../process/command-queue.js";
import {
logWebSelfId,
monitorWebProvider,
@ -14,7 +12,6 @@ import {
} from "../providers/web/index.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { createClient } from "../twilio/client.js";
import { runTwilioHeartbeatOnce } from "../twilio/heartbeat.js";
import { listRecentMessages } from "../twilio/messages.js";
import { monitorTwilio as monitorTwilioImpl } from "../twilio/monitor.js";
import { sendMessage, waitForFinalStatus } from "../twilio/send.js";
@ -54,7 +51,6 @@ export async function monitorTwilio(
lookbackMinutes: number,
clientOverride?: ReturnType<typeof createClient>,
maxIterations = Infinity,
opts?: { heartbeatNow?: boolean; heartbeatMinutes?: number },
) {
// Adapter that wires default deps/runtime for the Twilio monitor loop.
return monitorTwilioImpl(intervalSeconds, lookbackMinutes, {
@ -66,13 +62,8 @@ export async function monitorTwilio(
readEnv,
createClient,
sleep,
loadConfig,
runTwilioHeartbeatOnce,
getQueueSize,
},
runtime: defaultRuntime,
heartbeatNow: opts?.heartbeatNow,
heartbeatMinutes: opts?.heartbeatMinutes,
});
}

View File

@ -451,15 +451,7 @@ Examples:
ensureTwilioEnv();
logTwilioFrom();
await monitorTwilio(
intervalSeconds,
lookbackMinutes,
undefined,
Infinity,
{
heartbeatNow,
},
);
await monitorTwilio(intervalSeconds, lookbackMinutes);
});
program

View File

@ -121,6 +121,7 @@ const ReplySchema = z
z.literal("opencode"),
z.literal("pi"),
z.literal("codex"),
z.literal("gemini"),
]),
format: z.union([z.literal("text"), z.literal("json")]).optional(),
identityPrefix: z.string().optional(),

View File

@ -12,6 +12,7 @@ export type SessionEntry = {
sessionId: string;
updatedAt: number;
systemSent?: boolean;
abortedLastRun?: boolean;
};
export const SESSION_STORE_DEFAULT = path.join(CONFIG_DIR, "sessions.json");

View File

@ -671,6 +671,124 @@ describe("config and templating", () => {
expect(secondArgv[secondArgv.length - 1]).toBe("[sys] next");
});
it("stores session id returned by agent meta when it differs", async () => {
const tmpStore = path.join(
os.tmpdir(),
`warelay-store-${Date.now()}-sessionid.json`,
);
vi.spyOn(crypto, "randomUUID").mockReturnValue("initial-sid");
const runSpy = vi.spyOn(index, "runCommandWithTimeout").mockResolvedValue({
stdout: '{"text":"hi","session_id":"agent-sid-123"}\n',
stderr: "",
code: 0,
signal: null,
killed: false,
});
const cfg = {
inbound: {
reply: {
mode: "command" as const,
command: ["claude", "{{Body}}"],
agent: { kind: "claude", format: "json" as const },
session: { store: tmpStore },
},
},
};
await index.getReplyFromConfig(
{ Body: "/new hi", From: "+1", To: "+2" },
undefined,
cfg,
runSpy,
);
const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8"));
const entry = Object.values(persisted)[0] as { sessionId?: string };
expect(entry.sessionId).toBe("agent-sid-123");
});
it("aborts command when stop word is received and skips command runner", async () => {
const tmpStore = path.join(
os.tmpdir(),
`warelay-store-${Date.now()}-abort.json`,
);
const runSpy = vi.fn().mockResolvedValue({
stdout: "should-not-run",
stderr: "",
code: 0,
signal: null,
killed: false,
});
const cfg = {
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "{{Body}}"],
session: { store: tmpStore },
},
},
};
const result = await index.getReplyFromConfig(
{ Body: "stop", From: "+1", To: "+2" },
undefined,
cfg,
runSpy,
);
expect(result?.text).toMatch(/aborted/i);
expect(runSpy).not.toHaveBeenCalled();
const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8"));
const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean };
expect(entry.abortedLastRun).toBe(true);
});
it("adds an abort hint to the next prompt and then clears the flag", async () => {
const tmpStore = path.join(
os.tmpdir(),
`warelay-store-${Date.now()}-aborthint.json`,
);
const runSpy = vi.fn().mockResolvedValue({
stdout: "ok\n",
stderr: "",
code: 0,
signal: null,
killed: false,
});
const cfg = {
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "{{Body}}"],
session: { store: tmpStore },
},
},
};
await index.getReplyFromConfig(
{ Body: "abort", From: "+1555", To: "+2666" },
undefined,
cfg,
runSpy,
);
const result = await index.getReplyFromConfig(
{ Body: "continue", From: "+1555", To: "+2666" },
undefined,
cfg,
runSpy,
);
const argv = runSpy.mock.calls[0][0];
const prompt = argv.at(-1) as string;
expect(prompt).toMatch(/previous agent run was aborted/i);
expect(prompt).toMatch(/continue/);
const persisted = JSON.parse(fs.readFileSync(tmpStore, "utf-8"));
const entry = Object.values(persisted)[0] as { abortedLastRun?: boolean };
expect(entry.abortedLastRun).toBe(false);
expect(result?.text).toBe("ok");
});
it("refreshes typing indicator while command runs", async () => {
const onReplyStart = vi.fn();
const runSpy = vi.spyOn(index, "runCommandWithTimeout").mockImplementation(
@ -777,7 +895,7 @@ describe("config and templating", () => {
const argv = runSpy.mock.calls[0][0];
expect(argv[0]).toBe("claude");
expect(argv.at(-1)).toContain("You are Clawd (Claude)");
expect(argv.at(-1)).toContain("/Users/steipete/clawd");
expect(argv.at(-1)).toContain("scratchpad");
expect(argv.at(-1)).toMatch(/hi$/);
// The helper should auto-add print and output format flags without disturbing the prompt position.
expect(argv.includes("-p") || argv.includes("--print")).toBe(true);
@ -845,7 +963,7 @@ describe("config and templating", () => {
expect(result?.text).toBe("Sure! What's up?");
const argv = runSpy.mock.calls[0][0];
expect(argv.at(-1)).toContain("You are Clawd (Claude)");
expect(argv.at(-1)).toContain("/Users/steipete/clawd");
expect(argv.at(-1)).toContain("scratchpad");
});
it("serializes command auto-replies via the queue", async () => {

View File

@ -7,11 +7,7 @@ import { afterEach, describe, expect, it, vi } from "vitest";
import { setVerbose } from "./globals.js";
import { logDebug, logError, logInfo, logSuccess, logWarn } from "./logger.js";
import {
DEFAULT_LOG_DIR,
resetLogger,
setLoggerOverride,
} from "./logging.js";
import { DEFAULT_LOG_DIR, resetLogger, setLoggerOverride } from "./logging.js";
import type { RuntimeEnv } from "./runtime.js";
describe("logger helpers", () => {

View File

@ -133,7 +133,11 @@ function pruneOldRollingLogs(dir: string): void {
const cutoff = Date.now() - MAX_LOG_AGE_MS;
for (const entry of entries) {
if (!entry.isFile()) continue;
if (!entry.name.startsWith(`${LOG_PREFIX}-`) || !entry.name.endsWith(LOG_SUFFIX)) continue;
if (
!entry.name.startsWith(`${LOG_PREFIX}-`) ||
!entry.name.endsWith(LOG_SUFFIX)
)
continue;
const fullPath = path.join(dir, entry.name);
try {
const stat = fs.statSync(fullPath);

View File

@ -54,7 +54,9 @@ describe("media server", () => {
const server = await startMediaServer(0, 5_000);
const port = (server.address() as AddressInfo).port;
// URL-encoded "../" to bypass client-side path normalization
const res = await fetch(`http://localhost:${port}/media/%2e%2e%2fpackage.json`);
const res = await fetch(
`http://localhost:${port}/media/%2e%2e%2fpackage.json`,
);
expect(res.status).toBe(400);
expect(await res.text()).toBe("invalid path");
await new Promise((r) => server.close(r));

View File

@ -4,6 +4,7 @@ import path from "node:path";
import express, { type Express } from "express";
import { danger } from "../globals.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { detectMime } from "./mime.js";
import { cleanOldMedia, getMediaDir } from "./store.js";
const DEFAULT_TTL_MS = 2 * 60 * 1000;
@ -19,7 +20,6 @@ export function attachMediaRoutes(
const id = req.params.id;
const mediaRoot = (await fs.realpath(mediaDir)) + path.sep;
const file = path.resolve(mediaRoot, id);
try {
const lstat = await fs.lstat(file);
if (lstat.isSymbolicLink()) {
@ -37,12 +37,15 @@ export function attachMediaRoutes(
res.status(410).send("expired");
return;
}
res.sendFile(realPath);
const data = await fs.readFile(realPath);
const mime = detectMime({ buffer: data, filePath: realPath });
if (mime) res.type(mime);
res.send(data);
// best-effort single-use cleanup after response ends
res.on("finish", () => {
setTimeout(() => {
fs.rm(realPath).catch(() => {});
}, 500);
}, 50);
});
} catch {
res.status(404).send("not found");

View File

@ -0,0 +1,73 @@
import fs from "node:fs/promises";
import path from "node:path";
import { PassThrough } from "node:stream";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
const realOs = await vi.importActual<typeof import("node:os")>("node:os");
const HOME = path.join(realOs.tmpdir(), "warelay-home-redirect");
const mockRequest = vi.fn();
vi.doMock("node:os", () => ({
default: { homedir: () => HOME },
homedir: () => HOME,
}));
vi.doMock("node:https", () => ({
request: (...args: unknown[]) => mockRequest(...args),
}));
const { saveMediaSource } = await import("./store.js");
describe("media store redirects", () => {
beforeAll(async () => {
await fs.rm(HOME, { recursive: true, force: true });
});
afterAll(async () => {
await fs.rm(HOME, { recursive: true, force: true });
vi.clearAllMocks();
});
it("follows redirects and keeps detected mime/extension", async () => {
let call = 0;
mockRequest.mockImplementation((_url, _opts, cb) => {
call += 1;
const res = new PassThrough();
const req = {
on: (event: string, handler: (...args: unknown[]) => void) => {
if (event === "error") res.on("error", handler);
return req;
},
end: () => undefined,
destroy: () => res.destroy(),
} as const;
if (call === 1) {
res.statusCode = 302;
res.headers = { location: "https://example.com/final" };
setImmediate(() => {
cb(res as unknown as Parameters<typeof cb>[0]);
res.end();
});
} else {
res.statusCode = 200;
res.headers = { "content-type": "text/plain" };
setImmediate(() => {
cb(res as unknown as Parameters<typeof cb>[0]);
res.write("redirected");
res.end();
});
}
return req;
});
const saved = await saveMediaSource("https://example.com/start");
expect(mockRequest).toHaveBeenCalledTimes(2);
expect(saved.contentType).toBe("text/plain");
expect(path.extname(saved.path)).toBe(".txt");
expect(await fs.readFile(saved.path, "utf8")).toBe("redirected");
});
});

View File

@ -48,9 +48,21 @@ async function downloadToFile(
url: string,
dest: string,
headers?: Record<string, string>,
maxRedirects = 5,
): Promise<{ headerMime?: string; sniffBuffer: Buffer; size: number }> {
return await new Promise((resolve, reject) => {
const req = request(url, { headers }, (res) => {
// Follow redirects
if (res.statusCode && res.statusCode >= 300 && res.statusCode < 400) {
const location = res.headers.location;
if (!location || maxRedirects <= 0) {
reject(new Error(`Redirect loop or missing Location header`));
return;
}
const redirectUrl = new URL(location, url).href;
resolve(downloadToFile(redirectUrl, dest, headers, maxRedirects - 1));
return;
}
if (!res.statusCode || res.statusCode >= 400) {
reject(new Error(`HTTP ${res.statusCode ?? "?"} downloading media`));
return;
@ -107,9 +119,9 @@ export async function saveMediaSource(
const dir = subdir ? path.join(MEDIA_DIR, subdir) : MEDIA_DIR;
await fs.mkdir(dir, { recursive: true });
await cleanOldMedia();
const id = crypto.randomUUID();
const baseId = crypto.randomUUID();
if (looksLikeUrl(source)) {
const tempDest = path.join(dir, `${id}.tmp`);
const tempDest = path.join(dir, `${baseId}.tmp`);
const { headerMime, sniffBuffer, size } = await downloadToFile(
source,
tempDest,
@ -122,7 +134,8 @@ export async function saveMediaSource(
});
const ext =
extensionForMime(mime) ?? path.extname(new URL(source).pathname);
const finalDest = path.join(dir, ext ? `${id}${ext}` : id);
const id = ext ? `${baseId}${ext}` : baseId;
const finalDest = path.join(dir, id);
await fs.rename(tempDest, finalDest);
return { id, path: finalDest, size, contentType: mime };
}
@ -137,7 +150,8 @@ export async function saveMediaSource(
const buffer = await fs.readFile(source);
const mime = detectMime({ buffer, filePath: source });
const ext = extensionForMime(mime) ?? path.extname(source);
const dest = path.join(dir, ext ? `${id}${ext}` : id);
const id = ext ? `${baseId}${ext}` : baseId;
const dest = path.join(dir, id);
await fs.writeFile(dest, buffer);
return { id, path: dest, size: stat.size, contentType: mime };
}
@ -152,10 +166,11 @@ export async function saveMediaBuffer(
}
const dir = path.join(MEDIA_DIR, subdir);
await fs.mkdir(dir, { recursive: true });
const id = crypto.randomUUID();
const baseId = crypto.randomUUID();
const mime = detectMime({ buffer, headerMime: contentType });
const ext = extensionForMime(mime);
const dest = path.join(dir, ext ? `${id}${ext}` : id);
const id = ext ? `${baseId}${ext}` : baseId;
const dest = path.join(dir, id);
await fs.writeFile(dest, buffer);
return { id, path: dest, size: buffer.byteLength, contentType: mime };
}

153
src/process/tau-rpc.ts Normal file
View File

@ -0,0 +1,153 @@
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import readline from "node:readline";
import { piSpec } from "../agents/pi.js";
type TauRpcOptions = {
argv: string[];
cwd?: string;
timeoutMs: number;
};
type TauRpcResult = {
stdout: string;
stderr: string;
code: number;
signal?: NodeJS.Signals | null;
killed?: boolean;
};
class TauRpcClient {
private child: ChildProcessWithoutNullStreams | null = null;
private rl: readline.Interface | null = null;
private stderr = "";
private buffer: string[] = [];
private idleTimer: NodeJS.Timeout | null = null;
private seenAssistantEnd = false;
private readonly idleMs = 120;
private pending:
| {
resolve: (r: TauRpcResult) => void;
reject: (err: unknown) => void;
timer: NodeJS.Timeout;
}
| undefined;
constructor(
private readonly argv: string[],
private readonly cwd: string | undefined,
) {}
private ensureChild() {
if (this.child) return;
this.child = spawn(this.argv[0], this.argv.slice(1), {
cwd: this.cwd,
stdio: ["pipe", "pipe", "pipe"],
});
this.rl = readline.createInterface({ input: this.child.stdout });
this.rl.on("line", (line) => this.handleLine(line));
this.child.stderr.on("data", (d) => {
this.stderr += d.toString();
});
this.child.on("exit", (code, signal) => {
if (this.pending) {
this.pending.reject(
new Error(`tau rpc exited (code=${code}, signal=${signal})`),
);
clearTimeout(this.pending.timer);
this.pending = undefined;
}
this.dispose();
});
}
private handleLine(line: string) {
if (!this.pending) return;
this.buffer.push(line);
// Streamed JSON arrives line-by-line; mark when an assistant message finishes
// and resolve after a short idle to capture any follow-up events (e.g. tools)
// that belong to the same turn.
if (
line.includes('"type":"message_end"') &&
line.includes('"role":"assistant"')
) {
this.seenAssistantEnd = true;
}
if (this.seenAssistantEnd) {
if (this.idleTimer) clearTimeout(this.idleTimer);
this.idleTimer = setTimeout(() => {
if (!this.pending) return;
const out = this.buffer.join("\n");
// Only resolve once we have at least one assistant text payload; otherwise keep waiting.
const parsed = piSpec.parseOutput(out);
if (parsed.texts && parsed.texts.length > 0) {
const pending = this.pending;
this.pending = undefined;
this.buffer = [];
this.seenAssistantEnd = false;
clearTimeout(pending.timer);
pending.resolve({ stdout: out, stderr: this.stderr, code: 0 });
return;
}
// No assistant text yet; wait for more lines.
}, this.idleMs); // small idle window to group streaming blocks
}
}
async prompt(prompt: string, timeoutMs: number): Promise<TauRpcResult> {
this.ensureChild();
if (this.pending) {
throw new Error("tau rpc already handling a request");
}
const child = this.child;
if (!child) throw new Error("tau rpc child not initialized");
await new Promise<void>((resolve, reject) => {
const ok = child.stdin.write(
`${JSON.stringify({
type: "prompt",
message: { role: "user", content: [{ type: "text", text: prompt }] },
})}\n`,
(err) => (err ? reject(err) : resolve()),
);
if (!ok) child.stdin.once("drain", () => resolve());
});
return await new Promise<TauRpcResult>((resolve, reject) => {
const timer = setTimeout(() => {
this.pending = undefined;
reject(new Error(`tau rpc timed out after ${timeoutMs}ms`));
child.kill("SIGKILL");
}, timeoutMs);
this.pending = { resolve, reject, timer };
});
}
dispose() {
this.rl?.close();
this.rl = null;
if (this.child && !this.child.killed) {
this.child.kill("SIGKILL");
}
this.child = null;
this.buffer = [];
this.stderr = "";
}
}
let singleton: { key: string; client: TauRpcClient } | undefined;
export async function runPiRpc(
opts: TauRpcOptions & { prompt: string },
): Promise<TauRpcResult> {
const key = `${opts.cwd ?? ""}|${opts.argv.join(" ")}`;
if (!singleton || singleton.key !== key) {
singleton?.client.dispose();
singleton = { key, client: new TauRpcClient(opts.argv, opts.cwd) };
}
return singleton.client.prompt(opts.prompt, opts.timeoutMs);
}
export function resetPiRpc() {
singleton?.client.dispose();
singleton = undefined;
}

View File

@ -1,6 +1,6 @@
import { beforeEach, describe, expect, it, type Mock, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { HEARTBEAT_PROMPT, HEARTBEAT_TOKEN } from "../web/auto-reply.js";
import { HEARTBEAT_TOKEN } from "../web/auto-reply.js";
import { runTwilioHeartbeatOnce } from "./heartbeat.js";
vi.mock("./send.js", () => ({
@ -11,34 +11,15 @@ vi.mock("../auto-reply/reply.js", () => ({
getReplyFromConfig: vi.fn(),
}));
vi.mock("../auto-reply/heartbeat-prehook.js", () => ({
runHeartbeatPreHook: vi.fn(),
buildHeartbeatPrompt: vi.fn((base: string, ctx?: string) =>
ctx ? `${base}\n\n---\nContext from pre-hook:\n${ctx}` : base,
),
}));
vi.mock("../config/config.js", () => ({
loadConfig: vi.fn(() => ({})),
}));
// eslint-disable-next-line import/first
import { runHeartbeatPreHook } from "../auto-reply/heartbeat-prehook.js";
// eslint-disable-next-line import/first
import { getReplyFromConfig } from "../auto-reply/reply.js";
// eslint-disable-next-line import/first
import { sendMessage } from "./send.js";
const sendMessageMock = sendMessage as unknown as Mock;
const replyResolverMock = getReplyFromConfig as unknown as Mock;
const runHeartbeatPreHookMock = runHeartbeatPreHook as unknown as Mock;
const sendMessageMock = sendMessage as unknown as vi.Mock;
const replyResolverMock = getReplyFromConfig as unknown as vi.Mock;
describe("runTwilioHeartbeatOnce", () => {
beforeEach(() => {
vi.clearAllMocks();
runHeartbeatPreHookMock.mockResolvedValue({ durationMs: 0 });
});
it("sends manual override body and skips resolver", async () => {
sendMessageMock.mockResolvedValue({});
await runTwilioHeartbeatOnce({
@ -91,80 +72,4 @@ describe("runTwilioHeartbeatOnce", () => {
expect.anything(),
);
});
describe("pre-hook integration", () => {
it("runs pre-hook and includes context in prompt", async () => {
runHeartbeatPreHookMock.mockResolvedValue({
context: "You have 3 unread emails",
durationMs: 100,
});
replyResolverMock.mockResolvedValue({ text: "ALERT!" });
sendMessageMock.mockResolvedValue({});
await runTwilioHeartbeatOnce({ to: "+1555" });
expect(runHeartbeatPreHookMock).toHaveBeenCalled();
expect(replyResolverMock).toHaveBeenCalledWith(
expect.objectContaining({
Body: expect.stringContaining("Context from pre-hook"),
}),
undefined,
);
});
it("skips pre-hook when skipPreHook is true", async () => {
replyResolverMock.mockResolvedValue({ text: "ALERT!" });
sendMessageMock.mockResolvedValue({});
await runTwilioHeartbeatOnce({ to: "+1555", skipPreHook: true });
expect(runHeartbeatPreHookMock).not.toHaveBeenCalled();
expect(replyResolverMock).toHaveBeenCalledWith(
expect.objectContaining({
Body: HEARTBEAT_PROMPT,
}),
undefined,
);
});
it("continues with basic heartbeat on pre-hook failure", async () => {
runHeartbeatPreHookMock.mockResolvedValue({
error: "Pre-hook failed",
durationMs: 50,
});
replyResolverMock.mockResolvedValue({ text: "ALERT!" });
sendMessageMock.mockResolvedValue({});
await runTwilioHeartbeatOnce({ to: "+1555" });
expect(replyResolverMock).toHaveBeenCalledWith(
expect.objectContaining({
Body: HEARTBEAT_PROMPT,
}),
undefined,
);
expect(sendMessage).toHaveBeenCalled();
});
it("uses injected config to avoid loading real config in tests", async () => {
const testConfig = {
inbound: {
reply: {
mode: "command" as const,
command: ["echo"],
session: {
heartbeatPreHook: ["test-script"],
},
},
},
};
runHeartbeatPreHookMock.mockResolvedValue({ durationMs: 0 });
replyResolverMock.mockResolvedValue({ text: "OK" });
sendMessageMock.mockResolvedValue({});
await runTwilioHeartbeatOnce({ to: "+1555", cfg: testConfig });
expect(runHeartbeatPreHookMock).toHaveBeenCalledWith(testConfig);
});
});
});

View File

@ -1,9 +1,4 @@
import {
buildHeartbeatPrompt,
runHeartbeatPreHook,
} from "../auto-reply/heartbeat-prehook.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import { loadConfig, type WarelayConfig } from "../config/config.js";
import { danger, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
@ -19,8 +14,6 @@ export async function runTwilioHeartbeatOnce(opts: {
replyResolver?: ReplyResolver;
overrideBody?: string;
dryRun?: boolean;
skipPreHook?: boolean;
cfg?: WarelayConfig;
}) {
const {
to,
@ -28,10 +21,8 @@ export async function runTwilioHeartbeatOnce(opts: {
runtime = defaultRuntime,
overrideBody,
dryRun = false,
skipPreHook = false,
} = opts;
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
const cfg = opts.cfg ?? loadConfig();
if (overrideBody && overrideBody.trim().length === 0) {
throw new Error("Override body must be non-empty when provided.");
@ -51,52 +42,40 @@ export async function runTwilioHeartbeatOnce(opts: {
return;
}
// Run pre-hook unless skipped
let heartbeatPrompt = HEARTBEAT_PROMPT;
if (!skipPreHook) {
const preHookResult = await runHeartbeatPreHook(cfg);
if (preHookResult.error) {
logInfo(
`Pre-hook failed: ${preHookResult.error} (continuing)`,
runtime,
);
}
heartbeatPrompt = buildHeartbeatPrompt(
HEARTBEAT_PROMPT,
preHookResult.context,
);
}
const replyResult = await replyResolver(
{
Body: heartbeatPrompt,
Body: HEARTBEAT_PROMPT,
From: to,
To: to,
MessageSid: undefined,
},
undefined,
{ isHeartbeat: true },
);
const replyPayload = Array.isArray(replyResult)
? replyResult[0]
: replyResult;
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
!replyPayload ||
(!replyPayload.text &&
!replyPayload.mediaUrl &&
!replyPayload.mediaUrls?.length)
) {
logInfo("heartbeat skipped: empty reply", runtime);
return;
}
const hasMedia = Boolean(
replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0,
replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0,
);
const stripped = stripHeartbeatToken(replyResult.text);
const stripped = stripHeartbeatToken(replyPayload.text);
if (stripped.shouldSkip && !hasMedia) {
logInfo(success("heartbeat: ok (HEARTBEAT_OK)"), runtime);
return;
}
const finalText = stripped.text || replyResult.text || "";
const finalText = stripped.text || replyPayload.text || "";
if (dryRun) {
logInfo(
`[dry-run] heartbeat -> ${to}: ${finalText.slice(0, 200)}`,

View File

@ -1,41 +1,8 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import {
monitorTwilio,
resetSeenMessageSids,
resolveHeartbeatRecipient,
} from "./monitor.js";
// Base mock deps factory
function createMockDeps(overrides: Record<string, unknown> = {}) {
return {
autoReplyIfConfigured: vi.fn().mockResolvedValue(undefined),
listRecentMessages: vi.fn().mockResolvedValue([]),
readEnv: vi.fn(() => ({
accountSid: "AC",
whatsappFrom: "whatsapp:+1",
auth: { accountSid: "AC", authToken: "t" },
})),
createClient: vi.fn(() => ({ messages: { create: vi.fn() } }) as never),
sleep: vi.fn().mockResolvedValue(undefined),
loadConfig: vi.fn(() => ({})),
runTwilioHeartbeatOnce: vi.fn().mockResolvedValue(undefined),
getQueueSize: vi.fn(() => 0),
...overrides,
};
}
import { monitorTwilio } from "./monitor.js";
describe("monitorTwilio", () => {
beforeEach(() => {
resetSeenMessageSids();
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
});
it("processes inbound messages once with injected deps", async () => {
const listRecentMessages = vi.fn().mockResolvedValue([
{
@ -50,300 +17,29 @@ describe("monitorTwilio", () => {
status: null,
},
]);
const autoReplyIfConfigured = vi.fn().mockResolvedValue(undefined);
const readEnv = vi.fn(() => ({
accountSid: "AC",
whatsappFrom: "whatsapp:+1",
auth: { accountSid: "AC", authToken: "t" },
}));
const createClient = vi.fn(
() => ({ messages: { create: vi.fn() } }) as never,
);
const sleep = vi.fn().mockResolvedValue(undefined);
const deps = createMockDeps({ listRecentMessages });
const monitorPromise = monitorTwilio(0, 0, {
deps,
await monitorTwilio(0, 0, {
deps: {
autoReplyIfConfigured,
listRecentMessages,
readEnv,
createClient,
sleep,
},
maxIterations: 1,
});
// Advance timers to complete the iteration
await vi.runAllTimersAsync();
await monitorPromise;
expect(listRecentMessages).toHaveBeenCalledTimes(1);
expect(deps.autoReplyIfConfigured).toHaveBeenCalledTimes(1);
});
describe("heartbeat timer setup", () => {
it("sets up heartbeat timer when heartbeatMinutes is configured", async () => {
const setIntervalSpy = vi.spyOn(global, "setInterval");
const deps = createMockDeps({
loadConfig: vi.fn(() => ({
inbound: {
allowFrom: ["+15551234567"], // Provide a recipient
reply: {
mode: "command" as const,
command: ["echo", "test"],
heartbeatMinutes: 1,
},
},
})),
});
const monitorPromise = monitorTwilio(5, 5, {
deps,
maxIterations: 1,
});
await vi.runAllTimersAsync();
await monitorPromise;
// Heartbeat timer should have been set up with 60000ms (1 minute) interval
expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 60_000);
});
it("does not set up heartbeat timer when heartbeatMinutes is 0", async () => {
const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined);
const deps = createMockDeps({
loadConfig: vi.fn(() => ({
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "test"],
heartbeatMinutes: 0,
},
},
})),
runTwilioHeartbeatOnce,
});
const monitorPromise = monitorTwilio(5, 5, {
deps,
maxIterations: 1,
});
// Advance timer past what would be the heartbeat interval
await vi.advanceTimersByTimeAsync(60_000);
await vi.runAllTimersAsync();
await monitorPromise;
// Heartbeat should not have been triggered
expect(runTwilioHeartbeatOnce).not.toHaveBeenCalled();
});
});
describe("heartbeat immediate (heartbeatNow)", () => {
it("runs immediate heartbeat when heartbeatNow is true", async () => {
const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined);
const deps = createMockDeps({
loadConfig: vi.fn(() => ({
inbound: {
allowFrom: ["+15551234567"],
reply: {
mode: "command" as const,
command: ["echo", "test"],
heartbeatMinutes: 10,
},
},
})),
runTwilioHeartbeatOnce,
});
const monitorPromise = monitorTwilio(5, 5, {
deps,
maxIterations: 1,
heartbeatNow: true,
});
// Run immediate timer callbacks (for the immediate heartbeat)
await vi.runAllTimersAsync();
await monitorPromise;
// Heartbeat should have been called immediately
expect(runTwilioHeartbeatOnce).toHaveBeenCalled();
});
});
describe("heartbeat skips when busy", () => {
it("skips heartbeat when command queue is busy", async () => {
const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined);
const deps = createMockDeps({
loadConfig: vi.fn(() => ({
inbound: {
allowFrom: ["+15551234567"],
reply: {
mode: "command" as const,
command: ["echo", "test"],
heartbeatMinutes: 1,
},
},
})),
runTwilioHeartbeatOnce,
getQueueSize: vi.fn(() => 1), // Queue is busy
});
const monitorPromise = monitorTwilio(5, 5, {
deps,
maxIterations: 1,
heartbeatNow: true,
});
await vi.runAllTimersAsync();
await monitorPromise;
// Heartbeat should NOT have been called because queue is busy
expect(runTwilioHeartbeatOnce).not.toHaveBeenCalled();
});
});
describe("heartbeat error handling", () => {
it("catches heartbeat errors without crashing", async () => {
const runtimeError = vi.fn();
const runTwilioHeartbeatOnce = vi
.fn()
.mockRejectedValue(new Error("Heartbeat failed"));
const deps = createMockDeps({
loadConfig: vi.fn(() => ({
inbound: {
allowFrom: ["+15551234567"],
reply: {
mode: "command" as const,
command: ["echo", "test"],
heartbeatMinutes: 1,
},
},
})),
runTwilioHeartbeatOnce,
});
const monitorPromise = monitorTwilio(5, 5, {
deps,
maxIterations: 1,
heartbeatNow: true,
runtime: {
log: vi.fn(),
error: runtimeError,
exit: vi.fn() as unknown as (code: number) => never,
},
});
await vi.runAllTimersAsync();
await monitorPromise;
// Should have logged error but not crashed
expect(runtimeError).toHaveBeenCalledWith(
expect.stringContaining("Heartbeat failed"),
);
});
});
describe("heartbeat idle time check", () => {
it("skips heartbeat when not idle long enough", async () => {
const runTwilioHeartbeatOnce = vi.fn().mockResolvedValue(undefined);
const nowMs = Date.now();
const recentInboundTime = nowMs - 2 * 60_000; // 2 minutes ago
const deps = createMockDeps({
listRecentMessages: vi.fn().mockResolvedValue([
{
sid: "m1",
direction: "inbound",
dateCreated: new Date(recentInboundTime),
from: "+15559999999",
to: "+15551234567",
body: "hi",
errorCode: null,
errorMessage: null,
status: null,
},
]),
loadConfig: vi.fn(() => ({
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "test"],
heartbeatMinutes: 1,
session: {
heartbeatIdleMinutes: 5, // Require 5 min idle
},
},
},
})),
runTwilioHeartbeatOnce,
});
const monitorPromise = monitorTwilio(5, 5, {
deps,
maxIterations: 2,
});
// Advance to trigger heartbeat (60s)
await vi.advanceTimersByTimeAsync(60_000);
await vi.runAllTimersAsync();
await monitorPromise;
// Heartbeat should be skipped because last inbound was only 2 min ago (< 5 min)
expect(runTwilioHeartbeatOnce).not.toHaveBeenCalled();
});
});
describe("timer cleanup", () => {
it("clears heartbeat timer on exit", async () => {
const clearIntervalSpy = vi.spyOn(global, "clearInterval");
const deps = createMockDeps({
loadConfig: vi.fn(() => ({
inbound: {
reply: {
mode: "command" as const,
command: ["echo", "test"],
heartbeatMinutes: 1,
},
},
})),
});
const monitorPromise = monitorTwilio(5, 5, {
deps,
maxIterations: 1,
});
await vi.runAllTimersAsync();
await monitorPromise;
// clearInterval should have been called for cleanup
expect(clearIntervalSpy).toHaveBeenCalled();
});
});
});
describe("resolveHeartbeatRecipient", () => {
it("returns lastInboundFrom when available", () => {
const cfg = { inbound: { allowFrom: ["+15551234567"] } };
const result = resolveHeartbeatRecipient(cfg, "whatsapp:+15559999999");
expect(result).toBe("+15559999999");
});
it("strips whatsapp: prefix from lastInboundFrom", () => {
const cfg = {};
const result = resolveHeartbeatRecipient(cfg, "whatsapp:+15559999999");
expect(result).toBe("+15559999999");
});
it("falls back to first non-wildcard allowFrom entry", () => {
const cfg = {
inbound: { allowFrom: ["*", "+15551234567", "+15552222222"] },
};
const result = resolveHeartbeatRecipient(cfg, undefined);
expect(result).toBe("+15551234567");
});
it("returns null when allowFrom is empty", () => {
const cfg = { inbound: { allowFrom: [] } };
const result = resolveHeartbeatRecipient(cfg, undefined);
expect(result).toBeNull();
});
it("returns null when allowFrom contains only wildcards", () => {
const cfg = { inbound: { allowFrom: ["*"] } };
const result = resolveHeartbeatRecipient(cfg, undefined);
expect(result).toBeNull();
});
it("returns null when no allowFrom and no lastInboundFrom", () => {
const cfg = {};
const result = resolveHeartbeatRecipient(cfg, undefined);
expect(result).toBeNull();
expect(autoReplyIfConfigured).toHaveBeenCalledTimes(1);
});
});

View File

@ -1,15 +1,11 @@
import type { MessageInstance } from "twilio/lib/rest/api/v2010/account/message.js";
import { autoReplyIfConfigured } from "../auto-reply/reply.js";
import { loadConfig, type WarelayConfig } from "../config/config.js";
import { readEnv } from "../env.js";
import { danger } from "../globals.js";
import { logDebug, logInfo, logWarn } from "../logger.js";
import { getQueueSize } from "../process/command-queue.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164, sleep, withWhatsAppPrefix } from "../utils.js";
import { resolveReplyHeartbeatMinutes } from "../web/auto-reply.js";
import { sleep, withWhatsAppPrefix } from "../utils.js";
import { createClient } from "./client.js";
import { runTwilioHeartbeatOnce } from "./heartbeat.js";
type MonitorDeps = {
autoReplyIfConfigured: typeof autoReplyIfConfigured;
@ -21,10 +17,6 @@ type MonitorDeps = {
readEnv: typeof readEnv;
createClient: typeof createClient;
sleep: typeof sleep;
// Heartbeat dependencies
loadConfig: typeof loadConfig;
runTwilioHeartbeatOnce: typeof runTwilioHeartbeatOnce;
getQueueSize: typeof getQueueSize;
};
const DEFAULT_POLL_INTERVAL_SECONDS = 5;
@ -46,9 +38,6 @@ type MonitorOptions = {
maxIterations?: number;
deps?: MonitorDeps;
runtime?: RuntimeEnv;
// Heartbeat options
heartbeatNow?: boolean; // Run heartbeat immediately on start
heartbeatMinutes?: number; // Override config value
};
const defaultDeps: MonitorDeps = {
@ -57,104 +46,8 @@ const defaultDeps: MonitorDeps = {
readEnv,
createClient,
sleep,
// Heartbeat deps
loadConfig,
runTwilioHeartbeatOnce,
getQueueSize,
};
// Lightweight mutex for serializing heartbeat and auto-reply
let inFlightLock: Promise<void> = Promise.resolve();
function acquireLock(): Promise<() => void> {
let release: (() => void) | undefined;
const prev = inFlightLock;
inFlightLock = new Promise<void>((resolve) => {
release = resolve;
});
return prev.then(() => {
if (!release) throw new Error("Lock release function not set");
return release;
});
}
// Resolve recipient for heartbeat: uses lastInboundFrom or first non-wildcard allowFrom entry
function resolveHeartbeatRecipient(
cfg: WarelayConfig,
lastInboundFrom: string | undefined,
): string | null {
// Prefer last inbound sender
if (lastInboundFrom) {
// Strip whatsapp: prefix if present
const cleaned = lastInboundFrom.replace(/^whatsapp:/, "");
return normalizeE164(cleaned);
}
// Fall back to first non-wildcard allowFrom entry
const allowFrom = cfg.inbound?.allowFrom ?? [];
const nonWildcard = allowFrom.filter((v) => v !== "*");
if (nonWildcard.length > 0 && nonWildcard[0]) {
return normalizeE164(nonWildcard[0]);
}
return null;
}
// State tracking for heartbeat
type HeartbeatState = {
lastInboundFrom: string | undefined;
lastInboundAt: number | undefined;
};
// Run heartbeat once with serialization
async function runTwilioHeartbeatLoop(params: {
deps: MonitorDeps;
runtime: RuntimeEnv;
cfg: WarelayConfig;
state: HeartbeatState;
}) {
const { deps, runtime, cfg, state } = params;
const release = await acquireLock();
try {
// Check if command queue is busy
if (deps.getQueueSize() > 0) {
logInfo("heartbeat: skipped (requests in flight)", runtime);
return;
}
const recipient = resolveHeartbeatRecipient(cfg, state.lastInboundFrom);
if (!recipient) {
logInfo(
"heartbeat: skipped (no recipient - configure allowFrom or wait for inbound)",
runtime,
);
return;
}
// Check idle time threshold if configured
const idleMinutes =
cfg.inbound?.reply?.session?.heartbeatIdleMinutes ??
cfg.inbound?.reply?.session?.idleMinutes;
if (idleMinutes && state.lastInboundAt) {
const idleMs = Date.now() - state.lastInboundAt;
if (idleMs < idleMinutes * 60_000) {
logInfo(
`heartbeat: skipped (idle ${Math.floor(idleMs / 60_000)}m < ${idleMinutes}m)`,
runtime,
);
return;
}
}
await deps.runTwilioHeartbeatOnce({
to: recipient,
runtime,
cfg,
});
} finally {
release();
}
}
// Poll Twilio for inbound messages and auto-reply when configured.
export async function monitorTwilio(
pollSeconds: number,
@ -169,121 +62,44 @@ export async function monitorTwilio(
const env = deps.readEnv(runtime);
const from = withWhatsAppPrefix(env.whatsappFrom);
const client = opts?.client ?? deps.createClient(env);
// Load config and resolve heartbeat minutes
const cfg = deps.loadConfig();
const heartbeatMinutes = resolveReplyHeartbeatMinutes(
cfg,
opts?.heartbeatMinutes,
);
// Heartbeat state tracking
const heartbeatState: HeartbeatState = {
lastInboundFrom: undefined,
lastInboundAt: undefined,
};
let heartbeatTimer: NodeJS.Timeout | null = null;
// Cleanup function for the heartbeat timer
const clearHeartbeatTimer = () => {
if (heartbeatTimer) {
clearInterval(heartbeatTimer);
heartbeatTimer = null;
}
};
// Log startup info
const heartbeatInfo = heartbeatMinutes
? `Heartbeat: every ${heartbeatMinutes}m`
: "Heartbeat: disabled";
logInfo(
`📡 Monitoring inbound messages to ${from} (poll ${pollSeconds}s, lookback ${lookbackMinutes}m) | ${heartbeatInfo}`,
`📡 Monitoring inbound messages to ${from} (poll ${pollSeconds}s, lookback ${lookbackMinutes}m)`,
runtime,
);
// Set up heartbeat timer if enabled
if (heartbeatMinutes) {
const intervalMs = heartbeatMinutes * 60_000;
heartbeatTimer = setInterval(() => {
void runTwilioHeartbeatLoop({
deps,
runtime,
cfg,
state: heartbeatState,
}).catch((err) => {
runtime.error(danger(`Heartbeat error: ${String(err)}`));
});
}, intervalMs);
// Run immediate heartbeat if requested
if (opts?.heartbeatNow) {
void runTwilioHeartbeatLoop({
deps,
runtime,
cfg,
state: heartbeatState,
}).catch((err) => {
runtime.error(danger(`Immediate heartbeat error: ${String(err)}`));
});
}
}
let lastSeenSid: string | undefined;
let iterations = 0;
try {
while (iterations < maxIterations) {
let messages: ListedMessage[] = [];
try {
messages =
(await deps.listRecentMessages(lookbackMinutes, 50, client)) ?? [];
backoffMs = 1_000; // reset after success
} catch (err) {
logWarn(
`Twilio polling failed (will retry in ${backoffMs}ms): ${String(err)}`,
runtime,
);
await deps.sleep(backoffMs);
backoffMs = Math.min(backoffMs * 2, 10_000);
continue;
}
const inboundOnly = messages.filter((m) => m.direction === "inbound");
// Sort newest -> oldest without relying on external helpers (avoids test mocks clobbering imports).
const newestFirst = [...inboundOnly].sort(
(a, b) =>
(b.dateCreated?.getTime() ?? 0) - (a.dateCreated?.getTime() ?? 0),
);
// Update heartbeat state from newest inbound message
if (newestFirst.length > 0 && newestFirst[0].from) {
heartbeatState.lastInboundFrom = newestFirst[0].from;
heartbeatState.lastInboundAt = newestFirst[0].dateCreated?.getTime();
}
await handleMessages(messages, client, lastSeenSid, deps, runtime);
lastSeenSid = newestFirst.length ? newestFirst[0].sid : lastSeenSid;
iterations += 1;
if (iterations >= maxIterations) break;
await deps.sleep(
Math.max(pollSeconds, DEFAULT_POLL_INTERVAL_SECONDS) * 1000,
while (iterations < maxIterations) {
let messages: ListedMessage[] = [];
try {
messages =
(await deps.listRecentMessages(lookbackMinutes, 50, client)) ?? [];
backoffMs = 1_000; // reset after success
} catch (err) {
logWarn(
`Twilio polling failed (will retry in ${backoffMs}ms): ${String(err)}`,
runtime,
);
await deps.sleep(backoffMs);
backoffMs = Math.min(backoffMs * 2, 10_000);
continue;
}
} finally {
clearHeartbeatTimer();
const inboundOnly = messages.filter((m) => m.direction === "inbound");
// Sort newest -> oldest without relying on external helpers (avoids test mocks clobbering imports).
const newestFirst = [...inboundOnly].sort(
(a, b) =>
(b.dateCreated?.getTime() ?? 0) - (a.dateCreated?.getTime() ?? 0),
);
await handleMessages(messages, client, lastSeenSid, deps, runtime);
lastSeenSid = newestFirst.length ? newestFirst[0].sid : lastSeenSid;
iterations += 1;
if (iterations >= maxIterations) break;
await deps.sleep(
Math.max(pollSeconds, DEFAULT_POLL_INTERVAL_SECONDS) * 1000,
);
}
}
// Track all seen message SIDs to avoid re-processing
const seenMessageSids = new Set<string>();
// Export for testing - reset seen message SIDs between test runs
export function resetSeenMessageSids() {
seenMessageSids.clear();
}
// Export for testing
export { resolveHeartbeatRecipient };
async function handleMessages(
messages: ListedMessage[],
client: ReturnType<typeof createClient>,
@ -293,16 +109,6 @@ async function handleMessages(
) {
for (const m of messages) {
if (!m.sid) continue;
// Skip messages we've already seen/logged
if (seenMessageSids.has(m.sid)) continue;
seenMessageSids.add(m.sid);
// Limit set size to prevent memory leak
if (seenMessageSids.size > 1000) {
const oldestSids = Array.from(seenMessageSids).slice(0, 500);
for (const sid of oldestSids) {
seenMessageSids.delete(sid);
}
}
if (lastSeenSid && m.sid === lastSeenSid) break; // stop at previously seen
logDebug(`[${m.sid}] ${m.from ?? "?"} -> ${m.to ?? "?"}: ${m.body ?? ""}`);
if (m.direction !== "inbound") continue;

View File

@ -69,7 +69,7 @@ export async function startWebhook(
}
const client = createClient(env);
let replyResult: ReplyPayload | undefined =
let replyResult: ReplyPayload | ReplyPayload[] | undefined =
autoReply !== undefined ? { text: autoReply } : undefined;
if (!replyResult) {
replyResult = await getReplyFromConfig(
@ -83,14 +83,20 @@ export async function startWebhook(
MediaType: mediaType,
},
{
onReplyStart: () => sendTypingIndicator(client, runtime, MessageSid),
onReplyStart: async () => {
await sendTypingIndicator(client, runtime, MessageSid);
},
},
);
}
if (replyResult && (replyResult.text || replyResult.mediaUrl)) {
const replyPayload = Array.isArray(replyResult)
? replyResult[0]
: replyResult;
if (replyPayload && (replyPayload.text || replyPayload.mediaUrl)) {
try {
let mediaUrl = replyResult.mediaUrl;
let mediaUrl = replyPayload.mediaUrl;
if (mediaUrl && !/^https?:\/\//i.test(mediaUrl)) {
const hosted = await mediaHost.ensureMediaHosted(mediaUrl);
mediaUrl = hosted.url;
@ -98,7 +104,7 @@ export async function startWebhook(
await client.messages.create({
from: To,
to: From,
body: replyResult.text ?? "",
body: replyPayload.text ?? "",
...(mediaUrl ? { mediaUrl: [mediaUrl] } : {}),
});
if (verbose)

View File

@ -1,7 +1,4 @@
import {
buildHeartbeatPrompt,
runHeartbeatPreHook,
} from "../auto-reply/heartbeat-prehook.js";
import { chunkText } from "../auto-reply/chunk.js";
import { getReplyFromConfig } from "../auto-reply/reply.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { waitForever } from "../cli/wait.js";
@ -16,7 +13,7 @@ import {
import { danger, info, isVerbose, logVerbose, success } from "../globals.js";
import { logInfo } from "../logger.js";
import { getChildLogger } from "../logging.js";
import { enqueueCommand, getQueueSize } from "../process/command-queue.js";
import { getQueueSize } from "../process/command-queue.js";
import { defaultRuntime, type RuntimeEnv } from "../runtime.js";
import { normalizeE164 } from "../utils.js";
import { monitorWebInbox } from "./inbound.js";
@ -33,6 +30,8 @@ import {
} from "./reconnect.js";
import { getWebAuthAgeMs } from "./session.js";
const WEB_TEXT_LIMIT = 4000;
/**
* Send a message via IPC if relay is running, otherwise fall back to direct.
* This avoids Signal session corruption from multiple Baileys connections.
@ -73,7 +72,7 @@ const formatDuration = (ms: number) =>
const DEFAULT_REPLY_HEARTBEAT_MINUTES = 30;
export const HEARTBEAT_TOKEN = "HEARTBEAT_OK";
export const HEARTBEAT_PROMPT = "HEARTBEAT ultrathink";
export const HEARTBEAT_PROMPT = "HEARTBEAT /think:high";
export function resolveReplyHeartbeatMinutes(
cfg: ReturnType<typeof loadConfig>,
@ -109,7 +108,6 @@ export async function runWebHeartbeatOnce(opts: {
sessionId?: string;
overrideBody?: string;
dryRun?: boolean;
skipPreHook?: boolean;
}) {
const {
cfg: cfgOverride,
@ -118,7 +116,6 @@ export async function runWebHeartbeatOnce(opts: {
sessionId,
overrideBody,
dryRun = false,
skipPreHook = false,
} = opts;
const _runtime = opts.runtime ?? defaultRuntime;
const replyResolver = opts.replyResolver ?? getReplyFromConfig;
@ -187,51 +184,25 @@ export async function runWebHeartbeatOnce(opts: {
return;
}
// Run pre-hook unless skipped or overrideBody provided
let heartbeatPrompt = HEARTBEAT_PROMPT;
if (!skipPreHook) {
const preHookResult = await runHeartbeatPreHook(cfg);
if (preHookResult.error) {
heartbeatLogger.warn(
{
to,
error: preHookResult.error,
durationMs: preHookResult.durationMs,
timedOut: preHookResult.timedOut,
},
"heartbeat pre-hook failed (continuing with basic heartbeat)",
);
} else if (preHookResult.context) {
heartbeatLogger.info(
{
to,
contextLength: preHookResult.context.length,
durationMs: preHookResult.durationMs,
},
"heartbeat pre-hook succeeded",
);
}
heartbeatPrompt = buildHeartbeatPrompt(
HEARTBEAT_PROMPT,
preHookResult.context,
);
}
const replyResult = await replyResolver(
{
Body: heartbeatPrompt,
Body: HEARTBEAT_PROMPT,
From: to,
To: to,
MessageSid: sessionId ?? sessionSnapshot.entry?.sessionId,
},
undefined,
{ isHeartbeat: true },
cfg,
);
const replyPayload = Array.isArray(replyResult)
? replyResult[0]
: replyResult;
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
!replyPayload ||
(!replyPayload.text &&
!replyPayload.mediaUrl &&
!replyPayload.mediaUrls?.length)
) {
heartbeatLogger.info(
{
@ -246,9 +217,9 @@ export async function runWebHeartbeatOnce(opts: {
}
const hasMedia = Boolean(
replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0,
replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0,
);
const stripped = stripHeartbeatToken(replyResult.text);
const stripped = stripHeartbeatToken(replyPayload.text);
if (stripped.shouldSkip && !hasMedia) {
// Don't let heartbeats keep sessions alive: restore previous updatedAt so idle expiry still works.
const sessionCfg = cfg.inbound?.reply?.session;
@ -260,7 +231,7 @@ export async function runWebHeartbeatOnce(opts: {
}
heartbeatLogger.info(
{ to, reason: "heartbeat-token", rawLength: replyResult.text?.length },
{ to, reason: "heartbeat-token", rawLength: replyPayload.text?.length },
"heartbeat skipped",
);
console.log(success("heartbeat: ok (HEARTBEAT_OK)"));
@ -274,7 +245,7 @@ export async function runWebHeartbeatOnce(opts: {
);
}
const finalText = stripped.text || replyResult.text || "";
const finalText = stripped.text || replyPayload.text || "";
if (dryRun) {
heartbeatLogger.info(
{ to, reason: "dry-run", chars: finalText.length },
@ -407,14 +378,18 @@ async function deliverWebReply(params: {
skipLog,
} = params;
const replyStarted = Date.now();
const textChunks = chunkText(replyResult.text || "", WEB_TEXT_LIMIT);
const mediaList = replyResult.mediaUrls?.length
? replyResult.mediaUrls
: replyResult.mediaUrl
? [replyResult.mediaUrl]
: [];
if (mediaList.length === 0 && replyResult.text) {
await msg.reply(replyResult.text || "");
// Text-only replies
if (mediaList.length === 0 && textChunks.length) {
for (const chunk of textChunks) {
await msg.reply(chunk);
}
if (!skipLog) {
logInfo(
`✅ Sent web reply to ${msg.from} (${(Date.now() - replyStarted).toFixed(0)}ms)`,
@ -438,8 +413,12 @@ async function deliverWebReply(params: {
return;
}
const cleanText = replyResult.text ?? undefined;
const remainingText = [...textChunks];
// Media (with optional caption on first item)
for (const [index, mediaUrl] of mediaList.entries()) {
const caption =
index === 0 ? remainingText.shift() || undefined : undefined;
try {
const media = await loadWebMedia(mediaUrl, maxMediaBytes);
if (isVerbose()) {
@ -450,7 +429,6 @@ async function deliverWebReply(params: {
`Web auto-reply media source: ${mediaUrl} (kind ${media.kind})`,
);
}
const caption = index === 0 ? cleanText || undefined : undefined;
if (media.kind === "image") {
await msg.sendMedia({
image: media.buffer,
@ -490,7 +468,7 @@ async function deliverWebReply(params: {
connectionId: connectionId ?? null,
to: msg.from,
from: msg.to,
text: index === 0 ? (cleanText ?? null) : null,
text: caption ?? null,
mediaUrl,
mediaSizeBytes: media.buffer.length,
mediaKind: media.kind,
@ -502,12 +480,21 @@ async function deliverWebReply(params: {
console.error(
danger(`Failed sending web media to ${msg.from}: ${String(err)}`),
);
if (index === 0 && cleanText) {
console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`);
await msg.reply(cleanText || "");
replyLogger.warn({ err, mediaUrl }, "failed to send web media reply");
if (index === 0) {
const fallbackText = remainingText.shift() ?? caption ?? "";
if (fallbackText) {
console.log(`⚠️ Media skipped; sent text-only to ${msg.from}`);
await msg.reply(fallbackText);
}
}
}
}
// Remaining text chunks after media
for (const chunk of remainingText) {
await msg.reply(chunk);
}
}
export async function monitorWebProvider(
@ -657,84 +644,85 @@ export async function monitorWebProvider(
: new Date().toISOString();
console.log(`\n[${tsDisplay}] ${from} -> ${latest.to}: ${combinedBody}`);
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{
Body: combinedBody,
From: latest.from,
To: latest.to,
MessageSid: latest.id,
MediaPath: latest.mediaPath,
MediaUrl: latest.mediaUrl,
MediaType: latest.mediaType,
},
{
onReplyStart: latest.sendComposing,
},
),
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: combinedBody,
From: latest.from,
To: latest.to,
MessageSid: latest.id,
MediaPath: latest.mediaPath,
MediaUrl: latest.mediaUrl,
MediaType: latest.mediaType,
},
{
onReplyStart: latest.sendComposing,
},
);
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
) {
const replyList = replyResult
? Array.isArray(replyResult)
? replyResult
: [replyResult]
: [];
if (replyList.length === 0) {
logVerbose("Skipping auto-reply: no text/media returned from resolver");
return;
}
// Apply response prefix if configured (skip for HEARTBEAT_OK to preserve exact match)
const responsePrefix = cfg.inbound?.responsePrefix;
if (
responsePrefix &&
replyResult.text &&
replyResult.text.trim() !== HEARTBEAT_TOKEN
) {
if (!replyResult.text.startsWith(responsePrefix)) {
replyResult.text = `${responsePrefix} ${replyResult.text}`;
for (const replyPayload of replyList) {
if (
responsePrefix &&
replyPayload.text &&
replyPayload.text.trim() !== HEARTBEAT_TOKEN &&
!replyPayload.text.startsWith(responsePrefix)
) {
replyPayload.text = `${responsePrefix} ${replyPayload.text}`;
}
}
try {
await deliverWebReply({
replyResult,
msg: latest,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
try {
await deliverWebReply({
replyResult: replyPayload,
msg: latest,
maxMediaBytes,
replyLogger,
runtime,
connectionId,
});
if (replyResult.text) {
recentlySent.add(replyResult.text);
recentlySent.add(combinedBody); // Prevent echo on the batch text itself
logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyResult.text.substring(0, 50)}...`,
);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
if (replyPayload.text) {
recentlySent.add(replyPayload.text);
recentlySent.add(combinedBody); // Prevent echo on the batch text itself
logVerbose(
`Added to echo detection set (size now: ${recentlySent.size}): ${replyPayload.text.substring(0, 50)}...`,
);
if (recentlySent.size > MAX_RECENT_MESSAGES) {
const firstKey = recentlySent.values().next().value;
if (firstKey) recentlySent.delete(firstKey);
}
}
}
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${from} (web${replyResult.mediaUrl || replyResult.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`,
),
);
} else {
console.log(
success(
`↩️ ${replyResult.text ?? "<media>"}${replyResult.mediaUrl || replyResult.mediaUrls?.length ? " (media)" : ""}`,
),
if (isVerbose()) {
console.log(
success(
`↩️ Auto-replied to ${from} (web${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? ", media" : ""}; batched ${messages.length})`,
),
);
} else {
console.log(
success(
`↩️ ${replyPayload.text ?? "<media>"}${replyPayload.mediaUrl || replyPayload.mediaUrls?.length ? " (media)" : ""}`,
),
);
}
} catch (err) {
console.error(
danger(`Failed sending web auto-reply to ${from}: ${String(err)}`),
);
}
} catch (err) {
console.error(
danger(`Failed sending web auto-reply to ${from}: ${String(err)}`),
);
}
};
@ -967,59 +955,31 @@ export async function monitorWebProvider(
"reply heartbeat start",
);
}
// Run pre-hook to gather context
const preHookResult = await runHeartbeatPreHook(cfg);
if (preHookResult.error) {
heartbeatLogger.warn(
{
connectionId,
error: preHookResult.error,
durationMs: preHookResult.durationMs,
timedOut: preHookResult.timedOut,
},
"heartbeat pre-hook failed (continuing with basic heartbeat)",
);
} else if (preHookResult.context) {
heartbeatLogger.info(
{
connectionId,
contextLength: preHookResult.context.length,
durationMs: preHookResult.durationMs,
},
"heartbeat pre-hook succeeded",
);
}
const heartbeatPrompt = buildHeartbeatPrompt(
HEARTBEAT_PROMPT,
preHookResult.context,
const replyResult = await (replyResolver ?? getReplyFromConfig)(
{
Body: HEARTBEAT_PROMPT,
From: lastInboundMsg.from,
To: lastInboundMsg.to,
MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: lastInboundMsg.sendComposing,
isHeartbeat: true,
},
);
const hbFrom = lastInboundMsg.from;
const hbTo = lastInboundMsg.to;
const hbComposing = lastInboundMsg.sendComposing;
const replyResult = await enqueueCommand(() =>
(replyResolver ?? getReplyFromConfig)(
{
Body: heartbeatPrompt,
From: hbFrom,
To: hbTo,
MessageSid: snapshot.entry?.sessionId,
MediaPath: undefined,
MediaUrl: undefined,
MediaType: undefined,
},
{
onReplyStart: hbComposing,
},
),
);
const replyPayload = Array.isArray(replyResult)
? replyResult[0]
: replyResult;
if (
!replyResult ||
(!replyResult.text &&
!replyResult.mediaUrl &&
!replyResult.mediaUrls?.length)
!replyPayload ||
(!replyPayload.text &&
!replyPayload.mediaUrl &&
!replyPayload.mediaUrls?.length)
) {
heartbeatLogger.info(
{
@ -1033,9 +993,9 @@ export async function monitorWebProvider(
return;
}
const stripped = stripHeartbeatToken(replyResult.text);
const stripped = stripHeartbeatToken(replyPayload.text);
const hasMedia = Boolean(
replyResult.mediaUrl || (replyResult.mediaUrls?.length ?? 0) > 0,
replyPayload.mediaUrl || (replyPayload.mediaUrls?.length ?? 0) > 0,
);
if (stripped.shouldSkip && !hasMedia) {
heartbeatLogger.info(
@ -1043,7 +1003,7 @@ export async function monitorWebProvider(
connectionId,
durationMs: Date.now() - tickStart,
reason: "heartbeat-token",
rawLength: replyResult.text?.length ?? 0,
rawLength: replyPayload.text?.length ?? 0,
},
"reply heartbeat skipped",
);
@ -1063,7 +1023,7 @@ export async function monitorWebProvider(
}
const cleanedReply: ReplyPayload = {
...replyResult,
...replyPayload,
text: finalText,
};

View File

@ -100,7 +100,12 @@ describe("web inbound media saves with extension", () => {
};
realSock.ev.emit("messages.upsert", upsert);
await new Promise((resolve) => setTimeout(resolve, 5));
// Allow a brief window for the async handler to fire on slower runners.
for (let i = 0; i < 10; i++) {
if (onMessage.mock.calls.length > 0) break;
await new Promise((resolve) => setTimeout(resolve, 5));
}
expect(onMessage).toHaveBeenCalledTimes(1);
const msg = onMessage.mock.calls[0][0];

View File

@ -1,4 +1,5 @@
import fs from "node:fs/promises";
import path from "node:path";
import sharp from "sharp";
import { isVerbose, logVerbose } from "../globals.js";
@ -12,7 +13,12 @@ import { detectMime } from "../media/mime.js";
export async function loadWebMedia(
mediaUrl: string,
maxBytes?: number,
): Promise<{ buffer: Buffer; contentType?: string; kind: MediaKind }> {
): Promise<{
buffer: Buffer;
contentType?: string;
kind: MediaKind;
fileName?: string;
}> {
if (mediaUrl.startsWith("file://")) {
mediaUrl = mediaUrl.replace("file://", "");
}
@ -40,6 +46,14 @@ export async function loadWebMedia(
};
if (/^https?:\/\//i.test(mediaUrl)) {
let fileName: string | undefined;
try {
const url = new URL(mediaUrl);
const base = path.basename(url.pathname);
fileName = base || undefined;
} catch {
// ignore parse errors; leave undefined
}
const res = await fetch(mediaUrl);
if (!res.ok || !res.body) {
throw new Error(`Failed to fetch media: HTTP ${res.status}`);
@ -56,7 +70,7 @@ export async function loadWebMedia(
maxBytesForKind(kind),
);
if (kind === "image") {
return optimizeAndClampImage(array, cap);
return { ...(await optimizeAndClampImage(array, cap)), fileName };
}
if (array.length > cap) {
throw new Error(
@ -65,19 +79,25 @@ export async function loadWebMedia(
).toFixed(2)}MB)`,
);
}
return { buffer: array, contentType: contentType ?? undefined, kind };
return {
buffer: array,
contentType: contentType ?? undefined,
kind,
fileName,
};
}
// Local path
const data = await fs.readFile(mediaUrl);
const mime = detectMime({ buffer: data, filePath: mediaUrl });
const kind = mediaKindFromMime(mime);
const fileName = path.basename(mediaUrl) || undefined;
const cap = Math.min(
maxBytes ?? maxBytesForKind(kind),
maxBytesForKind(kind),
);
if (kind === "image") {
return optimizeAndClampImage(data, cap);
return { ...(await optimizeAndClampImage(data, cap)), fileName };
}
if (data.length > cap) {
throw new Error(
@ -86,7 +106,7 @@ export async function loadWebMedia(
).toFixed(2)}MB)`,
);
}
return { buffer: data, contentType: mime, kind };
return { buffer: data, contentType: mime, kind, fileName };
}
export async function optimizeImageToJpeg(

View File

@ -1,3 +1,4 @@
import type { AnyMessageContent } from "@whiskeysockets/baileys";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetLogger, setLoggerOverride } from "../logging.js";
@ -17,6 +18,11 @@ vi.mock("./session.js", () => {
};
});
const loadWebMediaMock = vi.fn();
vi.mock("./media.js", () => ({
loadWebMedia: (...args: unknown[]) => loadWebMediaMock(...args),
}));
import { sendMessageWeb } from "./outbound.js";
const { createWaSocket } = await import("./session.js");
@ -37,4 +43,98 @@ describe("web outbound", () => {
expect(sock.sendMessage).toHaveBeenCalled();
expect(sock.ws.close).toHaveBeenCalled();
});
it("maps audio to PTT with opus mime when ogg", async () => {
const buf = Buffer.from("audio");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "audio/ogg",
kind: "audio",
});
await sendMessageWeb("+1555", "voice note", {
verbose: false,
mediaUrl: "/tmp/voice.ogg",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
audio: buf,
ptt: true,
mimetype: "audio/ogg; codecs=opus",
});
});
it("maps video with caption", async () => {
const buf = Buffer.from("video");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "video/mp4",
kind: "video",
});
await sendMessageWeb("+1555", "clip", {
verbose: false,
mediaUrl: "/tmp/video.mp4",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
video: buf,
caption: "clip",
mimetype: "video/mp4",
});
});
it("maps image with caption", async () => {
const buf = Buffer.from("img");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "image/jpeg",
kind: "image",
});
await sendMessageWeb("+1555", "pic", {
verbose: false,
mediaUrl: "/tmp/pic.jpg",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
image: buf,
caption: "pic",
mimetype: "image/jpeg",
});
});
it("maps other kinds to document with filename", async () => {
const buf = Buffer.from("pdf");
loadWebMediaMock.mockResolvedValueOnce({
buffer: buf,
contentType: "application/pdf",
kind: "document",
fileName: "file.pdf",
});
await sendMessageWeb("+1555", "doc", {
verbose: false,
mediaUrl: "/tmp/file.pdf",
});
const sock = await createWaSocket();
const [, payload] = sock.sendMessage.mock.calls.at(-1) as [
string,
AnyMessageContent,
];
expect(payload).toMatchObject({
document: buf,
fileName: "file.pdf",
caption: "doc",
mimetype: "application/pdf",
});
});
});

View File

@ -35,11 +35,39 @@ export async function sendMessageWeb(
let payload: AnyMessageContent = { text: body };
if (options.mediaUrl) {
const media = await loadWebMedia(options.mediaUrl);
payload = {
image: media.buffer,
caption: body || undefined,
mimetype: media.contentType,
};
const caption = body || undefined;
if (media.kind === "audio") {
// WhatsApp expects explicit opus codec for PTT voice notes.
const mimetype =
media.contentType === "audio/ogg"
? "audio/ogg; codecs=opus"
: (media.contentType ?? "application/octet-stream");
payload = { audio: media.buffer, ptt: true, mimetype };
} else if (media.kind === "video") {
const mimetype = media.contentType ?? "application/octet-stream";
payload = {
video: media.buffer,
caption,
mimetype,
};
} else if (media.kind === "image") {
const mimetype = media.contentType ?? "application/octet-stream";
payload = {
image: media.buffer,
caption,
mimetype,
};
} else {
// Fallback to document for anything else (pdf, etc.).
const fileName = media.fileName ?? "file";
const mimetype = media.contentType ?? "application/octet-stream";
payload = {
document: media.buffer,
fileName,
caption,
mimetype,
};
}
}
logInfo(
`📤 Sending via web session -> ${jid}${options.mediaUrl ? " (media)" : ""}`,