From 5dae98ca33acb7fa7233e6466cf1598e0cbc018f Mon Sep 17 00:00:00 2001 From: Joel Cooper Date: Thu, 29 Jan 2026 14:28:50 -0700 Subject: [PATCH] fix: avoid cron self-deadlock when embedded agent calls cron tools MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a cron job triggers an agent that calls back into cron (e.g. cron.list), the non-reentrant async mutex in locked() causes a circular wait: onTimer holds the lock waiting for the agent, the agent's cron.list waits for the lock. Fix by: 1. Splitting onTimer to collect due jobs under the lock, then execute them after releasing it. The runningAtMs sentinel prevents double-runs. 2. Adding a module-level CronService registry so the cron tool can call the service directly in-process, bypassing the gateway WebSocket round-trip entirely. AI-assisted (Claude). Tested locally with cron jobs that call cron.list mid-execution — previously deadlocked, now works. Co-Authored-By: Claude Opus 4.5 --- src/agents/moltbot-tools.ts | 3 +++ src/agents/tools/cron-tool.ts | 47 +++++++++++++++++++++++++++-------- src/cron/service-registry.ts | 21 ++++++++++++++++ src/cron/service/timer.ts | 24 +++++++++++++----- src/gateway/server.impl.ts | 3 +++ 5 files changed, 81 insertions(+), 17 deletions(-) create mode 100644 src/cron/service-registry.ts diff --git a/src/agents/moltbot-tools.ts b/src/agents/moltbot-tools.ts index c10a55190..811ee8b89 100644 --- a/src/agents/moltbot-tools.ts +++ b/src/agents/moltbot-tools.ts @@ -53,6 +53,8 @@ export function createMoltbotTools(options?: { modelHasVision?: boolean; /** Explicit agent ID override for cron/hook sessions. */ requesterAgentIdOverride?: string; + /** Direct cron service for in-process calls (avoids WebSocket self-deadlock). */ + cronService?: Parameters<typeof createCronTool>[0] extends { cronService?: infer T } ? T : never; }): AnyAgentTool[] { const imageTool = options?.agentDir?.trim() ? 
createImageTool({ @@ -82,6 +84,7 @@ export function createMoltbotTools(options?: { }), createCronTool({ agentSessionKey: options?.agentSessionKey, + cronService: options?.cronService, }), createMessageTool({ agentAccountId: options?.agentAccountId, diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index 739b3ada3..8ab489cb2 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -5,6 +5,7 @@ import { truncateUtf16Safe } from "../../utils.js"; import { optionalStringEnum, stringEnum } from "../schema/typebox.js"; import { resolveSessionAgentId } from "../agent-scope.js"; import { type AnyAgentTool, jsonResult, readStringParam } from "./common.js"; +import { getCronServiceInstance } from "../../cron/service-registry.js"; import { callGatewayTool, type GatewayCallOptions } from "./gateway.js"; import { resolveInternalSessionKey, resolveMainSessionAlias } from "./sessions-helpers.js"; @@ -42,6 +43,15 @@ const CronToolSchema = Type.Object({ type CronToolOptions = { agentSessionKey?: string; + cronService?: { + status: () => Promise<unknown>; + list: (opts?: { includeDisabled?: boolean }) => Promise<unknown>; + add: (input: unknown) => Promise<unknown>; + update: (id: string, patch: unknown) => Promise<unknown>; + remove: (id: string) => Promise<unknown>; + run: (id: string, mode?: string) => Promise<unknown>; + wake: (opts: { mode: string; text: string }) => unknown; + }; }; type ChatMessage = { @@ -187,14 +197,20 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con timeoutMs: typeof params.timeoutMs === "number" ? params.timeoutMs : undefined, }; + const cron = opts?.cronService ?? getCronServiceInstance(); + switch (action) { case "status": - return jsonResult(await callGatewayTool("cron.status", gatewayOpts, {})); + return jsonResult( + cron ? 
await cron.status() : await callGatewayTool("cron.status", gatewayOpts, {}), + ); case "list": return jsonResult( - await callGatewayTool("cron.list", gatewayOpts, { - includeDisabled: Boolean(params.includeDisabled), - }), + cron + ? await cron.list({ includeDisabled: Boolean(params.includeDisabled) }) + : await callGatewayTool("cron.list", gatewayOpts, { + includeDisabled: Boolean(params.includeDisabled), + }), ); case "add": { if (!params.job || typeof params.job !== "object") { @@ -233,7 +249,11 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con } } } - return jsonResult(await callGatewayTool("cron.add", gatewayOpts, job)); + return jsonResult( + cron + ? await cron.add(job as never) + : await callGatewayTool("cron.add", gatewayOpts, job), + ); } case "update": { const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); @@ -245,10 +265,9 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con } const patch = normalizeCronJobPatch(params.patch) ?? params.patch; return jsonResult( - await callGatewayTool("cron.update", gatewayOpts, { - id, - patch, - }), + cron + ? await cron.update(id, patch as never) + : await callGatewayTool("cron.update", gatewayOpts, { id, patch }), ); } case "remove": { @@ -256,14 +275,20 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con if (!id) { throw new Error("jobId required (id accepted for backward compatibility)"); } - return jsonResult(await callGatewayTool("cron.remove", gatewayOpts, { id })); + return jsonResult( + cron + ? await cron.remove(id) + : await callGatewayTool("cron.remove", gatewayOpts, { id }), + ); } case "run": { const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); if (!id) { throw new Error("jobId required (id accepted for backward compatibility)"); } - return jsonResult(await callGatewayTool("cron.run", gatewayOpts, { id })); + return jsonResult( + cron ? 
await cron.run(id) : await callGatewayTool("cron.run", gatewayOpts, { id }), + ); } case "runs": { const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); diff --git a/src/cron/service-registry.ts b/src/cron/service-registry.ts new file mode 100644 index 000000000..cf18a6e49 --- /dev/null +++ b/src/cron/service-registry.ts @@ -0,0 +1,21 @@ +/** + * Module-level registry for the CronService singleton. + * + * The gateway creates a single CronService instance. The cron tool needs + * direct access to it to avoid WebSocket self-deadlock when the embedded + * agent calls cron operations from within the same process. + * + * This registry avoids threading the instance through 7+ layers of params. + */ + +import type { CronService } from "./service.js"; + +let instance: CronService | undefined; + +export function setCronServiceInstance(svc: CronService): void { + instance = svc; +} + +export function getCronServiceInstance(): CronService | undefined { + return instance; +} diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 370f5d116..e455783ed 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -28,9 +28,17 @@ export async function onTimer(state: CronServiceState) { if (state.running) return; state.running = true; try { - await locked(state, async () => { + // Identify due jobs under the lock, then release the lock before executing + // them. This avoids a self-deadlock when an embedded agent calls the cron + // tool mid-run (the agent's cron.list() would block on locked() otherwise). 
+ const dueJobs = await locked(state, async () => { await ensureLoaded(state); - await runDueJobs(state); + return collectDueJobs(state); + }); + for (const job of dueJobs) { + await executeJob(state, job, state.deps.nowMs(), { forced: false }); + } + await locked(state, async () => { await persist(state); armTimer(state); }); @@ -39,17 +47,21 @@ export async function onTimer(state: CronServiceState) { } } -export async function runDueJobs(state: CronServiceState) { - if (!state.store) return; +function collectDueJobs(state: CronServiceState): CronJob[] { + if (!state.store) return []; const now = state.deps.nowMs(); - const due = state.store.jobs.filter((j) => { + return state.store.jobs.filter((j) => { if (!j.enabled) return false; if (typeof j.state.runningAtMs === "number") return false; const next = j.state.nextRunAtMs; return typeof next === "number" && now >= next; }); +} + +export async function runDueJobs(state: CronServiceState) { + const due = collectDueJobs(state); for (const job of due) { - await executeJob(state, job, now, { forced: false }); + await executeJob(state, job, state.deps.nowMs(), { forced: false }); } } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index f641c4076..3c65b0a08 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -50,6 +50,7 @@ import { createChannelManager } from "./server-channels.js"; import { createAgentEventHandler } from "./server-chat.js"; import { createGatewayCloseHandler } from "./server-close.js"; import { buildGatewayCronService } from "./server-cron.js"; +import { setCronServiceInstance } from "../cron/service-registry.js"; import { applyGatewayLaneConcurrency } from "./server-lanes.js"; import { startGatewayMaintenanceTimers } from "./server-maintenance.js"; import { coreGatewayHandlers } from "./server-methods.js"; @@ -334,6 +335,7 @@ export async function startGatewayServer( broadcast, }); let { cron, storePath: cronStorePath } = cronState; + 
setCronServiceInstance(cron); const channelManager = createChannelManager({ loadConfig, @@ -519,6 +521,7 @@ export async function startGatewayServer( heartbeatRunner = nextState.heartbeatRunner; cronState = nextState.cronState; cron = cronState.cron; + setCronServiceInstance(cron); cronStorePath = cronState.storePath; browserControl = nextState.browserControl; },