import fs from "node:fs/promises"; import path from "node:path"; import { resolveAgentConfig, resolveAgentWorkspaceDir, resolveDefaultAgentId, } from "../agents/agent-scope.js"; import { resolveUserTimezone } from "../agents/date-time.js"; import { resolveEffectiveMessagesConfig } from "../agents/identity.js"; import { DEFAULT_HEARTBEAT_FILENAME } from "../agents/workspace.js"; import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS, DEFAULT_HEARTBEAT_EVERY, isHeartbeatContentEffectivelyEmpty, resolveHeartbeatPrompt as resolveHeartbeatPromptText, stripHeartbeatToken, } from "../auto-reply/heartbeat.js"; import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js"; import { getReplyFromConfig } from "../auto-reply/reply.js"; import type { ReplyPayload } from "../auto-reply/types.js"; import { getChannelPlugin } from "../channels/plugins/index.js"; import type { ChannelHeartbeatDeps } from "../channels/plugins/types.js"; import { parseDurationMs } from "../cli/parse-duration.js"; import type { ClawdbotConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import { canonicalizeMainSessionAlias, loadSessionStore, resolveAgentIdFromSessionKey, resolveAgentMainSessionKey, resolveStorePath, saveSessionStore, updateSessionStore, } from "../config/sessions.js"; import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { getQueueSize } from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js"; import { emitHeartbeatEvent, resolveIndicatorType } from "./heartbeat-events.js"; import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js"; import { type HeartbeatRunResult, type HeartbeatWakeHandler, requestHeartbeatNow, setHeartbeatWakeHandler, } from "./heartbeat-wake.js"; import type { OutboundSendDeps } from "./outbound/deliver.js"; import { deliverOutboundPayloads } from "./outbound/deliver.js"; import { resolveHeartbeatDeliveryTarget, resolveHeartbeatSenderContext, } from "./outbound/targets.js"; type HeartbeatDeps = OutboundSendDeps & ChannelHeartbeatDeps & { runtime?: RuntimeEnv; getQueueSize?: (lane?: string) => number; nowMs?: () => number; }; const log = createSubsystemLogger("gateway/heartbeat"); let heartbeatsEnabled = true; export function setHeartbeatsEnabled(enabled: boolean) { heartbeatsEnabled = enabled; } type HeartbeatConfig = AgentDefaultsConfig["heartbeat"]; type HeartbeatAgent = { agentId: string; heartbeat?: HeartbeatConfig; }; export type HeartbeatSummary = { enabled: boolean; every: string; everyMs: number | null; prompt: string; target: string; model?: string; ackMaxChars: number; }; const DEFAULT_HEARTBEAT_TARGET = "last"; const ACTIVE_HOURS_TIME_PATTERN = /^([01]\d|2[0-3]|24):([0-5]\d)$/; function resolveActiveHoursTimezone(cfg: ClawdbotConfig, raw?: string): string { const trimmed = raw?.trim(); if (!trimmed || trimmed === "user") { return resolveUserTimezone(cfg.agents?.defaults?.userTimezone); } if (trimmed === "local") { const host = Intl.DateTimeFormat().resolvedOptions().timeZone; return host?.trim() || "UTC"; } try { new Intl.DateTimeFormat("en-US", { timeZone: trimmed }).format(new Date()); return trimmed; } catch { return resolveUserTimezone(cfg.agents?.defaults?.userTimezone); } } function parseActiveHoursTime(opts: { allow24: boolean }, raw?: string): number | null { if (!raw || !ACTIVE_HOURS_TIME_PATTERN.test(raw)) return null; const [hourStr, minuteStr] = raw.split(":"); const hour = Number(hourStr); const minute = Number(minuteStr); if (!Number.isFinite(hour) || !Number.isFinite(minute)) return null; if (hour === 24) { if (!opts.allow24 || minute !== 0) return null; return 24 * 60; } return hour * 60 + minute; } function resolveMinutesInTimeZone(nowMs: number, timeZone: string): number | null { try { const parts = new Intl.DateTimeFormat("en-US", { timeZone, hour: "2-digit", minute: "2-digit", hourCycle: "h23", }).formatToParts(new Date(nowMs)); const map: Record = {}; for (const part of parts) { if (part.type !== "literal") map[part.type] = part.value; } const hour = Number(map.hour); const minute = Number(map.minute); if (!Number.isFinite(hour) || !Number.isFinite(minute)) return null; return hour * 60 + minute; } catch { return null; } } function isWithinActiveHours( cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig, nowMs?: number, ): boolean { const active = heartbeat?.activeHours; if (!active) return true; const startMin = parseActiveHoursTime({ allow24: false }, active.start); const endMin = parseActiveHoursTime({ allow24: true }, active.end); if (startMin === null || endMin === null) return true; if (startMin === endMin) return true; const timeZone = resolveActiveHoursTimezone(cfg, active.timezone); const currentMin = resolveMinutesInTimeZone(nowMs ?? Date.now(), timeZone); if (currentMin === null) return true; if (endMin > startMin) { return currentMin >= startMin && currentMin < endMin; } return currentMin >= startMin || currentMin < endMin; } type HeartbeatAgentState = { agentId: string; heartbeat?: HeartbeatConfig; intervalMs: number; lastRunMs?: number; nextDueMs: number; }; export type HeartbeatRunner = { stop: () => void; updateConfig: (cfg: ClawdbotConfig) => void; }; function hasExplicitHeartbeatAgents(cfg: ClawdbotConfig) { const list = cfg.agents?.list ?? []; return list.some((entry) => Boolean(entry?.heartbeat)); } export function isHeartbeatEnabledForAgent(cfg: ClawdbotConfig, agentId?: string): boolean { const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg)); const list = cfg.agents?.list ?? []; const hasExplicit = hasExplicitHeartbeatAgents(cfg); if (hasExplicit) { return list.some( (entry) => Boolean(entry?.heartbeat) && normalizeAgentId(entry?.id) === resolvedAgentId, ); } return resolvedAgentId === resolveDefaultAgentId(cfg); } function resolveHeartbeatConfig( cfg: ClawdbotConfig, agentId?: string, ): HeartbeatConfig | undefined { const defaults = cfg.agents?.defaults?.heartbeat; if (!agentId) return defaults; const overrides = resolveAgentConfig(cfg, agentId)?.heartbeat; if (!defaults && !overrides) return overrides; return { ...defaults, ...overrides }; } export function resolveHeartbeatSummaryForAgent( cfg: ClawdbotConfig, agentId?: string, ): HeartbeatSummary { const defaults = cfg.agents?.defaults?.heartbeat; const overrides = agentId ? resolveAgentConfig(cfg, agentId)?.heartbeat : undefined; const enabled = isHeartbeatEnabledForAgent(cfg, agentId); if (!enabled) { return { enabled: false, every: "disabled", everyMs: null, prompt: resolveHeartbeatPromptText(defaults?.prompt), target: defaults?.target ?? DEFAULT_HEARTBEAT_TARGET, model: defaults?.model, ackMaxChars: Math.max(0, defaults?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS), }; } const merged = defaults || overrides ? { ...defaults, ...overrides } : undefined; const every = merged?.every ?? defaults?.every ?? overrides?.every ?? DEFAULT_HEARTBEAT_EVERY; const everyMs = resolveHeartbeatIntervalMs(cfg, undefined, merged); const prompt = resolveHeartbeatPromptText( merged?.prompt ?? defaults?.prompt ?? overrides?.prompt, ); const target = merged?.target ?? defaults?.target ?? overrides?.target ?? DEFAULT_HEARTBEAT_TARGET; const model = merged?.model ?? defaults?.model ?? overrides?.model; const ackMaxChars = Math.max( 0, merged?.ackMaxChars ?? defaults?.ackMaxChars ?? overrides?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS, ); return { enabled: true, every, everyMs, prompt, target, model, ackMaxChars, }; } function resolveHeartbeatAgents(cfg: ClawdbotConfig): HeartbeatAgent[] { const list = cfg.agents?.list ?? []; if (hasExplicitHeartbeatAgents(cfg)) { return list .filter((entry) => entry?.heartbeat) .map((entry) => { const id = normalizeAgentId(entry.id); return { agentId: id, heartbeat: resolveHeartbeatConfig(cfg, id) }; }) .filter((entry) => entry.agentId); } const fallbackId = resolveDefaultAgentId(cfg); return [{ agentId: fallbackId, heartbeat: resolveHeartbeatConfig(cfg, fallbackId) }]; } export function resolveHeartbeatIntervalMs( cfg: ClawdbotConfig, overrideEvery?: string, heartbeat?: HeartbeatConfig, ) { const raw = overrideEvery ?? heartbeat?.every ?? cfg.agents?.defaults?.heartbeat?.every ?? DEFAULT_HEARTBEAT_EVERY; if (!raw) return null; const trimmed = String(raw).trim(); if (!trimmed) return null; let ms: number; try { ms = parseDurationMs(trimmed, { defaultUnit: "m" }); } catch { return null; } if (ms <= 0) return null; return ms; } export function resolveHeartbeatPrompt(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) { return resolveHeartbeatPromptText(heartbeat?.prompt ?? cfg.agents?.defaults?.heartbeat?.prompt); } function resolveHeartbeatAckMaxChars(cfg: ClawdbotConfig, heartbeat?: HeartbeatConfig) { return Math.max( 0, heartbeat?.ackMaxChars ?? cfg.agents?.defaults?.heartbeat?.ackMaxChars ?? DEFAULT_HEARTBEAT_ACK_MAX_CHARS, ); } function resolveHeartbeatSession( cfg: ClawdbotConfig, agentId?: string, heartbeat?: HeartbeatConfig, ) { const sessionCfg = cfg.session; const scope = sessionCfg?.scope ?? "per-sender"; const resolvedAgentId = normalizeAgentId(agentId ?? resolveDefaultAgentId(cfg)); const mainSessionKey = scope === "global" ? "global" : resolveAgentMainSessionKey({ cfg, agentId: resolvedAgentId }); const storeAgentId = scope === "global" ? resolveDefaultAgentId(cfg) : resolvedAgentId; const storePath = resolveStorePath(sessionCfg?.store, { agentId: storeAgentId }); const store = loadSessionStore(storePath); const mainEntry = store[mainSessionKey]; if (scope === "global") { return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry }; } const trimmed = heartbeat?.session?.trim() ?? ""; if (!trimmed) { return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry }; } const normalized = trimmed.toLowerCase(); if (normalized === "main" || normalized === "global") { return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry }; } const candidate = toAgentStoreSessionKey({ agentId: resolvedAgentId, requestKey: trimmed, mainKey: cfg.session?.mainKey, }); const canonical = canonicalizeMainSessionAlias({ cfg, agentId: resolvedAgentId, sessionKey: candidate, }); if (canonical !== "global") { const sessionAgentId = resolveAgentIdFromSessionKey(canonical); if (sessionAgentId === normalizeAgentId(resolvedAgentId)) { return { sessionKey: canonical, storePath, store, entry: store[canonical] }; } } return { sessionKey: mainSessionKey, storePath, store, entry: mainEntry }; } function resolveHeartbeatReplyPayload( replyResult: ReplyPayload | ReplyPayload[] | undefined, ): ReplyPayload | undefined { if (!replyResult) return undefined; if (!Array.isArray(replyResult)) return replyResult; for (let idx = replyResult.length - 1; idx >= 0; idx -= 1) { const payload = replyResult[idx]; if (!payload) continue; if (payload.text || payload.mediaUrl || (payload.mediaUrls && payload.mediaUrls.length > 0)) { return payload; } } return undefined; } function resolveHeartbeatReasoningPayloads( replyResult: ReplyPayload | ReplyPayload[] | undefined, ): ReplyPayload[] { const payloads = Array.isArray(replyResult) ? replyResult : replyResult ? [replyResult] : []; return payloads.filter((payload) => { const text = typeof payload.text === "string" ? payload.text : ""; return text.trimStart().startsWith("Reasoning:"); }); } async function restoreHeartbeatUpdatedAt(params: { storePath: string; sessionKey: string; updatedAt?: number; }) { const { storePath, sessionKey, updatedAt } = params; if (typeof updatedAt !== "number") return; const store = loadSessionStore(storePath); const entry = store[sessionKey]; if (!entry) return; const nextUpdatedAt = Math.max(entry.updatedAt ?? 0, updatedAt); if (entry.updatedAt === nextUpdatedAt) return; await updateSessionStore(storePath, (nextStore) => { const nextEntry = nextStore[sessionKey] ?? entry; if (!nextEntry) return; const resolvedUpdatedAt = Math.max(nextEntry.updatedAt ?? 0, updatedAt); if (nextEntry.updatedAt === resolvedUpdatedAt) return; nextStore[sessionKey] = { ...nextEntry, updatedAt: resolvedUpdatedAt }; }); } function normalizeHeartbeatReply( payload: ReplyPayload, responsePrefix: string | undefined, ackMaxChars: number, ) { const stripped = stripHeartbeatToken(payload.text, { mode: "heartbeat", maxAckChars: ackMaxChars, }); const hasMedia = Boolean(payload.mediaUrl || (payload.mediaUrls?.length ?? 0) > 0); if (stripped.shouldSkip && !hasMedia) { return { shouldSkip: true, text: "", hasMedia, }; } let finalText = stripped.text; if (responsePrefix && finalText && !finalText.startsWith(responsePrefix)) { finalText = `${responsePrefix} ${finalText}`; } return { shouldSkip: false, text: finalText, hasMedia }; } export async function runHeartbeatOnce(opts: { cfg?: ClawdbotConfig; agentId?: string; heartbeat?: HeartbeatConfig; reason?: string; deps?: HeartbeatDeps; }): Promise { const cfg = opts.cfg ?? loadConfig(); const agentId = normalizeAgentId(opts.agentId ?? resolveDefaultAgentId(cfg)); const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId); if (!heartbeatsEnabled) { return { status: "skipped", reason: "disabled" }; } if (!isHeartbeatEnabledForAgent(cfg, agentId)) { return { status: "skipped", reason: "disabled" }; } if (!resolveHeartbeatIntervalMs(cfg, undefined, heartbeat)) { return { status: "skipped", reason: "disabled" }; } const startedAt = opts.deps?.nowMs?.() ?? Date.now(); if (!isWithinActiveHours(cfg, heartbeat, startedAt)) { return { status: "skipped", reason: "quiet-hours" }; } const queueSize = (opts.deps?.getQueueSize ?? getQueueSize)(CommandLane.Main); if (queueSize > 0) { return { status: "skipped", reason: "requests-in-flight" }; } // Skip heartbeat if HEARTBEAT.md exists but has no actionable content. // This saves API calls/costs when the file is effectively empty (only comments/headers). const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId); const heartbeatFilePath = path.join(workspaceDir, DEFAULT_HEARTBEAT_FILENAME); try { const heartbeatFileContent = await fs.readFile(heartbeatFilePath, "utf-8"); if (isHeartbeatContentEffectivelyEmpty(heartbeatFileContent)) { emitHeartbeatEvent({ status: "skipped", reason: "empty-heartbeat-file", durationMs: Date.now() - startedAt, }); return { status: "skipped", reason: "empty-heartbeat-file" }; } } catch { // File doesn't exist or can't be read - proceed with heartbeat. // The LLM prompt says "if it exists" so this is expected behavior. } const { entry, sessionKey, storePath } = resolveHeartbeatSession(cfg, agentId, heartbeat); const previousUpdatedAt = entry?.updatedAt; const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat }); const visibility = delivery.channel !== "none" ? resolveHeartbeatVisibility({ cfg, channel: delivery.channel, accountId: delivery.accountId, }) : { showOk: false, showAlerts: true, useIndicator: true }; const { sender } = resolveHeartbeatSenderContext({ cfg, entry, delivery }); const responsePrefix = resolveEffectiveMessagesConfig(cfg, agentId).responsePrefix; const prompt = resolveHeartbeatPrompt(cfg, heartbeat); const ctx = { Body: prompt, From: sender, To: sender, Provider: "heartbeat", SessionKey: sessionKey, }; if (!visibility.showAlerts && !visibility.showOk && !visibility.useIndicator) { emitHeartbeatEvent({ status: "skipped", reason: "alerts-disabled", durationMs: Date.now() - startedAt, channel: delivery.channel !== "none" ? delivery.channel : undefined, }); return { status: "skipped", reason: "alerts-disabled" }; } const heartbeatOkText = responsePrefix ? `${responsePrefix} ${HEARTBEAT_TOKEN}` : HEARTBEAT_TOKEN; const canAttemptHeartbeatOk = Boolean( visibility.showOk && delivery.channel !== "none" && delivery.to, ); const maybeSendHeartbeatOk = async () => { if (!canAttemptHeartbeatOk || delivery.channel === "none" || !delivery.to) return false; const heartbeatPlugin = getChannelPlugin(delivery.channel); if (heartbeatPlugin?.heartbeat?.checkReady) { const readiness = await heartbeatPlugin.heartbeat.checkReady({ cfg, accountId: delivery.accountId, deps: opts.deps, }); if (!readiness.ok) return false; } await deliverOutboundPayloads({ cfg, channel: delivery.channel, to: delivery.to, accountId: delivery.accountId, payloads: [{ text: heartbeatOkText }], deps: opts.deps, }); return true; }; try { const replyResult = await getReplyFromConfig(ctx, { isHeartbeat: true }, cfg); const replyPayload = resolveHeartbeatReplyPayload(replyResult); const includeReasoning = heartbeat?.includeReasoning === true; const reasoningPayloads = includeReasoning ? resolveHeartbeatReasoningPayloads(replyResult).filter((payload) => payload !== replyPayload) : []; if ( !replyPayload || (!replyPayload.text && !replyPayload.mediaUrl && !replyPayload.mediaUrls?.length) ) { await restoreHeartbeatUpdatedAt({ storePath, sessionKey, updatedAt: previousUpdatedAt, }); const okSent = await maybeSendHeartbeatOk(); emitHeartbeatEvent({ status: "ok-empty", reason: opts.reason, durationMs: Date.now() - startedAt, channel: delivery.channel !== "none" ? delivery.channel : undefined, silent: !okSent, indicatorType: visibility.useIndicator ? resolveIndicatorType("ok-empty") : undefined, }); return { status: "ran", durationMs: Date.now() - startedAt }; } const ackMaxChars = resolveHeartbeatAckMaxChars(cfg, heartbeat); const normalized = normalizeHeartbeatReply(replyPayload, responsePrefix, ackMaxChars); const shouldSkipMain = normalized.shouldSkip && !normalized.hasMedia; if (shouldSkipMain && reasoningPayloads.length === 0) { await restoreHeartbeatUpdatedAt({ storePath, sessionKey, updatedAt: previousUpdatedAt, }); const okSent = await maybeSendHeartbeatOk(); emitHeartbeatEvent({ status: "ok-token", reason: opts.reason, durationMs: Date.now() - startedAt, channel: delivery.channel !== "none" ? delivery.channel : undefined, silent: !okSent, indicatorType: visibility.useIndicator ? resolveIndicatorType("ok-token") : undefined, }); return { status: "ran", durationMs: Date.now() - startedAt }; } const mediaUrls = replyPayload.mediaUrls ?? (replyPayload.mediaUrl ? [replyPayload.mediaUrl] : []); // Suppress duplicate heartbeats (same payload) within a short window. // This prevents "nagging" when nothing changed but the model repeats the same items. const prevHeartbeatText = typeof entry?.lastHeartbeatText === "string" ? entry.lastHeartbeatText : ""; const prevHeartbeatAt = typeof entry?.lastHeartbeatSentAt === "number" ? entry.lastHeartbeatSentAt : undefined; const isDuplicateMain = !shouldSkipMain && !mediaUrls.length && Boolean(prevHeartbeatText.trim()) && normalized.text.trim() === prevHeartbeatText.trim() && typeof prevHeartbeatAt === "number" && startedAt - prevHeartbeatAt < 24 * 60 * 60 * 1000; if (isDuplicateMain) { await restoreHeartbeatUpdatedAt({ storePath, sessionKey, updatedAt: previousUpdatedAt, }); emitHeartbeatEvent({ status: "skipped", reason: "duplicate", preview: normalized.text.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: false, channel: delivery.channel !== "none" ? delivery.channel : undefined, }); return { status: "ran", durationMs: Date.now() - startedAt }; } // Reasoning payloads are text-only; any attachments stay on the main reply. const previewText = shouldSkipMain ? reasoningPayloads .map((payload) => payload.text) .filter((text): text is string => Boolean(text?.trim())) .join("\n") : normalized.text; if (delivery.channel === "none" || !delivery.to) { emitHeartbeatEvent({ status: "skipped", reason: delivery.reason ?? "no-target", preview: previewText?.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: mediaUrls.length > 0, }); return { status: "ran", durationMs: Date.now() - startedAt }; } if (!visibility.showAlerts) { await restoreHeartbeatUpdatedAt({ storePath, sessionKey, updatedAt: previousUpdatedAt }); emitHeartbeatEvent({ status: "skipped", reason: "alerts-disabled", preview: previewText?.slice(0, 200), durationMs: Date.now() - startedAt, channel: delivery.channel, hasMedia: mediaUrls.length > 0, indicatorType: visibility.useIndicator ? resolveIndicatorType("sent") : undefined, }); return { status: "ran", durationMs: Date.now() - startedAt }; } const deliveryAccountId = delivery.accountId; const heartbeatPlugin = getChannelPlugin(delivery.channel); if (heartbeatPlugin?.heartbeat?.checkReady) { const readiness = await heartbeatPlugin.heartbeat.checkReady({ cfg, accountId: deliveryAccountId, deps: opts.deps, }); if (!readiness.ok) { emitHeartbeatEvent({ status: "skipped", reason: readiness.reason, preview: previewText?.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: mediaUrls.length > 0, channel: delivery.channel, }); log.info("heartbeat: channel not ready", { channel: delivery.channel, reason: readiness.reason, }); return { status: "skipped", reason: readiness.reason }; } } await deliverOutboundPayloads({ cfg, channel: delivery.channel, to: delivery.to, accountId: deliveryAccountId, payloads: [ ...reasoningPayloads, ...(shouldSkipMain ? [] : [ { text: normalized.text, mediaUrls, }, ]), ], deps: opts.deps, }); // Record last delivered heartbeat payload for dedupe. if (!shouldSkipMain && normalized.text.trim()) { const store = loadSessionStore(storePath); const current = store[sessionKey]; if (current) { store[sessionKey] = { ...current, lastHeartbeatText: normalized.text, lastHeartbeatSentAt: startedAt, }; await saveSessionStore(storePath, store); } } emitHeartbeatEvent({ status: "sent", to: delivery.to, preview: previewText?.slice(0, 200), durationMs: Date.now() - startedAt, hasMedia: mediaUrls.length > 0, channel: delivery.channel, indicatorType: visibility.useIndicator ? resolveIndicatorType("sent") : undefined, }); return { status: "ran", durationMs: Date.now() - startedAt }; } catch (err) { const reason = formatErrorMessage(err); emitHeartbeatEvent({ status: "failed", reason, durationMs: Date.now() - startedAt, channel: delivery.channel !== "none" ? delivery.channel : undefined, indicatorType: visibility.useIndicator ? resolveIndicatorType("failed") : undefined, }); log.error(`heartbeat failed: ${reason}`, { error: reason }); return { status: "failed", reason }; } } export function startHeartbeatRunner(opts: { cfg?: ClawdbotConfig; runtime?: RuntimeEnv; abortSignal?: AbortSignal; runOnce?: typeof runHeartbeatOnce; }): HeartbeatRunner { const runtime = opts.runtime ?? defaultRuntime; const runOnce = opts.runOnce ?? runHeartbeatOnce; const state = { cfg: opts.cfg ?? loadConfig(), runtime, agents: new Map(), timer: null as NodeJS.Timeout | null, stopped: false, }; let initialized = false; const resolveNextDue = (now: number, intervalMs: number, prevState?: HeartbeatAgentState) => { if (typeof prevState?.lastRunMs === "number") { return prevState.lastRunMs + intervalMs; } if (prevState && prevState.intervalMs === intervalMs && prevState.nextDueMs > now) { return prevState.nextDueMs; } return now + intervalMs; }; const scheduleNext = () => { if (state.stopped) return; if (state.timer) { clearTimeout(state.timer); state.timer = null; } if (state.agents.size === 0) return; const now = Date.now(); let nextDue = Number.POSITIVE_INFINITY; for (const agent of state.agents.values()) { if (agent.nextDueMs < nextDue) nextDue = agent.nextDueMs; } if (!Number.isFinite(nextDue)) return; const delay = Math.max(0, nextDue - now); state.timer = setTimeout(() => { requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); }, delay); state.timer.unref?.(); }; const updateConfig = (cfg: ClawdbotConfig) => { if (state.stopped) return; const now = Date.now(); const prevAgents = state.agents; const prevEnabled = prevAgents.size > 0; const nextAgents = new Map(); const intervals: number[] = []; for (const agent of resolveHeartbeatAgents(cfg)) { const intervalMs = resolveHeartbeatIntervalMs(cfg, undefined, agent.heartbeat); if (!intervalMs) continue; intervals.push(intervalMs); const prevState = prevAgents.get(agent.agentId); const nextDueMs = resolveNextDue(now, intervalMs, prevState); nextAgents.set(agent.agentId, { agentId: agent.agentId, heartbeat: agent.heartbeat, intervalMs, lastRunMs: prevState?.lastRunMs, nextDueMs, }); } state.cfg = cfg; state.agents = nextAgents; const nextEnabled = nextAgents.size > 0; if (!initialized) { if (!nextEnabled) { log.info("heartbeat: disabled", { enabled: false }); } else { log.info("heartbeat: started", { intervalMs: Math.min(...intervals) }); } initialized = true; } else if (prevEnabled !== nextEnabled) { if (!nextEnabled) { log.info("heartbeat: disabled", { enabled: false }); } else { log.info("heartbeat: started", { intervalMs: Math.min(...intervals) }); } } scheduleNext(); }; const run: HeartbeatWakeHandler = async (params) => { if (!heartbeatsEnabled) { return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; } if (state.agents.size === 0) { return { status: "skipped", reason: "disabled" } satisfies HeartbeatRunResult; } const reason = params?.reason; const isInterval = reason === "interval"; const startedAt = Date.now(); const now = startedAt; let ran = false; for (const agent of state.agents.values()) { if (isInterval && now < agent.nextDueMs) { continue; } const res = await runOnce({ cfg: state.cfg, agentId: agent.agentId, heartbeat: agent.heartbeat, reason, deps: { runtime: state.runtime }, }); if (res.status === "skipped" && res.reason === "requests-in-flight") { return res; } if (res.status !== "skipped" || res.reason !== "disabled") { agent.lastRunMs = now; agent.nextDueMs = now + agent.intervalMs; } if (res.status === "ran") ran = true; } scheduleNext(); if (ran) return { status: "ran", durationMs: Date.now() - startedAt }; return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; setHeartbeatWakeHandler(async (params) => run({ reason: params.reason })); updateConfig(state.cfg); const cleanup = () => { state.stopped = true; setHeartbeatWakeHandler(null); if (state.timer) clearTimeout(state.timer); state.timer = null; }; opts.abortSignal?.addEventListener("abort", cleanup, { once: true }); return { stop: cleanup, updateConfig }; }