This commit is contained in:
JoelCooperPhD 2026-01-29 14:34:42 -07:00 committed by GitHub
commit 1c4bd196e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 81 additions and 17 deletions

View File

@ -53,6 +53,8 @@ export function createMoltbotTools(options?: {
modelHasVision?: boolean; modelHasVision?: boolean;
/** Explicit agent ID override for cron/hook sessions. */ /** Explicit agent ID override for cron/hook sessions. */
requesterAgentIdOverride?: string; requesterAgentIdOverride?: string;
/** Direct cron service for in-process calls (avoids WebSocket self-deadlock). */
cronService?: Parameters<typeof createCronTool>[0] extends { cronService?: infer T } ? T : never;
}): AnyAgentTool[] { }): AnyAgentTool[] {
const imageTool = options?.agentDir?.trim() const imageTool = options?.agentDir?.trim()
? createImageTool({ ? createImageTool({
@ -82,6 +84,7 @@ export function createMoltbotTools(options?: {
}), }),
createCronTool({ createCronTool({
agentSessionKey: options?.agentSessionKey, agentSessionKey: options?.agentSessionKey,
cronService: options?.cronService,
}), }),
createMessageTool({ createMessageTool({
agentAccountId: options?.agentAccountId, agentAccountId: options?.agentAccountId,

View File

@ -5,6 +5,7 @@ import { truncateUtf16Safe } from "../../utils.js";
import { optionalStringEnum, stringEnum } from "../schema/typebox.js"; import { optionalStringEnum, stringEnum } from "../schema/typebox.js";
import { resolveSessionAgentId } from "../agent-scope.js"; import { resolveSessionAgentId } from "../agent-scope.js";
import { type AnyAgentTool, jsonResult, readStringParam } from "./common.js"; import { type AnyAgentTool, jsonResult, readStringParam } from "./common.js";
import { getCronServiceInstance } from "../../cron/service-registry.js";
import { callGatewayTool, type GatewayCallOptions } from "./gateway.js"; import { callGatewayTool, type GatewayCallOptions } from "./gateway.js";
import { resolveInternalSessionKey, resolveMainSessionAlias } from "./sessions-helpers.js"; import { resolveInternalSessionKey, resolveMainSessionAlias } from "./sessions-helpers.js";
@ -42,6 +43,15 @@ const CronToolSchema = Type.Object({
type CronToolOptions = { type CronToolOptions = {
agentSessionKey?: string; agentSessionKey?: string;
cronService?: {
status: () => Promise<unknown>;
list: (opts?: { includeDisabled?: boolean }) => Promise<unknown>;
add: (input: unknown) => Promise<unknown>;
update: (id: string, patch: unknown) => Promise<unknown>;
remove: (id: string) => Promise<unknown>;
run: (id: string, mode?: string) => Promise<unknown>;
wake: (opts: { mode: string; text: string }) => unknown;
};
}; };
type ChatMessage = { type ChatMessage = {
@ -187,14 +197,20 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
timeoutMs: typeof params.timeoutMs === "number" ? params.timeoutMs : undefined, timeoutMs: typeof params.timeoutMs === "number" ? params.timeoutMs : undefined,
}; };
const cron = opts?.cronService ?? getCronServiceInstance();
switch (action) { switch (action) {
case "status": case "status":
return jsonResult(await callGatewayTool("cron.status", gatewayOpts, {})); return jsonResult(
cron ? await cron.status() : await callGatewayTool("cron.status", gatewayOpts, {}),
);
case "list": case "list":
return jsonResult( return jsonResult(
await callGatewayTool("cron.list", gatewayOpts, { cron
includeDisabled: Boolean(params.includeDisabled), ? await cron.list({ includeDisabled: Boolean(params.includeDisabled) })
}), : await callGatewayTool("cron.list", gatewayOpts, {
includeDisabled: Boolean(params.includeDisabled),
}),
); );
case "add": { case "add": {
if (!params.job || typeof params.job !== "object") { if (!params.job || typeof params.job !== "object") {
@ -233,7 +249,11 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
} }
} }
} }
return jsonResult(await callGatewayTool("cron.add", gatewayOpts, job)); return jsonResult(
cron
? await cron.add(job as never)
: await callGatewayTool("cron.add", gatewayOpts, job),
);
} }
case "update": { case "update": {
const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); const id = readStringParam(params, "jobId") ?? readStringParam(params, "id");
@ -245,10 +265,9 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
} }
const patch = normalizeCronJobPatch(params.patch) ?? params.patch; const patch = normalizeCronJobPatch(params.patch) ?? params.patch;
return jsonResult( return jsonResult(
await callGatewayTool("cron.update", gatewayOpts, { cron
id, ? await cron.update(id, patch as never)
patch, : await callGatewayTool("cron.update", gatewayOpts, { id, patch }),
}),
); );
} }
case "remove": { case "remove": {
@ -256,14 +275,20 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
if (!id) { if (!id) {
throw new Error("jobId required (id accepted for backward compatibility)"); throw new Error("jobId required (id accepted for backward compatibility)");
} }
return jsonResult(await callGatewayTool("cron.remove", gatewayOpts, { id })); return jsonResult(
cron
? await cron.remove(id)
: await callGatewayTool("cron.remove", gatewayOpts, { id }),
);
} }
case "run": { case "run": {
const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); const id = readStringParam(params, "jobId") ?? readStringParam(params, "id");
if (!id) { if (!id) {
throw new Error("jobId required (id accepted for backward compatibility)"); throw new Error("jobId required (id accepted for backward compatibility)");
} }
return jsonResult(await callGatewayTool("cron.run", gatewayOpts, { id })); return jsonResult(
cron ? await cron.run(id) : await callGatewayTool("cron.run", gatewayOpts, { id }),
);
} }
case "runs": { case "runs": {
const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); const id = readStringParam(params, "jobId") ?? readStringParam(params, "id");

View File

@ -0,0 +1,21 @@
/**
* Module-level registry for the CronService singleton.
*
* The gateway creates a single CronService instance. The cron tool needs
* direct access to it to avoid WebSocket self-deadlock when the embedded
* agent calls cron operations from within the same process.
*
* This registry avoids threading the instance through 7+ layers of params.
*/
import type { CronService } from "./service.js";
let instance: CronService | undefined;
export function setCronServiceInstance(svc: CronService): void {
instance = svc;
}
export function getCronServiceInstance(): CronService | undefined {
return instance;
}

View File

@ -28,9 +28,17 @@ export async function onTimer(state: CronServiceState) {
if (state.running) return; if (state.running) return;
state.running = true; state.running = true;
try { try {
await locked(state, async () => { // Identify due jobs under the lock, then release the lock before executing
// them. This avoids a self-deadlock when an embedded agent calls the cron
// tool mid-run (the agent's cron.list() would block on locked() otherwise).
const dueJobs = await locked(state, async () => {
await ensureLoaded(state); await ensureLoaded(state);
await runDueJobs(state); return collectDueJobs(state);
});
for (const job of dueJobs) {
await executeJob(state, job, state.deps.nowMs(), { forced: false });
}
await locked(state, async () => {
await persist(state); await persist(state);
armTimer(state); armTimer(state);
}); });
@ -39,17 +47,21 @@ export async function onTimer(state: CronServiceState) {
} }
} }
export async function runDueJobs(state: CronServiceState) { function collectDueJobs(state: CronServiceState): CronJob[] {
if (!state.store) return; if (!state.store) return [];
const now = state.deps.nowMs(); const now = state.deps.nowMs();
const due = state.store.jobs.filter((j) => { return state.store.jobs.filter((j) => {
if (!j.enabled) return false; if (!j.enabled) return false;
if (typeof j.state.runningAtMs === "number") return false; if (typeof j.state.runningAtMs === "number") return false;
const next = j.state.nextRunAtMs; const next = j.state.nextRunAtMs;
return typeof next === "number" && now >= next; return typeof next === "number" && now >= next;
}); });
}
export async function runDueJobs(state: CronServiceState) {
const due = collectDueJobs(state);
for (const job of due) { for (const job of due) {
await executeJob(state, job, now, { forced: false }); await executeJob(state, job, state.deps.nowMs(), { forced: false });
} }
} }

View File

@ -50,6 +50,7 @@ import { createChannelManager } from "./server-channels.js";
import { createAgentEventHandler } from "./server-chat.js"; import { createAgentEventHandler } from "./server-chat.js";
import { createGatewayCloseHandler } from "./server-close.js"; import { createGatewayCloseHandler } from "./server-close.js";
import { buildGatewayCronService } from "./server-cron.js"; import { buildGatewayCronService } from "./server-cron.js";
import { setCronServiceInstance } from "../cron/service-registry.js";
import { applyGatewayLaneConcurrency } from "./server-lanes.js"; import { applyGatewayLaneConcurrency } from "./server-lanes.js";
import { startGatewayMaintenanceTimers } from "./server-maintenance.js"; import { startGatewayMaintenanceTimers } from "./server-maintenance.js";
import { coreGatewayHandlers } from "./server-methods.js"; import { coreGatewayHandlers } from "./server-methods.js";
@ -334,6 +335,7 @@ export async function startGatewayServer(
broadcast, broadcast,
}); });
let { cron, storePath: cronStorePath } = cronState; let { cron, storePath: cronStorePath } = cronState;
setCronServiceInstance(cron);
const channelManager = createChannelManager({ const channelManager = createChannelManager({
loadConfig, loadConfig,
@ -519,6 +521,7 @@ export async function startGatewayServer(
heartbeatRunner = nextState.heartbeatRunner; heartbeatRunner = nextState.heartbeatRunner;
cronState = nextState.cronState; cronState = nextState.cronState;
cron = cronState.cron; cron = cronState.cron;
setCronServiceInstance(cron);
cronStorePath = cronState.storePath; cronStorePath = cronState.storePath;
browserControl = nextState.browserControl; browserControl = nextState.browserControl;
}, },