feat: add direct message payload type

This commit is contained in:
Gustav (Clawdbot) 2026-01-29 02:56:19 -05:00
parent 6372242da7
commit 7fcfba3a55
8 changed files with 423 additions and 5 deletions

View File

@ -0,0 +1,259 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { CronService } from "./service.js";
const noopLogger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
async function makeStorePath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-cron-"));
return {
storePath: path.join(dir, "cron", "jobs.json"),
cleanup: async () => {
await fs.rm(dir, { recursive: true, force: true });
},
};
}
describe("CronService direct message payload", () => {
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
noopLogger.debug.mockClear();
noopLogger.info.mockClear();
noopLogger.warn.mockClear();
noopLogger.error.mockClear();
});
afterEach(() => {
vi.useRealTimers();
});
it("runs a direct message job and sends without spinning up an agent", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const sendDirectMessage = vi.fn(async () => ({ ok: true, messageId: "msg123" }));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
sendDirectMessage,
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
const job = await cron.add({
name: "birthday reminder",
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "direct",
wakeMode: "now",
payload: {
kind: "message",
text: "🎂 Emma's birthday tomorrow!",
channel: "telegram",
to: "6450265544",
},
});
expect(job.state.nextRunAtMs).toBe(atMs);
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
await vi.runOnlyPendingTimersAsync();
// Direct message should have been sent
expect(sendDirectMessage).toHaveBeenCalledWith({
channel: "telegram",
text: "🎂 Emma's birthday tomorrow!",
to: "6450265544",
});
// Should NOT have called agent-related functions
expect(enqueueSystemEvent).not.toHaveBeenCalled();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((j) => j.id === job.id);
expect(updated?.enabled).toBe(false);
expect(updated?.state.lastStatus).toBe("ok");
cron.stop();
await store.cleanup();
});
it("reports error when sendDirectMessage is not configured", async () => {
const store = await makeStorePath();
const enqueueSystemEvent = vi.fn();
const requestHeartbeatNow = vi.fn();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
// sendDirectMessage is NOT provided
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
const job = await cron.add({
name: "direct message test",
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "direct",
wakeMode: "now",
payload: {
kind: "message",
text: "Hello!",
channel: "telegram",
},
});
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
await vi.runOnlyPendingTimersAsync();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((j) => j.id === job.id);
expect(updated?.state.lastStatus).toBe("error");
expect(updated?.state.lastError).toBe("sendDirectMessage not configured");
cron.stop();
await store.cleanup();
});
it("rejects direct sessionTarget with non-message payload", async () => {
const store = await makeStorePath();
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
sendDirectMessage: vi.fn(async () => ({ ok: true })),
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
// Should throw when trying to create a direct job with systemEvent payload
await expect(
cron.add({
name: "invalid combo",
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "direct",
wakeMode: "now",
payload: { kind: "systemEvent", text: "this should fail" },
}),
).rejects.toThrow('direct cron jobs require payload.kind="message"');
cron.stop();
await store.cleanup();
});
it("handles sendDirectMessage failure gracefully", async () => {
const store = await makeStorePath();
const sendDirectMessage = vi.fn(async () => ({
ok: false,
error: "Channel not configured",
}));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
sendDirectMessage,
});
await cron.start();
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
const job = await cron.add({
name: "failing message",
enabled: true,
schedule: { kind: "at", atMs },
sessionTarget: "direct",
wakeMode: "now",
payload: {
kind: "message",
text: "Test message",
channel: "discord",
},
});
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
await vi.runOnlyPendingTimersAsync();
const jobs = await cron.list({ includeDisabled: true });
const updated = jobs.find((j) => j.id === job.id);
expect(updated?.state.lastStatus).toBe("error");
expect(updated?.state.lastError).toBe("Channel not configured");
cron.stop();
await store.cleanup();
});
it("runs recurring direct message jobs", async () => {
const store = await makeStorePath();
const sendDirectMessage = vi.fn(async () => ({ ok: true }));
const cron = new CronService({
storePath: store.storePath,
cronEnabled: true,
log: noopLogger,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
sendDirectMessage,
});
await cron.start();
// Create a job that will run at specific times
const anchorMs = Date.parse("2025-12-13T00:01:00.000Z");
const job = await cron.add({
name: "recurring reminder",
enabled: true,
schedule: { kind: "every", everyMs: 60000, anchorMs },
sessionTarget: "direct",
wakeMode: "now",
payload: {
kind: "message",
text: "Hourly check-in",
channel: "telegram",
to: "123456",
},
});
// First run at anchor time
vi.setSystemTime(new Date("2025-12-13T00:01:00.000Z"));
await vi.runOnlyPendingTimersAsync();
expect(sendDirectMessage).toHaveBeenCalledTimes(1);
// After first run, job should be rescheduled and still enabled
const jobsAfterFirst = await cron.list();
const afterFirst = jobsAfterFirst.find((j) => j.id === job.id);
expect(afterFirst?.enabled).toBe(true);
expect(afterFirst?.state.lastStatus).toBe("ok");
cron.stop();
await store.cleanup();
});
});

View File

@ -25,6 +25,9 @@ export function assertSupportedJobSpec(job: Pick<CronJob, "sessionTarget" | "pay
if (job.sessionTarget === "isolated" && job.payload.kind !== "agentTurn") { if (job.sessionTarget === "isolated" && job.payload.kind !== "agentTurn") {
throw new Error('isolated cron jobs require payload.kind="agentTurn"'); throw new Error('isolated cron jobs require payload.kind="agentTurn"');
} }
if (job.sessionTarget === "direct" && job.payload.kind !== "message") {
throw new Error('direct cron jobs require payload.kind="message"');
}
} }
export function findJobOrThrow(state: CronServiceState, id: string) { export function findJobOrThrow(state: CronServiceState, id: string) {
@ -131,6 +134,17 @@ function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronP
return { kind: "systemEvent", text }; return { kind: "systemEvent", text };
} }
if (patch.kind === "message") {
if (existing.kind !== "message") {
return buildPayloadFromPatch(patch);
}
const next: Extract<CronPayload, { kind: "message" }> = { ...existing };
if (typeof patch.text === "string") next.text = patch.text;
if (typeof patch.channel === "string") next.channel = patch.channel;
if (typeof patch.to === "string") next.to = patch.to;
return next;
}
if (existing.kind !== "agentTurn") { if (existing.kind !== "agentTurn") {
return buildPayloadFromPatch(patch); return buildPayloadFromPatch(patch);
} }
@ -157,6 +171,21 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload {
return { kind: "systemEvent", text: patch.text }; return { kind: "systemEvent", text: patch.text };
} }
if (patch.kind === "message") {
if (typeof patch.text !== "string" || patch.text.length === 0) {
throw new Error('cron.update payload.kind="message" requires text');
}
if (typeof patch.channel !== "string" || patch.channel.length === 0) {
throw new Error('cron.update payload.kind="message" requires channel');
}
return {
kind: "message",
text: patch.text,
channel: patch.channel,
to: patch.to,
};
}
if (typeof patch.message !== "string" || patch.message.length === 0) { if (typeof patch.message !== "string" || patch.message.length === 0) {
throw new Error('cron.update payload.kind="agentTurn" requires message'); throw new Error('cron.update payload.kind="agentTurn" requires message');
} }

View File

@ -36,7 +36,9 @@ export function inferLegacyName(job: {
? job.payload.text ? job.payload.text
: job?.payload?.kind === "agentTurn" && typeof job.payload.message === "string" : job?.payload?.kind === "agentTurn" && typeof job.payload.message === "string"
? job.payload.message ? job.payload.message
: ""; : job?.payload?.kind === "message" && typeof job.payload.text === "string"
? job.payload.text
: "";
const firstLine = const firstLine =
text text
.split("\n") .split("\n")
@ -55,5 +57,6 @@ export function inferLegacyName(job: {
export function normalizePayloadToSystemText(payload: CronPayload) { export function normalizePayloadToSystemText(payload: CronPayload) {
if (payload.kind === "systemEvent") return payload.text.trim(); if (payload.kind === "systemEvent") return payload.text.trim();
if (payload.kind === "message") return payload.text.trim();
return payload.message.trim(); return payload.message.trim();
} }

View File

@ -1,4 +1,5 @@
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
import type { ChannelId } from "../../channels/plugins/types.js";
import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js"; import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js";
export type CronEvent = { export type CronEvent = {
@ -19,6 +20,18 @@ export type Logger = {
error: (obj: unknown, msg?: string) => void; error: (obj: unknown, msg?: string) => void;
}; };
export type SendDirectMessageParams = {
channel: ChannelId;
text: string;
to?: string;
};
export type SendDirectMessageResult = {
ok: boolean;
error?: string;
messageId?: string;
};
export type CronServiceDeps = { export type CronServiceDeps = {
nowMs?: () => number; nowMs?: () => number;
log: Logger; log: Logger;
@ -34,6 +47,11 @@ export type CronServiceDeps = {
outputText?: string; outputText?: string;
error?: string; error?: string;
}>; }>;
/**
* Send a message directly to a channel without involving an agent.
* Used by the "message" payload kind.
*/
sendDirectMessage?: (params: SendDirectMessageParams) => Promise<SendDirectMessageResult>;
onEvent?: (evt: CronEvent) => void; onEvent?: (evt: CronEvent) => void;
}; };

View File

@ -190,6 +190,40 @@ export async function executeJob(
return; return;
} }
// Handle direct message delivery (no agent involved)
if (job.sessionTarget === "direct") {
if (job.payload.kind !== "message") {
await finish("skipped", 'direct job requires payload.kind="message"');
return;
}
if (!state.deps.sendDirectMessage) {
await finish("error", "sendDirectMessage not configured");
return;
}
const { text, channel, to } = job.payload;
if (!text?.trim()) {
await finish("skipped", "message payload requires non-empty text");
return;
}
const channelId =
channel === "last" ? undefined : (channel as import("../../channels/plugins/types.js").ChannelId);
if (!channelId) {
await finish("error", 'direct message requires explicit channel (not "last")');
return;
}
const result = await state.deps.sendDirectMessage({
channel: channelId,
text,
to,
});
if (result.ok) {
await finish("ok", undefined, `Sent to ${channelId}${to ? `:${to}` : ""}`);
} else {
await finish("error", result.error ?? "message send failed");
}
return;
}
if (job.payload.kind !== "agentTurn") { if (job.payload.kind !== "agentTurn") {
await finish("skipped", "isolated job requires payload.kind=agentTurn"); await finish("skipped", "isolated job requires payload.kind=agentTurn");
return; return;

View File

@ -5,7 +5,7 @@ export type CronSchedule =
| { kind: "every"; everyMs: number; anchorMs?: number } | { kind: "every"; everyMs: number; anchorMs?: number }
| { kind: "cron"; expr: string; tz?: string }; | { kind: "cron"; expr: string; tz?: string };
export type CronSessionTarget = "main" | "isolated"; export type CronSessionTarget = "main" | "isolated" | "direct";
export type CronWakeMode = "next-heartbeat" | "now"; export type CronWakeMode = "next-heartbeat" | "now";
export type CronMessageChannel = ChannelId | "last"; export type CronMessageChannel = ChannelId | "last";
@ -24,6 +24,15 @@ export type CronPayload =
channel?: CronMessageChannel; channel?: CronMessageChannel;
to?: string; to?: string;
bestEffortDeliver?: boolean; bestEffortDeliver?: boolean;
}
| {
kind: "message";
/** The text to send directly to the channel. */
text: string;
/** The channel to send to (e.g., "telegram", "discord"). */
channel: CronMessageChannel;
/** Optional target id for DMs or specific channels. */
to?: string;
}; };
export type CronPayloadPatch = export type CronPayloadPatch =
@ -39,6 +48,12 @@ export type CronPayloadPatch =
channel?: CronMessageChannel; channel?: CronMessageChannel;
to?: string; to?: string;
bestEffortDeliver?: boolean; bestEffortDeliver?: boolean;
}
| {
kind: "message";
text?: string;
channel?: CronMessageChannel;
to?: string;
}; };
export type CronIsolation = { export type CronIsolation = {

View File

@ -50,6 +50,15 @@ export const CronPayloadSchema = Type.Union([
}, },
{ additionalProperties: false }, { additionalProperties: false },
), ),
Type.Object(
{
kind: Type.Literal("message"),
text: NonEmptyString,
channel: Type.Union([Type.Literal("last"), NonEmptyString]),
to: Type.Optional(Type.String()),
},
{ additionalProperties: false },
),
]); ]);
export const CronPayloadPatchSchema = Type.Union([ export const CronPayloadPatchSchema = Type.Union([
@ -74,6 +83,15 @@ export const CronPayloadPatchSchema = Type.Union([
}, },
{ additionalProperties: false }, { additionalProperties: false },
), ),
Type.Object(
{
kind: Type.Literal("message"),
text: Type.Optional(NonEmptyString),
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
to: Type.Optional(Type.String()),
},
{ additionalProperties: false },
),
]); ]);
export const CronIsolationSchema = Type.Object( export const CronIsolationSchema = Type.Object(
@ -110,7 +128,7 @@ export const CronJobSchema = Type.Object(
createdAtMs: Type.Integer({ minimum: 0 }), createdAtMs: Type.Integer({ minimum: 0 }),
updatedAtMs: Type.Integer({ minimum: 0 }), updatedAtMs: Type.Integer({ minimum: 0 }),
schedule: CronScheduleSchema, schedule: CronScheduleSchema,
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]), sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")]),
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]), wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
payload: CronPayloadSchema, payload: CronPayloadSchema,
isolation: Type.Optional(CronIsolationSchema), isolation: Type.Optional(CronIsolationSchema),
@ -136,7 +154,7 @@ export const CronAddParamsSchema = Type.Object(
enabled: Type.Optional(Type.Boolean()), enabled: Type.Optional(Type.Boolean()),
deleteAfterRun: Type.Optional(Type.Boolean()), deleteAfterRun: Type.Optional(Type.Boolean()),
schedule: CronScheduleSchema, schedule: CronScheduleSchema,
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]), sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")]),
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]), wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
payload: CronPayloadSchema, payload: CronPayloadSchema,
isolation: Type.Optional(CronIsolationSchema), isolation: Type.Optional(CronIsolationSchema),
@ -152,7 +170,7 @@ export const CronJobPatchSchema = Type.Object(
enabled: Type.Optional(Type.Boolean()), enabled: Type.Optional(Type.Boolean()),
deleteAfterRun: Type.Optional(Type.Boolean()), deleteAfterRun: Type.Optional(Type.Boolean()),
schedule: Type.Optional(CronScheduleSchema), schedule: Type.Optional(CronScheduleSchema),
sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated")])), sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")])),
wakeMode: Type.Optional(Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")])), wakeMode: Type.Optional(Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")])),
payload: Type.Optional(CronPayloadPatchSchema), payload: Type.Optional(CronPayloadPatchSchema),
isolation: Type.Optional(CronIsolationSchema), isolation: Type.Optional(CronIsolationSchema),

View File

@ -5,13 +5,16 @@ import { resolveAgentMainSessionKey } from "../config/sessions.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js"; import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js"; import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
import { CronService } from "../cron/service.js"; import { CronService } from "../cron/service.js";
import type { SendDirectMessageParams, SendDirectMessageResult } from "../cron/service/state.js";
import { resolveCronStorePath } from "../cron/store.js"; import { resolveCronStorePath } from "../cron/store.js";
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js"; import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { runMessageAction } from "../infra/outbound/message-action-runner.js";
import { enqueueSystemEvent } from "../infra/system-events.js"; import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js"; import { getChildLogger } from "../logging.js";
import { normalizeAgentId } from "../routing/session-key.js"; import { normalizeAgentId } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js"; import { defaultRuntime } from "../runtime.js";
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
export type GatewayCronState = { export type GatewayCronState = {
cron: CronService; cron: CronService;
@ -75,6 +78,45 @@ export function buildGatewayCronService(params: {
lane: "cron", lane: "cron",
}); });
}, },
sendDirectMessage: async (
msgParams: SendDirectMessageParams,
): Promise<SendDirectMessageResult> => {
const runtimeConfig = loadConfig();
try {
const result = await runMessageAction({
cfg: runtimeConfig,
action: "send",
params: {
channel: msgParams.channel,
message: msgParams.text,
to: msgParams.to,
},
gateway: {
clientName: GATEWAY_CLIENT_IDS.GATEWAY_CLIENT,
clientDisplayName: "cron",
mode: GATEWAY_CLIENT_MODES.BACKEND,
},
});
if (result.kind === "send" && result.sendResult) {
const msgResult = result.sendResult.result;
const messageId =
msgResult && typeof msgResult === "object" && "messageId" in msgResult
? (msgResult as { messageId?: string }).messageId
: undefined;
return {
ok: true,
messageId,
};
}
return { ok: true };
} catch (err) {
cronLogger.error({ err: String(err) }, "cron: direct message send failed");
return {
ok: false,
error: err instanceof Error ? err.message : String(err),
};
}
},
log: getChildLogger({ module: "cron", storePath }), log: getChildLogger({ module: "cron", storePath }),
onEvent: (evt) => { onEvent: (evt) => {
params.broadcast("cron", evt, { dropIfSlow: true }); params.broadcast("cron", evt, { dropIfSlow: true });