Merge 7fcfba3a55 into 4583f88626
This commit is contained in:
commit
2259d88c96
259
src/cron/service.direct-message.test.ts
Normal file
259
src/cron/service.direct-message.test.ts
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
import fs from "node:fs/promises";
|
||||||
|
import os from "node:os";
|
||||||
|
import path from "node:path";
|
||||||
|
|
||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
import { CronService } from "./service.js";
|
||||||
|
|
||||||
|
const noopLogger = {
|
||||||
|
debug: vi.fn(),
|
||||||
|
info: vi.fn(),
|
||||||
|
warn: vi.fn(),
|
||||||
|
error: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
async function makeStorePath() {
|
||||||
|
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-cron-"));
|
||||||
|
return {
|
||||||
|
storePath: path.join(dir, "cron", "jobs.json"),
|
||||||
|
cleanup: async () => {
|
||||||
|
await fs.rm(dir, { recursive: true, force: true });
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("CronService direct message payload", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.useFakeTimers();
|
||||||
|
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
|
||||||
|
noopLogger.debug.mockClear();
|
||||||
|
noopLogger.info.mockClear();
|
||||||
|
noopLogger.warn.mockClear();
|
||||||
|
noopLogger.error.mockClear();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("runs a direct message job and sends without spinning up an agent", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
const requestHeartbeatNow = vi.fn();
|
||||||
|
const sendDirectMessage = vi.fn(async () => ({ ok: true, messageId: "msg123" }));
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent,
|
||||||
|
requestHeartbeatNow,
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||||
|
sendDirectMessage,
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
|
||||||
|
const job = await cron.add({
|
||||||
|
name: "birthday reminder",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "at", atMs },
|
||||||
|
sessionTarget: "direct",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: {
|
||||||
|
kind: "message",
|
||||||
|
text: "🎂 Emma's birthday tomorrow!",
|
||||||
|
channel: "telegram",
|
||||||
|
to: "6450265544",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(job.state.nextRunAtMs).toBe(atMs);
|
||||||
|
|
||||||
|
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
|
||||||
|
await vi.runOnlyPendingTimersAsync();
|
||||||
|
|
||||||
|
// Direct message should have been sent
|
||||||
|
expect(sendDirectMessage).toHaveBeenCalledWith({
|
||||||
|
channel: "telegram",
|
||||||
|
text: "🎂 Emma's birthday tomorrow!",
|
||||||
|
to: "6450265544",
|
||||||
|
});
|
||||||
|
|
||||||
|
// Should NOT have called agent-related functions
|
||||||
|
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
const jobs = await cron.list({ includeDisabled: true });
|
||||||
|
const updated = jobs.find((j) => j.id === job.id);
|
||||||
|
expect(updated?.enabled).toBe(false);
|
||||||
|
expect(updated?.state.lastStatus).toBe("ok");
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("reports error when sendDirectMessage is not configured", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
const requestHeartbeatNow = vi.fn();
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent,
|
||||||
|
requestHeartbeatNow,
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||||
|
// sendDirectMessage is NOT provided
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
|
||||||
|
const job = await cron.add({
|
||||||
|
name: "direct message test",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "at", atMs },
|
||||||
|
sessionTarget: "direct",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: {
|
||||||
|
kind: "message",
|
||||||
|
text: "Hello!",
|
||||||
|
channel: "telegram",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
|
||||||
|
await vi.runOnlyPendingTimersAsync();
|
||||||
|
|
||||||
|
const jobs = await cron.list({ includeDisabled: true });
|
||||||
|
const updated = jobs.find((j) => j.id === job.id);
|
||||||
|
expect(updated?.state.lastStatus).toBe("error");
|
||||||
|
expect(updated?.state.lastError).toBe("sendDirectMessage not configured");
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects direct sessionTarget with non-message payload", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent: vi.fn(),
|
||||||
|
requestHeartbeatNow: vi.fn(),
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||||
|
sendDirectMessage: vi.fn(async () => ({ ok: true })),
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
|
||||||
|
|
||||||
|
// Should throw when trying to create a direct job with systemEvent payload
|
||||||
|
await expect(
|
||||||
|
cron.add({
|
||||||
|
name: "invalid combo",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "at", atMs },
|
||||||
|
sessionTarget: "direct",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: { kind: "systemEvent", text: "this should fail" },
|
||||||
|
}),
|
||||||
|
).rejects.toThrow('direct cron jobs require payload.kind="message"');
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles sendDirectMessage failure gracefully", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const sendDirectMessage = vi.fn(async () => ({
|
||||||
|
ok: false,
|
||||||
|
error: "Channel not configured",
|
||||||
|
}));
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent: vi.fn(),
|
||||||
|
requestHeartbeatNow: vi.fn(),
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||||
|
sendDirectMessage,
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const atMs = Date.parse("2025-12-13T00:00:02.000Z");
|
||||||
|
const job = await cron.add({
|
||||||
|
name: "failing message",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "at", atMs },
|
||||||
|
sessionTarget: "direct",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: {
|
||||||
|
kind: "message",
|
||||||
|
text: "Test message",
|
||||||
|
channel: "discord",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z"));
|
||||||
|
await vi.runOnlyPendingTimersAsync();
|
||||||
|
|
||||||
|
const jobs = await cron.list({ includeDisabled: true });
|
||||||
|
const updated = jobs.find((j) => j.id === job.id);
|
||||||
|
expect(updated?.state.lastStatus).toBe("error");
|
||||||
|
expect(updated?.state.lastError).toBe("Channel not configured");
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("runs recurring direct message jobs", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const sendDirectMessage = vi.fn(async () => ({ ok: true }));
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
enqueueSystemEvent: vi.fn(),
|
||||||
|
requestHeartbeatNow: vi.fn(),
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||||
|
sendDirectMessage,
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
// Create a job that will run at specific times
|
||||||
|
const anchorMs = Date.parse("2025-12-13T00:01:00.000Z");
|
||||||
|
const job = await cron.add({
|
||||||
|
name: "recurring reminder",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "every", everyMs: 60000, anchorMs },
|
||||||
|
sessionTarget: "direct",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: {
|
||||||
|
kind: "message",
|
||||||
|
text: "Hourly check-in",
|
||||||
|
channel: "telegram",
|
||||||
|
to: "123456",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
// First run at anchor time
|
||||||
|
vi.setSystemTime(new Date("2025-12-13T00:01:00.000Z"));
|
||||||
|
await vi.runOnlyPendingTimersAsync();
|
||||||
|
expect(sendDirectMessage).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// After first run, job should be rescheduled and still enabled
|
||||||
|
const jobsAfterFirst = await cron.list();
|
||||||
|
const afterFirst = jobsAfterFirst.find((j) => j.id === job.id);
|
||||||
|
expect(afterFirst?.enabled).toBe(true);
|
||||||
|
expect(afterFirst?.state.lastStatus).toBe("ok");
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -25,6 +25,9 @@ export function assertSupportedJobSpec(job: Pick<CronJob, "sessionTarget" | "pay
|
|||||||
if (job.sessionTarget === "isolated" && job.payload.kind !== "agentTurn") {
|
if (job.sessionTarget === "isolated" && job.payload.kind !== "agentTurn") {
|
||||||
throw new Error('isolated cron jobs require payload.kind="agentTurn"');
|
throw new Error('isolated cron jobs require payload.kind="agentTurn"');
|
||||||
}
|
}
|
||||||
|
if (job.sessionTarget === "direct" && job.payload.kind !== "message") {
|
||||||
|
throw new Error('direct cron jobs require payload.kind="message"');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function findJobOrThrow(state: CronServiceState, id: string) {
|
export function findJobOrThrow(state: CronServiceState, id: string) {
|
||||||
@ -131,6 +134,17 @@ function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronP
|
|||||||
return { kind: "systemEvent", text };
|
return { kind: "systemEvent", text };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (patch.kind === "message") {
|
||||||
|
if (existing.kind !== "message") {
|
||||||
|
return buildPayloadFromPatch(patch);
|
||||||
|
}
|
||||||
|
const next: Extract<CronPayload, { kind: "message" }> = { ...existing };
|
||||||
|
if (typeof patch.text === "string") next.text = patch.text;
|
||||||
|
if (typeof patch.channel === "string") next.channel = patch.channel;
|
||||||
|
if (typeof patch.to === "string") next.to = patch.to;
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
if (existing.kind !== "agentTurn") {
|
if (existing.kind !== "agentTurn") {
|
||||||
return buildPayloadFromPatch(patch);
|
return buildPayloadFromPatch(patch);
|
||||||
}
|
}
|
||||||
@ -157,6 +171,21 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload {
|
|||||||
return { kind: "systemEvent", text: patch.text };
|
return { kind: "systemEvent", text: patch.text };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (patch.kind === "message") {
|
||||||
|
if (typeof patch.text !== "string" || patch.text.length === 0) {
|
||||||
|
throw new Error('cron.update payload.kind="message" requires text');
|
||||||
|
}
|
||||||
|
if (typeof patch.channel !== "string" || patch.channel.length === 0) {
|
||||||
|
throw new Error('cron.update payload.kind="message" requires channel');
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
kind: "message",
|
||||||
|
text: patch.text,
|
||||||
|
channel: patch.channel,
|
||||||
|
to: patch.to,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
if (typeof patch.message !== "string" || patch.message.length === 0) {
|
if (typeof patch.message !== "string" || patch.message.length === 0) {
|
||||||
throw new Error('cron.update payload.kind="agentTurn" requires message');
|
throw new Error('cron.update payload.kind="agentTurn" requires message');
|
||||||
}
|
}
|
||||||
|
|||||||
@ -36,7 +36,9 @@ export function inferLegacyName(job: {
|
|||||||
? job.payload.text
|
? job.payload.text
|
||||||
: job?.payload?.kind === "agentTurn" && typeof job.payload.message === "string"
|
: job?.payload?.kind === "agentTurn" && typeof job.payload.message === "string"
|
||||||
? job.payload.message
|
? job.payload.message
|
||||||
: "";
|
: job?.payload?.kind === "message" && typeof job.payload.text === "string"
|
||||||
|
? job.payload.text
|
||||||
|
: "";
|
||||||
const firstLine =
|
const firstLine =
|
||||||
text
|
text
|
||||||
.split("\n")
|
.split("\n")
|
||||||
@ -55,5 +57,6 @@ export function inferLegacyName(job: {
|
|||||||
|
|
||||||
export function normalizePayloadToSystemText(payload: CronPayload) {
|
export function normalizePayloadToSystemText(payload: CronPayload) {
|
||||||
if (payload.kind === "systemEvent") return payload.text.trim();
|
if (payload.kind === "systemEvent") return payload.text.trim();
|
||||||
|
if (payload.kind === "message") return payload.text.trim();
|
||||||
return payload.message.trim();
|
return payload.message.trim();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||||
|
import type { ChannelId } from "../../channels/plugins/types.js";
|
||||||
import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js";
|
import type { CronJob, CronJobCreate, CronJobPatch, CronStoreFile } from "../types.js";
|
||||||
|
|
||||||
export type CronEvent = {
|
export type CronEvent = {
|
||||||
@ -19,6 +20,18 @@ export type Logger = {
|
|||||||
error: (obj: unknown, msg?: string) => void;
|
error: (obj: unknown, msg?: string) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export type SendDirectMessageParams = {
|
||||||
|
channel: ChannelId;
|
||||||
|
text: string;
|
||||||
|
to?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type SendDirectMessageResult = {
|
||||||
|
ok: boolean;
|
||||||
|
error?: string;
|
||||||
|
messageId?: string;
|
||||||
|
};
|
||||||
|
|
||||||
export type CronServiceDeps = {
|
export type CronServiceDeps = {
|
||||||
nowMs?: () => number;
|
nowMs?: () => number;
|
||||||
log: Logger;
|
log: Logger;
|
||||||
@ -34,6 +47,11 @@ export type CronServiceDeps = {
|
|||||||
outputText?: string;
|
outputText?: string;
|
||||||
error?: string;
|
error?: string;
|
||||||
}>;
|
}>;
|
||||||
|
/**
|
||||||
|
* Send a message directly to a channel without involving an agent.
|
||||||
|
* Used by the "message" payload kind.
|
||||||
|
*/
|
||||||
|
sendDirectMessage?: (params: SendDirectMessageParams) => Promise<SendDirectMessageResult>;
|
||||||
onEvent?: (evt: CronEvent) => void;
|
onEvent?: (evt: CronEvent) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -190,6 +190,40 @@ export async function executeJob(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle direct message delivery (no agent involved)
|
||||||
|
if (job.sessionTarget === "direct") {
|
||||||
|
if (job.payload.kind !== "message") {
|
||||||
|
await finish("skipped", 'direct job requires payload.kind="message"');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!state.deps.sendDirectMessage) {
|
||||||
|
await finish("error", "sendDirectMessage not configured");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const { text, channel, to } = job.payload;
|
||||||
|
if (!text?.trim()) {
|
||||||
|
await finish("skipped", "message payload requires non-empty text");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const channelId =
|
||||||
|
channel === "last" ? undefined : (channel as import("../../channels/plugins/types.js").ChannelId);
|
||||||
|
if (!channelId) {
|
||||||
|
await finish("error", 'direct message requires explicit channel (not "last")');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const result = await state.deps.sendDirectMessage({
|
||||||
|
channel: channelId,
|
||||||
|
text,
|
||||||
|
to,
|
||||||
|
});
|
||||||
|
if (result.ok) {
|
||||||
|
await finish("ok", undefined, `Sent to ${channelId}${to ? `:${to}` : ""}`);
|
||||||
|
} else {
|
||||||
|
await finish("error", result.error ?? "message send failed");
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (job.payload.kind !== "agentTurn") {
|
if (job.payload.kind !== "agentTurn") {
|
||||||
await finish("skipped", "isolated job requires payload.kind=agentTurn");
|
await finish("skipped", "isolated job requires payload.kind=agentTurn");
|
||||||
return;
|
return;
|
||||||
|
|||||||
@ -5,7 +5,7 @@ export type CronSchedule =
|
|||||||
| { kind: "every"; everyMs: number; anchorMs?: number }
|
| { kind: "every"; everyMs: number; anchorMs?: number }
|
||||||
| { kind: "cron"; expr: string; tz?: string };
|
| { kind: "cron"; expr: string; tz?: string };
|
||||||
|
|
||||||
export type CronSessionTarget = "main" | "isolated";
|
export type CronSessionTarget = "main" | "isolated" | "direct";
|
||||||
export type CronWakeMode = "next-heartbeat" | "now";
|
export type CronWakeMode = "next-heartbeat" | "now";
|
||||||
|
|
||||||
export type CronMessageChannel = ChannelId | "last";
|
export type CronMessageChannel = ChannelId | "last";
|
||||||
@ -24,6 +24,15 @@ export type CronPayload =
|
|||||||
channel?: CronMessageChannel;
|
channel?: CronMessageChannel;
|
||||||
to?: string;
|
to?: string;
|
||||||
bestEffortDeliver?: boolean;
|
bestEffortDeliver?: boolean;
|
||||||
|
}
|
||||||
|
| {
|
||||||
|
kind: "message";
|
||||||
|
/** The text to send directly to the channel. */
|
||||||
|
text: string;
|
||||||
|
/** The channel to send to (e.g., "telegram", "discord"). */
|
||||||
|
channel: CronMessageChannel;
|
||||||
|
/** Optional target id for DMs or specific channels. */
|
||||||
|
to?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type CronPayloadPatch =
|
export type CronPayloadPatch =
|
||||||
@ -39,6 +48,12 @@ export type CronPayloadPatch =
|
|||||||
channel?: CronMessageChannel;
|
channel?: CronMessageChannel;
|
||||||
to?: string;
|
to?: string;
|
||||||
bestEffortDeliver?: boolean;
|
bestEffortDeliver?: boolean;
|
||||||
|
}
|
||||||
|
| {
|
||||||
|
kind: "message";
|
||||||
|
text?: string;
|
||||||
|
channel?: CronMessageChannel;
|
||||||
|
to?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type CronIsolation = {
|
export type CronIsolation = {
|
||||||
|
|||||||
@ -50,6 +50,15 @@ export const CronPayloadSchema = Type.Union([
|
|||||||
},
|
},
|
||||||
{ additionalProperties: false },
|
{ additionalProperties: false },
|
||||||
),
|
),
|
||||||
|
Type.Object(
|
||||||
|
{
|
||||||
|
kind: Type.Literal("message"),
|
||||||
|
text: NonEmptyString,
|
||||||
|
channel: Type.Union([Type.Literal("last"), NonEmptyString]),
|
||||||
|
to: Type.Optional(Type.String()),
|
||||||
|
},
|
||||||
|
{ additionalProperties: false },
|
||||||
|
),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
export const CronPayloadPatchSchema = Type.Union([
|
export const CronPayloadPatchSchema = Type.Union([
|
||||||
@ -74,6 +83,15 @@ export const CronPayloadPatchSchema = Type.Union([
|
|||||||
},
|
},
|
||||||
{ additionalProperties: false },
|
{ additionalProperties: false },
|
||||||
),
|
),
|
||||||
|
Type.Object(
|
||||||
|
{
|
||||||
|
kind: Type.Literal("message"),
|
||||||
|
text: Type.Optional(NonEmptyString),
|
||||||
|
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
|
||||||
|
to: Type.Optional(Type.String()),
|
||||||
|
},
|
||||||
|
{ additionalProperties: false },
|
||||||
|
),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
export const CronIsolationSchema = Type.Object(
|
export const CronIsolationSchema = Type.Object(
|
||||||
@ -110,7 +128,7 @@ export const CronJobSchema = Type.Object(
|
|||||||
createdAtMs: Type.Integer({ minimum: 0 }),
|
createdAtMs: Type.Integer({ minimum: 0 }),
|
||||||
updatedAtMs: Type.Integer({ minimum: 0 }),
|
updatedAtMs: Type.Integer({ minimum: 0 }),
|
||||||
schedule: CronScheduleSchema,
|
schedule: CronScheduleSchema,
|
||||||
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]),
|
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")]),
|
||||||
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
|
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
|
||||||
payload: CronPayloadSchema,
|
payload: CronPayloadSchema,
|
||||||
isolation: Type.Optional(CronIsolationSchema),
|
isolation: Type.Optional(CronIsolationSchema),
|
||||||
@ -136,7 +154,7 @@ export const CronAddParamsSchema = Type.Object(
|
|||||||
enabled: Type.Optional(Type.Boolean()),
|
enabled: Type.Optional(Type.Boolean()),
|
||||||
deleteAfterRun: Type.Optional(Type.Boolean()),
|
deleteAfterRun: Type.Optional(Type.Boolean()),
|
||||||
schedule: CronScheduleSchema,
|
schedule: CronScheduleSchema,
|
||||||
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated")]),
|
sessionTarget: Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")]),
|
||||||
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
|
wakeMode: Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")]),
|
||||||
payload: CronPayloadSchema,
|
payload: CronPayloadSchema,
|
||||||
isolation: Type.Optional(CronIsolationSchema),
|
isolation: Type.Optional(CronIsolationSchema),
|
||||||
@ -152,7 +170,7 @@ export const CronJobPatchSchema = Type.Object(
|
|||||||
enabled: Type.Optional(Type.Boolean()),
|
enabled: Type.Optional(Type.Boolean()),
|
||||||
deleteAfterRun: Type.Optional(Type.Boolean()),
|
deleteAfterRun: Type.Optional(Type.Boolean()),
|
||||||
schedule: Type.Optional(CronScheduleSchema),
|
schedule: Type.Optional(CronScheduleSchema),
|
||||||
sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated")])),
|
sessionTarget: Type.Optional(Type.Union([Type.Literal("main"), Type.Literal("isolated"), Type.Literal("direct")])),
|
||||||
wakeMode: Type.Optional(Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")])),
|
wakeMode: Type.Optional(Type.Union([Type.Literal("next-heartbeat"), Type.Literal("now")])),
|
||||||
payload: Type.Optional(CronPayloadPatchSchema),
|
payload: Type.Optional(CronPayloadPatchSchema),
|
||||||
isolation: Type.Optional(CronIsolationSchema),
|
isolation: Type.Optional(CronIsolationSchema),
|
||||||
|
|||||||
@ -5,13 +5,16 @@ import { resolveAgentMainSessionKey } from "../config/sessions.js";
|
|||||||
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
|
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
|
||||||
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
|
import { appendCronRunLog, resolveCronRunLogPath } from "../cron/run-log.js";
|
||||||
import { CronService } from "../cron/service.js";
|
import { CronService } from "../cron/service.js";
|
||||||
|
import type { SendDirectMessageParams, SendDirectMessageResult } from "../cron/service/state.js";
|
||||||
import { resolveCronStorePath } from "../cron/store.js";
|
import { resolveCronStorePath } from "../cron/store.js";
|
||||||
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
|
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
|
||||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||||
|
import { runMessageAction } from "../infra/outbound/message-action-runner.js";
|
||||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||||
import { getChildLogger } from "../logging.js";
|
import { getChildLogger } from "../logging.js";
|
||||||
import { normalizeAgentId } from "../routing/session-key.js";
|
import { normalizeAgentId } from "../routing/session-key.js";
|
||||||
import { defaultRuntime } from "../runtime.js";
|
import { defaultRuntime } from "../runtime.js";
|
||||||
|
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
|
||||||
|
|
||||||
export type GatewayCronState = {
|
export type GatewayCronState = {
|
||||||
cron: CronService;
|
cron: CronService;
|
||||||
@ -75,6 +78,45 @@ export function buildGatewayCronService(params: {
|
|||||||
lane: "cron",
|
lane: "cron",
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
|
sendDirectMessage: async (
|
||||||
|
msgParams: SendDirectMessageParams,
|
||||||
|
): Promise<SendDirectMessageResult> => {
|
||||||
|
const runtimeConfig = loadConfig();
|
||||||
|
try {
|
||||||
|
const result = await runMessageAction({
|
||||||
|
cfg: runtimeConfig,
|
||||||
|
action: "send",
|
||||||
|
params: {
|
||||||
|
channel: msgParams.channel,
|
||||||
|
message: msgParams.text,
|
||||||
|
to: msgParams.to,
|
||||||
|
},
|
||||||
|
gateway: {
|
||||||
|
clientName: GATEWAY_CLIENT_IDS.GATEWAY_CLIENT,
|
||||||
|
clientDisplayName: "cron",
|
||||||
|
mode: GATEWAY_CLIENT_MODES.BACKEND,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
if (result.kind === "send" && result.sendResult) {
|
||||||
|
const msgResult = result.sendResult.result;
|
||||||
|
const messageId =
|
||||||
|
msgResult && typeof msgResult === "object" && "messageId" in msgResult
|
||||||
|
? (msgResult as { messageId?: string }).messageId
|
||||||
|
: undefined;
|
||||||
|
return {
|
||||||
|
ok: true,
|
||||||
|
messageId,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return { ok: true };
|
||||||
|
} catch (err) {
|
||||||
|
cronLogger.error({ err: String(err) }, "cron: direct message send failed");
|
||||||
|
return {
|
||||||
|
ok: false,
|
||||||
|
error: err instanceof Error ? err.message : String(err),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
},
|
||||||
log: getChildLogger({ module: "cron", storePath }),
|
log: getChildLogger({ module: "cron", storePath }),
|
||||||
onEvent: (evt) => {
|
onEvent: (evt) => {
|
||||||
params.broadcast("cron", evt, { dropIfSlow: true });
|
params.broadcast("cron", evt, { dropIfSlow: true });
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user