fix: don't append heartbeat prompt to cron system events
When cron jobs fire system events (like MORNING_BRIEF, EVENING_DIGEST, etc.), the heartbeat prompt was being appended to ALL events. This caused agents to treat custom cron events as heartbeats and reply HEARTBEAT_OK. Now cron events with pending system events: - Use empty Body (system event IS the prompt) - Set Provider to 'cron-event' to distinguish from heartbeats - Don't skip responses that contain HEARTBEAT_OK text Fixes #3579
This commit is contained in:
parent
cb4b3f74b5
commit
18c8a3abb1
@ -0,0 +1,326 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import * as replyModule from "../auto-reply/reply.js";
|
||||
import type { MoltbotConfig } from "../config/config.js";
|
||||
import { resolveMainSessionKey } from "../config/sessions.js";
|
||||
import { runHeartbeatOnce } from "./heartbeat-runner.js";
|
||||
import { enqueueSystemEvent, resetSystemEventsForTest } from "./system-events.js";
|
||||
import { setActivePluginRegistry } from "../plugins/runtime.js";
|
||||
import { createPluginRuntime } from "../plugins/runtime/index.js";
|
||||
import { createTestRegistry } from "../test-utils/channel-plugins.js";
|
||||
import { telegramPlugin } from "../../extensions/telegram/src/channel.js";
|
||||
import { setTelegramRuntime } from "../../extensions/telegram/src/runtime.js";
|
||||
|
||||
// Avoid pulling optional runtime deps during isolated runs.
|
||||
vi.mock("jiti", () => ({ createJiti: () => () => ({}) }));
|
||||
|
||||
beforeEach(() => {
|
||||
const runtime = createPluginRuntime();
|
||||
setTelegramRuntime(runtime);
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([{ pluginId: "telegram", plugin: telegramPlugin, source: "test" }]),
|
||||
);
|
||||
resetSystemEventsForTest();
|
||||
});
|
||||
|
||||
describe("cron system events", () => {
|
||||
it("does not append heartbeat prompt for cron:* events with pending system events", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
|
||||
process.env.TELEGRAM_BOT_TOKEN = "test-token";
|
||||
try {
|
||||
const cfg: MoltbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: tmpDir,
|
||||
heartbeat: { every: "5m", target: "telegram" },
|
||||
},
|
||||
},
|
||||
channels: { telegram: { botToken: "test-bot-token-123" } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
const sessionKey = resolveMainSessionKey(cfg);
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123456",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
|
||||
// Enqueue a custom system event (simulating cron job behavior)
|
||||
enqueueSystemEvent("[DAILY_TASK] Time for focused work!", { sessionKey });
|
||||
|
||||
replySpy.mockResolvedValue({ text: "I'll get started on your tasks!" });
|
||||
const sendTelegram = vi.fn().mockResolvedValue({
|
||||
messageId: "m1",
|
||||
chatId: "123456",
|
||||
});
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
reason: "cron:daily-task",
|
||||
deps: {
|
||||
sendTelegram,
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
// The reply should have been called with the system event text as Body (no heartbeat prompt)
|
||||
expect(replySpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
Body: "[DAILY_TASK] Time for focused work!", // System event IS the prompt
|
||||
Provider: "cron-event",
|
||||
}),
|
||||
expect.anything(),
|
||||
expect.anything(),
|
||||
);
|
||||
|
||||
// Response should be delivered (not skipped due to HEARTBEAT_OK handling)
|
||||
expect(sendTelegram).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
replySpy.mockRestore();
|
||||
if (prevTelegramToken === undefined) {
|
||||
delete process.env.TELEGRAM_BOT_TOKEN;
|
||||
} else {
|
||||
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
|
||||
}
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("uses heartbeat prompt for cron events without pending system events", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
|
||||
process.env.TELEGRAM_BOT_TOKEN = "test-token";
|
||||
try {
|
||||
const cfg: MoltbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: tmpDir,
|
||||
heartbeat: { every: "5m", target: "telegram" },
|
||||
},
|
||||
},
|
||||
channels: { telegram: { botToken: "test-bot-token-123" } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
const sessionKey = resolveMainSessionKey(cfg);
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123456",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
|
||||
// No system events enqueued - this is a bare cron wake
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
const sendTelegram = vi.fn().mockResolvedValue({
|
||||
messageId: "m1",
|
||||
chatId: "123456",
|
||||
});
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
reason: "cron:empty-job",
|
||||
deps: {
|
||||
sendTelegram,
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
// The reply should have been called WITH the heartbeat prompt
|
||||
expect(replySpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
Body: expect.stringContaining("HEARTBEAT_OK"),
|
||||
Provider: "heartbeat",
|
||||
}),
|
||||
expect.anything(),
|
||||
expect.anything(),
|
||||
);
|
||||
} finally {
|
||||
replySpy.mockRestore();
|
||||
if (prevTelegramToken === undefined) {
|
||||
delete process.env.TELEGRAM_BOT_TOKEN;
|
||||
} else {
|
||||
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
|
||||
}
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("delivers cron response even if it contains HEARTBEAT_OK text", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
|
||||
process.env.TELEGRAM_BOT_TOKEN = "test-token";
|
||||
try {
|
||||
const cfg: MoltbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: tmpDir,
|
||||
heartbeat: { every: "5m", target: "telegram" },
|
||||
},
|
||||
},
|
||||
channels: { telegram: { botToken: "test-bot-token-123" } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
const sessionKey = resolveMainSessionKey(cfg);
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123456",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
|
||||
// Enqueue a system event
|
||||
enqueueSystemEvent("[BLOG_POSTS] Check for new posts", { sessionKey });
|
||||
|
||||
// Agent responds with HEARTBEAT_OK (incorrectly, but we shouldn't skip it)
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK - No new posts to process." });
|
||||
const sendTelegram = vi.fn().mockResolvedValue({
|
||||
messageId: "m1",
|
||||
chatId: "123456",
|
||||
});
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
reason: "cron:blog-posts",
|
||||
deps: {
|
||||
sendTelegram,
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
// Response should be delivered even though it contains HEARTBEAT_OK
|
||||
// because this is a cron event, not a heartbeat
|
||||
expect(sendTelegram).toHaveBeenCalledTimes(1);
|
||||
expect(sendTelegram).toHaveBeenCalledWith(
|
||||
"123456",
|
||||
expect.stringContaining("No new posts"),
|
||||
expect.any(Object),
|
||||
);
|
||||
} finally {
|
||||
replySpy.mockRestore();
|
||||
if (prevTelegramToken === undefined) {
|
||||
delete process.env.TELEGRAM_BOT_TOKEN;
|
||||
} else {
|
||||
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
|
||||
}
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("uses heartbeat prompt for regular interval heartbeats", async () => {
|
||||
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
|
||||
const storePath = path.join(tmpDir, "sessions.json");
|
||||
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
|
||||
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
|
||||
process.env.TELEGRAM_BOT_TOKEN = "test-token";
|
||||
try {
|
||||
const cfg: MoltbotConfig = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: tmpDir,
|
||||
heartbeat: { every: "5m", target: "telegram" },
|
||||
},
|
||||
},
|
||||
channels: { telegram: { botToken: "test-bot-token-123" } },
|
||||
session: { store: storePath },
|
||||
};
|
||||
const sessionKey = resolveMainSessionKey(cfg);
|
||||
|
||||
await fs.writeFile(
|
||||
storePath,
|
||||
JSON.stringify(
|
||||
{
|
||||
[sessionKey]: {
|
||||
sessionId: "sid",
|
||||
updatedAt: Date.now(),
|
||||
lastChannel: "telegram",
|
||||
lastProvider: "telegram",
|
||||
lastTo: "123456",
|
||||
},
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
|
||||
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
|
||||
const sendTelegram = vi.fn().mockResolvedValue({
|
||||
messageId: "m1",
|
||||
chatId: "123456",
|
||||
});
|
||||
|
||||
await runHeartbeatOnce({
|
||||
cfg,
|
||||
reason: "interval", // Regular heartbeat
|
||||
deps: {
|
||||
sendTelegram,
|
||||
getQueueSize: () => 0,
|
||||
nowMs: () => 0,
|
||||
},
|
||||
});
|
||||
|
||||
// Regular heartbeat should use heartbeat prompt
|
||||
expect(replySpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
Body: expect.stringContaining("HEARTBEAT_OK"),
|
||||
Provider: "heartbeat",
|
||||
}),
|
||||
expect.anything(),
|
||||
expect.anything(),
|
||||
);
|
||||
} finally {
|
||||
replySpy.mockRestore();
|
||||
if (prevTelegramToken === undefined) {
|
||||
delete process.env.TELEGRAM_BOT_TOKEN;
|
||||
} else {
|
||||
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
|
||||
}
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
@ -496,15 +496,27 @@ export async function runHeartbeatOnce(opts: {
|
||||
// If so, use a specialized prompt that instructs the model to relay the result
|
||||
// instead of the standard heartbeat prompt with "reply HEARTBEAT_OK".
|
||||
const isExecEvent = opts.reason === "exec-event";
|
||||
const pendingEvents = isExecEvent ? peekSystemEvents(sessionKey) : [];
|
||||
// Check if this is a cron job system event (not a periodic heartbeat).
|
||||
// Cron jobs enqueue system events before calling runHeartbeatOnce.
|
||||
const isCronEvent = typeof opts.reason === "string" && opts.reason.startsWith("cron:");
|
||||
// Always peek system events for cron and exec events to determine prompt type.
|
||||
const pendingEvents = isExecEvent || isCronEvent ? peekSystemEvents(sessionKey) : [];
|
||||
const hasExecCompletion = pendingEvents.some((evt) => evt.includes("Exec finished"));
|
||||
// Cron system events should NOT get the heartbeat prompt - the system event IS the prompt.
|
||||
// Only periodic heartbeats (reason: "interval" or no cron prefix) get the heartbeat prompt.
|
||||
const hasCronSystemEvents = isCronEvent && pendingEvents.length > 0;
|
||||
|
||||
const prompt = hasExecCompletion ? EXEC_EVENT_PROMPT : resolveHeartbeatPrompt(cfg, heartbeat);
|
||||
// For cron events with pending system events, use the system event text directly
|
||||
// (without appending the heartbeat prompt). The system event IS the prompt.
|
||||
const cronEventBody = hasCronSystemEvents ? pendingEvents.join("\n\n") : null;
|
||||
const prompt = hasExecCompletion
|
||||
? EXEC_EVENT_PROMPT
|
||||
: (cronEventBody ?? resolveHeartbeatPrompt(cfg, heartbeat));
|
||||
const ctx = {
|
||||
Body: prompt,
|
||||
From: sender,
|
||||
To: sender,
|
||||
Provider: hasExecCompletion ? "exec-event" : "heartbeat",
|
||||
Provider: hasExecCompletion ? "exec-event" : hasCronSystemEvents ? "cron-event" : "heartbeat",
|
||||
SessionKey: sessionKey,
|
||||
};
|
||||
if (!visibility.showAlerts && !visibility.showOk && !visibility.useIndicator) {
|
||||
@ -586,7 +598,19 @@ export async function runHeartbeatOnce(opts: {
|
||||
normalized.text = execFallbackText;
|
||||
normalized.shouldSkip = false;
|
||||
}
|
||||
const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia && !hasExecCompletion;
|
||||
// For cron system events, don't skip even if response contains HEARTBEAT_OK.
|
||||
// The agent wasn't asked to reply with HEARTBEAT_OK - if they do, it's just incidental text.
|
||||
// Preserve the original reply text for cron events when token stripping would empty it.
|
||||
const cronFallbackText =
|
||||
hasCronSystemEvents && !normalized.text.trim() && replyPayload.text?.trim()
|
||||
? replyPayload.text.trim()
|
||||
: null;
|
||||
if (cronFallbackText) {
|
||||
normalized.text = cronFallbackText;
|
||||
normalized.shouldSkip = false;
|
||||
}
|
||||
const shouldSkipMain =
|
||||
normalized.shouldSkip && !normalized.hasMedia && !hasExecCompletion && !hasCronSystemEvents;
|
||||
if (shouldSkipMain && reasoningPayloads.length === 0) {
|
||||
await restoreHeartbeatUpdatedAt({
|
||||
storePath,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user