diff --git a/src/cron/service.direct-message.test.ts b/src/cron/service.direct-message.test.ts new file mode 100644 index 000000000..429cfe296 --- /dev/null +++ b/src/cron/service.direct-message.test.ts @@ -0,0 +1,259 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService direct message payload", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("runs a direct message job and sends without spinning up an agent", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const sendDirectMessage = vi.fn(async () => ({ ok: true, messageId: "msg123" })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + sendDirectMessage, + }); + + await cron.start(); + const atMs = Date.parse("2025-12-13T00:00:02.000Z"); + const job = await cron.add({ + name: "birthday reminder", + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "direct", + wakeMode: "now", + payload: { + kind: "message", + text: "🎂 Emma's birthday tomorrow!", + channel: "telegram", + to: "6450265544", + }, + }); + + expect(job.state.nextRunAtMs).toBe(atMs); + + vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); + await vi.runOnlyPendingTimersAsync(); + + // Direct message should have been sent + expect(sendDirectMessage).toHaveBeenCalledWith({ + channel: "telegram", + text: "🎂 Emma's birthday tomorrow!", + to: "6450265544", + }); + + // Should NOT have called agent-related functions + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((j) => j.id === job.id); + expect(updated?.enabled).toBe(false); + expect(updated?.state.lastStatus).toBe("ok"); + + cron.stop(); + await store.cleanup(); + }); + + it("reports error when sendDirectMessage is not configured", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + // sendDirectMessage is NOT provided + }); + + await cron.start(); + const atMs = Date.parse("2025-12-13T00:00:02.000Z"); + const job = await cron.add({ + name: "direct message test", + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "direct", + wakeMode: "now", + payload: { + kind: "message", + text: "Hello!", + channel: "telegram", + }, + }); + + vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); + await vi.runOnlyPendingTimersAsync(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((j) => j.id === job.id); + expect(updated?.state.lastStatus).toBe("error"); + expect(updated?.state.lastError).toBe("sendDirectMessage not configured"); + + cron.stop(); + await store.cleanup(); + }); + + it("rejects direct sessionTarget with non-message payload", async () => { + const store = await makeStorePath(); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + sendDirectMessage: vi.fn(async () => ({ ok: true })), + }); + + await cron.start(); + const atMs = Date.parse("2025-12-13T00:00:02.000Z"); + + // Should throw when trying to create a direct job with systemEvent payload + await expect( + cron.add({ + name: "invalid combo", + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "direct", + wakeMode: "now", + payload: { kind: "systemEvent", text: "this should fail" }, + }), + ).rejects.toThrow('direct cron jobs require payload.kind="message"'); + + cron.stop(); + await store.cleanup(); + }); + + it("handles sendDirectMessage failure gracefully", async () => { + const store = await makeStorePath(); + const sendDirectMessage = vi.fn(async () => ({ + ok: false, + error: "Channel not configured", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + sendDirectMessage, + }); + + await cron.start(); + const atMs = Date.parse("2025-12-13T00:00:02.000Z"); + const job = await cron.add({ + name: "failing message", + enabled: true, + schedule: { kind: "at", atMs }, + sessionTarget: "direct", + wakeMode: "now", + payload: { + kind: "message", + text: "Test message", + channel: "discord", + }, + }); + + vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); + await vi.runOnlyPendingTimersAsync(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((j) => j.id === job.id); + expect(updated?.state.lastStatus).toBe("error"); + expect(updated?.state.lastError).toBe("Channel not configured"); + + cron.stop(); + await store.cleanup(); + }); + + it("runs recurring direct message jobs", async () => { + const store = await makeStorePath(); + const sendDirectMessage = vi.fn(async () => ({ ok: true })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + sendDirectMessage, + }); + + await cron.start(); + // Create a job that will run at specific times + const anchorMs = Date.parse("2025-12-13T00:01:00.000Z"); + const job = await cron.add({ + name: "recurring reminder", + enabled: true, + schedule: { kind: "every", everyMs: 60000, anchorMs }, + sessionTarget: "direct", + wakeMode: "now", + payload: { + kind: "message", + text: "Hourly check-in", + channel: "telegram", + to: "123456", + }, + }); + + // First run at anchor time + vi.setSystemTime(new Date("2025-12-13T00:01:00.000Z")); + await vi.runOnlyPendingTimersAsync(); + expect(sendDirectMessage).toHaveBeenCalledTimes(1); + + // After first run, job should be rescheduled and still enabled + const jobsAfterFirst = await cron.list(); + const afterFirst = jobsAfterFirst.find((j) => j.id === job.id); + expect(afterFirst?.enabled).toBe(true); + expect(afterFirst?.state.lastStatus).toBe("ok"); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 132156a0c..9e37d610d 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -25,6 +25,9 @@ export function assertSupportedJobSpec(job: Pick = { ...existing }; + if (typeof patch.text === "string") next.text = patch.text; + if (typeof patch.channel === "string") next.channel = patch.channel; + if (typeof patch.to === "string") next.to = patch.to; + return next; + } + if (existing.kind !== "agentTurn") { return buildPayloadFromPatch(patch); } @@ -157,6 +171,21 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload { return { kind: "systemEvent", text: patch.text }; } + if (patch.kind === "message") { + if (typeof patch.text !== "string" || patch.text.length === 0) { + throw new Error('cron.update payload.kind="message" requires text'); + } + if (typeof patch.channel !== "string" || patch.channel.length === 0) { + throw new Error('cron.update payload.kind="message" requires channel'); + } + return { + kind: "message", + text: patch.text, + channel: patch.channel, + to: patch.to, + }; + } + if (typeof patch.message !== "string" || patch.message.length === 0) { throw new Error('cron.update payload.kind="agentTurn" requires message'); } diff --git a/src/cron/service/normalize.ts b/src/cron/service/normalize.ts index 161b118fa..452ed06f1 100644 --- a/src/cron/service/normalize.ts +++ b/src/cron/service/normalize.ts @@ -36,7 +36,9 @@ export function inferLegacyName(job: { ? job.payload.text : job?.payload?.kind === "agentTurn" && typeof job.payload.message === "string" ? job.payload.message - : ""; + : job?.payload?.kind === "message" && typeof job.payload.text === "string" + ? job.payload.text + : ""; const firstLine = text .split("\n") @@ -55,5 +57,6 @@ export function inferLegacyName(job: { export function normalizePayloadToSystemText(payload: CronPayload) { if (payload.kind === "systemEvent") return payload.text.trim(); + if (payload.kind === "message") return payload.text.trim(); return payload.message.trim(); } diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index ab094c20b..e8b66fb2b 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -1,4 +1,5 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; +import type { ChannelId } from "../../channels/plugins/types.js"; import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js"; export type CronEvent = { @@ -19,6 +20,18 @@ export type Logger = { error: (obj: unknown, msg?: string) => void; }; +export type SendDirectMessageParams = { + channel: ChannelId; + text: string; + to?: string; +}; + +export type SendDirectMessageResult = { + ok: boolean; + error?: string; + messageId?: string; +}; + export type CronServiceDeps = { nowMs?: () => number; log: Logger; @@ -34,6 +47,11 @@ export type CronServiceDeps = { outputText?: string; error?: string; }>; + /** + * Send a message directly to a channel without involving an agent. + * Used by the "message" payload kind. + */ + sendDirectMessage?: (params: SendDirectMessageParams) => Promise; onEvent?: (evt: CronEvent) => void; }; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 370f5d116..58becdad7 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -190,6 +190,40 @@ export async function executeJob( return; } + // Handle direct message delivery (no agent involved) + if (job.sessionTarget === "direct") { + if (job.payload.kind !== "message") { + await finish("skipped", 'direct job requires payload.kind="message"'); + return; + } + if (!state.deps.sendDirectMessage) { + await finish("error", "sendDirectMessage not configured"); + return; + } + const { text, channel, to } = job.payload; + if (!text?.trim()) { + await finish("skipped", "message payload requires non-empty text"); + return; + } + const channelId = + channel === "last" ? undefined : (channel as import("../../channels/plugins/types.js").ChannelId); + if (!channelId) { + await finish("error", 'direct message requires explicit channel (not "last")'); + return; + } + const result = await state.deps.sendDirectMessage({ + channel: channelId, + text, + to, + }); + if (result.ok) { + await finish("ok", undefined, `Sent to ${channelId}${to ? `:${to}` : ""}`); + } else { + await finish("error", result.error ?? "message send failed"); + } + return; + } + if (job.payload.kind !== "agentTurn") { await finish("skipped", "isolated job requires payload.kind=agentTurn"); return; diff --git a/src/cron/types.ts b/src/cron/types.ts index f3fd891d6..287f67d0a 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -5,7 +5,7 @@ export type CronSchedule = | { kind: "every"; everyMs: number; anchorMs?: number } | { kind: "cron"; expr: string; tz?: string }; -export type CronSessionTarget = "main" | "isolated"; +export type CronSessionTarget = "main" | "isolated" | "direct"; export type CronWakeMode = "next-heartbeat" | "now"; export type CronMessageChannel = ChannelId | "last"; @@ -24,6 +24,15 @@ export type CronPayload = channel?: CronMessageChannel; to?: string; bestEffortDeliver?: boolean; + } + | { + kind: "message"; + /** The text to send directly to the channel. */ + text: string; + /** The channel to send to (e.g., "telegram", "discord"). */ + channel: CronMessageChannel; + /** Optional target id for DMs or specific channels. */ + to?: string; }; export type CronPayloadPatch = @@ -39,6 +48,12 @@ export type CronPayloadPatch = channel?: CronMessageChannel; to?: string; bestEffortDeliver?: boolean; + } + | { + kind: "message"; + text?: string; + channel?: CronMessageChannel; + to?: string; }; export type CronIsolation = { diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index 63ed0c209..04119c785 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -50,6 +50,15 @@ export const CronPayloadSchema = Type.Union([ }, { additionalProperties: false }, ), + Type.Object( + { + kind: Type.Literal("message"), + text: NonEmptyString, + channel: Type.Union([Type.Literal("last"), NonEmptyString]), + to: Type.Optional(Type.String()), + }, + { additionalProperties: false }, + ), ]); export const CronPayloadPatchSchema = Type.Union([ @@ -74,6 +83,15 @@ export const CronPayloadPatchSchema = Type.Union([ }, { additionalProperties: false }, ), + Type.Object( + { + kind: Type.Literal("message"), + text: Type.Optional(NonEmptyString), + channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + to: Type.Optional(Type.String()), + }, + { additionalProperties: false }, + ), ]); export const CronIsolationSchema = Type.Object( @@ -110,7 +128,7 @@ export const CronJobSchema = Type.Object( createdAtMs: Type.Integer({ minimum: 0 }), updatedAtMs: Type.Integer({ minimum: 0 }), schedule: CronScheduleSchema, - sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]), + sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")]), wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]), payload: CronPayloadSchema, isolation: Type.Optional(CronIsolationSchema), @@ -136,7 +154,7 @@ export const CronAddParamsSchema = Type.Object( enabled: Type.Optional(Type.Boolean()), deleteAfterRun: Type.Optional(Type.Boolean()), schedule: CronScheduleSchema, - sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]), + sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")]), wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]), payload: CronPayloadSchema, isolation: Type.Optional(CronIsolationSchema), @@ -152,7 +170,7 @@ export const CronJobPatchSchema = Type.Object( enabled: Type.Optional(Type.Boolean()), deleteAfterRun: Type.Optional(Type.Boolean()), schedule: Type.Optional(CronScheduleSchema), - sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated")])), + sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")])), wakeMode: Type.Optional(Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")])), payload: Type.Optional(CronPayloadPatchSchema), isolation: Type.Optional(CronIsolationSchema), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 9a0c0ca98..6463394c7 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -5,13 +5,16 @@ import { resolveAgentMainSessionKey } from "../config/sessions.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js"; import { CronService } from "../cron/service.js"; +import type { SendDirectMessageParams, SendDirectMessageResult } from "../cron/service/state.js"; import { resolveCronStorePath } from "../cron/store.js"; import { runHeartbeatOnce } from "../infra/heartbeat-runner.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; +import { runMessageAction } from "../infra/outbound/message-action-runner.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { defaultRuntime } from "../runtime.js"; +import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js"; export type GatewayCronState = { cron: CronService; @@ -75,6 +78,45 @@ export function buildGatewayCronService(params: { lane: "cron", }); }, + sendDirectMessage: async ( + msgParams: SendDirectMessageParams, + ): Promise => { + const runtimeConfig = loadConfig(); + try { + const result = await runMessageAction({ + cfg: runtimeConfig, + action: "send", + params: { + channel: msgParams.channel, + message: msgParams.text, + to: msgParams.to, + }, + gateway: { + clientName: GATEWAY_CLIENT_IDS.GATEWAY_CLIENT, + clientDisplayName: "cron", + mode: GATEWAY_CLIENT_MODES.BACKEND, + }, + }); + if (result.kind === "send" && result.sendResult) { + const msgResult = result.sendResult.result; + const messageId = + msgResult && typeof msgResult === "object" && "messageId" in msgResult + ? (msgResult as { messageId?: string }).messageId + : undefined; + return { + ok: true, + messageId, + }; + } + return { ok: true }; + } catch (err) { + cronLogger.error({ err: String(err) }, "cron: direct message send failed"); + return { + ok: false, + error: err instanceof Error ? err.message : String(err), + }; + } + }, log: getChildLogger({ module: "cron", storePath }), onEvent: (evt) => { params.broadcast("cron", evt, { dropIfSlow: true });