diff --git a/src/infra/heartbeat-runner.cron-events-no-heartbeat-prompt.test.ts b/src/infra/heartbeat-runner.cron-events-no-heartbeat-prompt.test.ts new file mode 100644 index 000000000..cafb784a1 --- /dev/null +++ b/src/infra/heartbeat-runner.cron-events-no-heartbeat-prompt.test.ts @@ -0,0 +1,326 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import * as replyModule from "../auto-reply/reply.js"; +import type { MoltbotConfig } from "../config/config.js"; +import { resolveMainSessionKey } from "../config/sessions.js"; +import { runHeartbeatOnce } from "./heartbeat-runner.js"; +import { enqueueSystemEvent, resetSystemEventsForTest } from "./system-events.js"; +import { setActivePluginRegistry } from "../plugins/runtime.js"; +import { createPluginRuntime } from "../plugins/runtime/index.js"; +import { createTestRegistry } from "../test-utils/channel-plugins.js"; +import { telegramPlugin } from "../../extensions/telegram/src/channel.js"; +import { setTelegramRuntime } from "../../extensions/telegram/src/runtime.js"; + +// Avoid pulling optional runtime deps during isolated runs. +vi.mock("jiti", () => ({ createJiti: () => () => ({}) })); + +beforeEach(() => { + const runtime = createPluginRuntime(); + setTelegramRuntime(runtime); + setActivePluginRegistry( + createTestRegistry([{ pluginId: "telegram", plugin: telegramPlugin, source: "test" }]), + ); + resetSystemEventsForTest(); +}); + +describe("cron system events", () => { + it("does not append heartbeat prompt for cron:* events with pending system events", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-")); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; + process.env.TELEGRAM_BOT_TOKEN = "test-token"; + try { + const cfg: MoltbotConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { every: "5m", target: "telegram" }, + }, + }, + channels: { telegram: { botToken: "test-bot-token-123" } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "123456", + }, + }, + null, + 2, + ), + ); + + // Enqueue a custom system event (simulating cron job behavior) + enqueueSystemEvent("[DAILY_TASK] Time for focused work!", { sessionKey }); + + replySpy.mockResolvedValue({ text: "I'll get started on your tasks!" }); + const sendTelegram = vi.fn().mockResolvedValue({ + messageId: "m1", + chatId: "123456", + }); + + await runHeartbeatOnce({ + cfg, + reason: "cron:daily-task", + deps: { + sendTelegram, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + // The reply should have been called with the system event text as Body (no heartbeat prompt) + expect(replySpy).toHaveBeenCalledWith( + expect.objectContaining({ + Body: "[DAILY_TASK] Time for focused work!", // System event IS the prompt + Provider: "cron-event", + }), + expect.anything(), + expect.anything(), + ); + + // Response should be delivered (not skipped due to HEARTBEAT_OK handling) + expect(sendTelegram).toHaveBeenCalledTimes(1); + } finally { + replySpy.mockRestore(); + if (prevTelegramToken === undefined) { + delete process.env.TELEGRAM_BOT_TOKEN; + } else { + process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken; + } + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); + + it("uses heartbeat prompt for cron events without pending system events", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-")); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; + process.env.TELEGRAM_BOT_TOKEN = "test-token"; + try { + const cfg: MoltbotConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { every: "5m", target: "telegram" }, + }, + }, + channels: { telegram: { botToken: "test-bot-token-123" } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "123456", + }, + }, + null, + 2, + ), + ); + + // No system events enqueued - this is a bare cron wake + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + const sendTelegram = vi.fn().mockResolvedValue({ + messageId: "m1", + chatId: "123456", + }); + + await runHeartbeatOnce({ + cfg, + reason: "cron:empty-job", + deps: { + sendTelegram, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + // The reply should have been called WITH the heartbeat prompt + expect(replySpy).toHaveBeenCalledWith( + expect.objectContaining({ + Body: expect.stringContaining("HEARTBEAT_OK"), + Provider: "heartbeat", + }), + expect.anything(), + expect.anything(), + ); + } finally { + replySpy.mockRestore(); + if (prevTelegramToken === undefined) { + delete process.env.TELEGRAM_BOT_TOKEN; + } else { + process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken; + } + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); + + it("delivers cron response even if it contains HEARTBEAT_OK text", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-")); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; + process.env.TELEGRAM_BOT_TOKEN = "test-token"; + try { + const cfg: MoltbotConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { every: "5m", target: "telegram" }, + }, + }, + channels: { telegram: { botToken: "test-bot-token-123" } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "123456", + }, + }, + null, + 2, + ), + ); + + // Enqueue a system event + enqueueSystemEvent("[BLOG_POSTS] Check for new posts", { sessionKey }); + + // Agent responds with HEARTBEAT_OK (incorrectly, but we shouldn't skip it) + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK - No new posts to process." }); + const sendTelegram = vi.fn().mockResolvedValue({ + messageId: "m1", + chatId: "123456", + }); + + await runHeartbeatOnce({ + cfg, + reason: "cron:blog-posts", + deps: { + sendTelegram, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + // Response should be delivered even though it contains HEARTBEAT_OK + // because this is a cron event, not a heartbeat + expect(sendTelegram).toHaveBeenCalledTimes(1); + expect(sendTelegram).toHaveBeenCalledWith( + "123456", + expect.stringContaining("No new posts"), + expect.any(Object), + ); + } finally { + replySpy.mockRestore(); + if (prevTelegramToken === undefined) { + delete process.env.TELEGRAM_BOT_TOKEN; + } else { + process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken; + } + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); + + it("uses heartbeat prompt for regular interval heartbeats", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-")); + const storePath = path.join(tmpDir, "sessions.json"); + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; + process.env.TELEGRAM_BOT_TOKEN = "test-token"; + try { + const cfg: MoltbotConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { every: "5m", target: "telegram" }, + }, + }, + channels: { telegram: { botToken: "test-bot-token-123" } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId: "sid", + updatedAt: Date.now(), + lastChannel: "telegram", + lastProvider: "telegram", + lastTo: "123456", + }, + }, + null, + 2, + ), + ); + + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + const sendTelegram = vi.fn().mockResolvedValue({ + messageId: "m1", + chatId: "123456", + }); + + await runHeartbeatOnce({ + cfg, + reason: "interval", // Regular heartbeat + deps: { + sendTelegram, + getQueueSize: () => 0, + nowMs: () => 0, + }, + }); + + // Regular heartbeat should use heartbeat prompt + expect(replySpy).toHaveBeenCalledWith( + expect.objectContaining({ + Body: expect.stringContaining("HEARTBEAT_OK"), + Provider: "heartbeat", + }), + expect.anything(), + expect.anything(), + ); + } finally { + replySpy.mockRestore(); + if (prevTelegramToken === undefined) { + delete process.env.TELEGRAM_BOT_TOKEN; + } else { + process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken; + } + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); +}); diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 8e0c9a4ee..5e5f82715 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -496,15 +496,27 @@ export async function runHeartbeatOnce(opts: { // If so, use a specialized prompt that instructs the model to relay the result // instead of the standard heartbeat prompt with "reply HEARTBEAT_OK". const isExecEvent = opts.reason === "exec-event"; - const pendingEvents = isExecEvent ? peekSystemEvents(sessionKey) : []; + // Check if this is a cron job system event (not a periodic heartbeat). + // Cron jobs enqueue system events before calling runHeartbeatOnce. + const isCronEvent = typeof opts.reason === "string" && opts.reason.startsWith("cron:"); + // Always peek system events for cron and exec events to determine prompt type. + const pendingEvents = isExecEvent || isCronEvent ? peekSystemEvents(sessionKey) : []; const hasExecCompletion = pendingEvents.some((evt) => evt.includes("Exec finished")); + // Cron system events should NOT get the heartbeat prompt - the system event IS the prompt. + // Only periodic heartbeats (reason: "interval" or no cron prefix) get the heartbeat prompt. + const hasCronSystemEvents = isCronEvent && pendingEvents.length > 0; - const prompt = hasExecCompletion ? EXEC_EVENT_PROMPT : resolveHeartbeatPrompt(cfg, heartbeat); + // For cron events with pending system events, use the system event text directly + // (without appending the heartbeat prompt). The system event IS the prompt. + const cronEventBody = hasCronSystemEvents ? pendingEvents.join("\n\n") : null; + const prompt = hasExecCompletion + ? EXEC_EVENT_PROMPT + : (cronEventBody ?? resolveHeartbeatPrompt(cfg, heartbeat)); const ctx = { Body: prompt, From: sender, To: sender, - Provider: hasExecCompletion ? "exec-event" : "heartbeat", + Provider: hasExecCompletion ? "exec-event" : hasCronSystemEvents ? "cron-event" : "heartbeat", SessionKey: sessionKey, }; if (!visibility.showAlerts && !visibility.showOk && !visibility.useIndicator) { @@ -586,7 +598,19 @@ export async function runHeartbeatOnce(opts: { normalized.text = execFallbackText; normalized.shouldSkip = false; } - const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia && !hasExecCompletion; + // For cron system events, don't skip even if response contains HEARTBEAT_OK. + // The agent wasn't asked to reply with HEARTBEAT_OK - if they do, it's just incidental text. + // Preserve the original reply text for cron events when token stripping would empty it. + const cronFallbackText = + hasCronSystemEvents && !normalized.text.trim() && replyPayload.text?.trim() + ? replyPayload.text.trim() + : null; + if (cronFallbackText) { + normalized.text = cronFallbackText; + normalized.shouldSkip = false; + } + const shouldSkipMain = + normalized.shouldSkip && !normalized.hasMedia && !hasExecCompletion && !hasCronSystemEvents; if (shouldSkipMain && reasoningPayloads.length === 0) { await restoreHeartbeatUpdatedAt({ storePath,