This commit is contained in:
Echo 2026-01-30 00:52:01 -04:00 committed by GitHub
commit 622d854dbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 358 additions and 4 deletions

View File

@ -0,0 +1,326 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import * as replyModule from "../auto-reply/reply.js";
import type { MoltbotConfig } from "../config/config.js";
import { resolveMainSessionKey } from "../config/sessions.js";
import { runHeartbeatOnce } from "./heartbeat-runner.js";
import { enqueueSystemEvent, resetSystemEventsForTest } from "./system-events.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { createPluginRuntime } from "../plugins/runtime/index.js";
import { createTestRegistry } from "../test-utils/channel-plugins.js";
import { telegramPlugin } from "../../extensions/telegram/src/channel.js";
import { setTelegramRuntime } from "../../extensions/telegram/src/runtime.js";
// Avoid pulling optional runtime deps during isolated runs.
vi.mock("jiti", () => ({ createJiti: () => () => ({}) }));
beforeEach(() => {
const runtime = createPluginRuntime();
setTelegramRuntime(runtime);
setActivePluginRegistry(
createTestRegistry([{ pluginId: "telegram", plugin: telegramPlugin, source: "test" }]),
);
resetSystemEventsForTest();
});
describe("cron system events", () => {
it("does not append heartbeat prompt for cron:* events with pending system events", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "test-token";
try {
const cfg: MoltbotConfig = {
agents: {
defaults: {
workspace: tmpDir,
heartbeat: { every: "5m", target: "telegram" },
},
},
channels: { telegram: { botToken: "test-bot-token-123" } },
session: { store: storePath },
};
const sessionKey = resolveMainSessionKey(cfg);
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId: "sid",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "123456",
},
},
null,
2,
),
);
// Enqueue a custom system event (simulating cron job behavior)
enqueueSystemEvent("[DAILY_TASK] Time for focused work!", { sessionKey });
replySpy.mockResolvedValue({ text: "I'll get started on your tasks!" });
const sendTelegram = vi.fn().mockResolvedValue({
messageId: "m1",
chatId: "123456",
});
await runHeartbeatOnce({
cfg,
reason: "cron:daily-task",
deps: {
sendTelegram,
getQueueSize: () => 0,
nowMs: () => 0,
},
});
// The reply should have been called with the system event text as Body (no heartbeat prompt)
expect(replySpy).toHaveBeenCalledWith(
expect.objectContaining({
Body: "[DAILY_TASK] Time for focused work!", // System event IS the prompt
Provider: "cron-event",
}),
expect.anything(),
expect.anything(),
);
// Response should be delivered (not skipped due to HEARTBEAT_OK handling)
expect(sendTelegram).toHaveBeenCalledTimes(1);
} finally {
replySpy.mockRestore();
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
await fs.rm(tmpDir, { recursive: true, force: true });
}
});
it("uses heartbeat prompt for cron events without pending system events", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "test-token";
try {
const cfg: MoltbotConfig = {
agents: {
defaults: {
workspace: tmpDir,
heartbeat: { every: "5m", target: "telegram" },
},
},
channels: { telegram: { botToken: "test-bot-token-123" } },
session: { store: storePath },
};
const sessionKey = resolveMainSessionKey(cfg);
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId: "sid",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "123456",
},
},
null,
2,
),
);
// No system events enqueued - this is a bare cron wake
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
const sendTelegram = vi.fn().mockResolvedValue({
messageId: "m1",
chatId: "123456",
});
await runHeartbeatOnce({
cfg,
reason: "cron:empty-job",
deps: {
sendTelegram,
getQueueSize: () => 0,
nowMs: () => 0,
},
});
// The reply should have been called WITH the heartbeat prompt
expect(replySpy).toHaveBeenCalledWith(
expect.objectContaining({
Body: expect.stringContaining("HEARTBEAT_OK"),
Provider: "heartbeat",
}),
expect.anything(),
expect.anything(),
);
} finally {
replySpy.mockRestore();
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
await fs.rm(tmpDir, { recursive: true, force: true });
}
});
it("delivers cron response even if it contains HEARTBEAT_OK text", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "test-token";
try {
const cfg: MoltbotConfig = {
agents: {
defaults: {
workspace: tmpDir,
heartbeat: { every: "5m", target: "telegram" },
},
},
channels: { telegram: { botToken: "test-bot-token-123" } },
session: { store: storePath },
};
const sessionKey = resolveMainSessionKey(cfg);
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId: "sid",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "123456",
},
},
null,
2,
),
);
// Enqueue a system event
enqueueSystemEvent("[BLOG_POSTS] Check for new posts", { sessionKey });
// Agent responds with HEARTBEAT_OK (incorrectly, but we shouldn't skip it)
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK - No new posts to process." });
const sendTelegram = vi.fn().mockResolvedValue({
messageId: "m1",
chatId: "123456",
});
await runHeartbeatOnce({
cfg,
reason: "cron:blog-posts",
deps: {
sendTelegram,
getQueueSize: () => 0,
nowMs: () => 0,
},
});
// Response should be delivered even though it contains HEARTBEAT_OK
// because this is a cron event, not a heartbeat
expect(sendTelegram).toHaveBeenCalledTimes(1);
expect(sendTelegram).toHaveBeenCalledWith(
"123456",
expect.stringContaining("No new posts"),
expect.any(Object),
);
} finally {
replySpy.mockRestore();
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
await fs.rm(tmpDir, { recursive: true, force: true });
}
});
it("uses heartbeat prompt for regular interval heartbeats", async () => {
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-hb-"));
const storePath = path.join(tmpDir, "sessions.json");
const replySpy = vi.spyOn(replyModule, "getReplyFromConfig");
const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN;
process.env.TELEGRAM_BOT_TOKEN = "test-token";
try {
const cfg: MoltbotConfig = {
agents: {
defaults: {
workspace: tmpDir,
heartbeat: { every: "5m", target: "telegram" },
},
},
channels: { telegram: { botToken: "test-bot-token-123" } },
session: { store: storePath },
};
const sessionKey = resolveMainSessionKey(cfg);
await fs.writeFile(
storePath,
JSON.stringify(
{
[sessionKey]: {
sessionId: "sid",
updatedAt: Date.now(),
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: "123456",
},
},
null,
2,
),
);
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
const sendTelegram = vi.fn().mockResolvedValue({
messageId: "m1",
chatId: "123456",
});
await runHeartbeatOnce({
cfg,
reason: "interval", // Regular heartbeat
deps: {
sendTelegram,
getQueueSize: () => 0,
nowMs: () => 0,
},
});
// Regular heartbeat should use heartbeat prompt
expect(replySpy).toHaveBeenCalledWith(
expect.objectContaining({
Body: expect.stringContaining("HEARTBEAT_OK"),
Provider: "heartbeat",
}),
expect.anything(),
expect.anything(),
);
} finally {
replySpy.mockRestore();
if (prevTelegramToken === undefined) {
delete process.env.TELEGRAM_BOT_TOKEN;
} else {
process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken;
}
await fs.rm(tmpDir, { recursive: true, force: true });
}
});
});

View File

@ -499,15 +499,31 @@ export async function runHeartbeatOnce(opts: {
// If so, use a specialized prompt that instructs the model to relay the result
// instead of the standard heartbeat prompt with "reply HEARTBEAT_OK".
const isExecEvent = opts.reason === "exec-event";
const pendingEvents = isExecEvent ? peekSystemEvents(sessionKey) : [];
// Check if this is a cron job system event (not a periodic heartbeat).
// Cron jobs enqueue system events before calling runHeartbeatOnce.
// SECURITY: opts.reason is set internally by the heartbeat scheduler and cron runner.
// It is NOT user-controlled. Cron jobs are defined in admin-only gateway config.
const isCronEvent = typeof opts.reason === "string" && opts.reason.startsWith("cron:");
// Always peek system events for cron and exec events to determine prompt type.
const pendingEvents = isExecEvent || isCronEvent ? peekSystemEvents(sessionKey) : [];
const hasExecCompletion = pendingEvents.some((evt) => evt.includes("Exec finished"));
// Cron system events should NOT get the heartbeat prompt - the system event IS the prompt.
// Only periodic heartbeats (reason: "interval" or no cron prefix) get the heartbeat prompt.
const hasCronSystemEvents = isCronEvent && pendingEvents.length > 0;
const prompt = hasExecCompletion ? EXEC_EVENT_PROMPT : resolveHeartbeatPrompt(cfg, heartbeat);
// For cron events with pending system events, use the system event text directly
// (without appending the heartbeat prompt). The system event IS the prompt.
// SECURITY: System events originate from admin-controlled cron job configs (payload.text).
// User input cannot reach enqueueSystemEvent() - only the cron scheduler calls it.
const cronEventBody = hasCronSystemEvents ? pendingEvents.join("\n\n") : null;
const prompt = hasExecCompletion
? EXEC_EVENT_PROMPT
: (cronEventBody ?? resolveHeartbeatPrompt(cfg, heartbeat));
const ctx = {
Body: prompt,
From: sender,
To: sender,
Provider: hasExecCompletion ? "exec-event" : "heartbeat",
Provider: hasExecCompletion ? "exec-event" : hasCronSystemEvents ? "cron-event" : "heartbeat",
SessionKey: sessionKey,
};
if (!visibility.showAlerts && !visibility.showOk && !visibility.useIndicator) {
@ -589,7 +605,19 @@ export async function runHeartbeatOnce(opts: {
normalized.text = execFallbackText;
normalized.shouldSkip = false;
}
const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia && !hasExecCompletion;
// For cron system events, don't skip even if response contains HEARTBEAT_OK.
// The agent wasn't asked to reply with HEARTBEAT_OK - if they do, it's just incidental text.
// Preserve the original reply text for cron events when token stripping would empty it.
const cronFallbackText =
hasCronSystemEvents && !normalized.text.trim() && replyPayload.text?.trim()
? replyPayload.text.trim()
: null;
if (cronFallbackText) {
normalized.text = cronFallbackText;
normalized.shouldSkip = false;
}
const shouldSkipMain =
normalized.shouldSkip && !normalized.hasMedia && !hasExecCompletion && !hasCronSystemEvents;
if (shouldSkipMain && reasoningPayloads.length === 0) {
await restoreHeartbeatUpdatedAt({
storePath,