Merge pull request #666 from roshanasingh4/fix/652-cron-wakeMode-now-waits-for-agent
[AI-assisted] fix(cron): wait for heartbeat to complete when wakeMode is "now"
This commit is contained in:
commit
7ac628a697
@ -10,6 +10,7 @@
|
|||||||
|
|
||||||
### Fixes
|
### Fixes
|
||||||
- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj.
|
- Auto-reply: prefer `RawBody` for command/directive parsing (WhatsApp + Discord) and prevent fallback runs from clobbering concurrent session updates. (#643) — thanks @mcinteerj.
|
||||||
|
- Cron: `wakeMode: "now"` waits for heartbeat completion (and retries when the main lane is busy). (#666) — thanks @roshanasingh4.
|
||||||
- Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”).
|
- Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”).
|
||||||
- Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage.
|
- Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage.
|
||||||
- Auth: throttle external CLI credential syncs (Claude/Codex), reduce Keychain reads, and skip sync when cached credentials are still fresh.
|
- Auth: throttle external CLI credential syncs (Claude/Codex), reduce Keychain reads, and skip sync when cached credentials are still fresh.
|
||||||
@ -32,6 +33,7 @@
|
|||||||
- Tests/Agents: add regression coverage for workspace tool path resolution and bash cwd defaults.
|
- Tests/Agents: add regression coverage for workspace tool path resolution and bash cwd defaults.
|
||||||
- iOS/Android: enable stricter concurrency/lint checks; fix Swift 6 strict concurrency issues + Android lint errors (ExifInterface, obsolete SDK check). (#662) — thanks @KristijanJovanovski.
|
- iOS/Android: enable stricter concurrency/lint checks; fix Swift 6 strict concurrency issues + Android lint errors (ExifInterface, obsolete SDK check). (#662) — thanks @KristijanJovanovski.
|
||||||
- iOS/macOS: share `AsyncTimeout`, require explicit `bridgeStableID` on connect, and harden tool display defaults (avoids missing-resource label fallbacks).
|
- iOS/macOS: share `AsyncTimeout`, require explicit `bridgeStableID` on connect, and harden tool display defaults (avoids missing-resource label fallbacks).
|
||||||
|
- Telegram: serialize media-group processing to avoid missed albums under load.
|
||||||
- Docs: showcase entries for ParentPay, R2 Upload, iOS TestFlight, and Oura Health. (#650) — thanks @henrino3.
|
- Docs: showcase entries for ParentPay, R2 Upload, iOS TestFlight, and Oura Health. (#650) — thanks @henrino3.
|
||||||
|
|
||||||
## 2026.1.9
|
## 2026.1.9
|
||||||
|
|||||||
@ -387,15 +387,15 @@ enum GatewayEnvironment {
|
|||||||
|
|
||||||
private static func bundledGatewayStatusMessage(
|
private static func bundledGatewayStatusMessage(
|
||||||
gatewayVersion: String,
|
gatewayVersion: String,
|
||||||
nodeVersion: String?
|
nodeVersion: String?) -> String
|
||||||
) -> String {
|
{
|
||||||
"\(self.bundledGatewayLabel) \(gatewayVersion) (node \(nodeVersion ?? "unknown"))"
|
"\(self.bundledGatewayLabel) \(gatewayVersion) (node \(nodeVersion ?? "unknown"))"
|
||||||
}
|
}
|
||||||
|
|
||||||
private static func bundledGatewayIncompatibleMessage(
|
private static func bundledGatewayIncompatibleMessage(
|
||||||
installed: Semver,
|
installed: Semver,
|
||||||
expected: Semver
|
expected: Semver) -> String
|
||||||
) -> String {
|
{
|
||||||
"\(self.bundledGatewayLabel) \(installed.description) is incompatible with app " +
|
"\(self.bundledGatewayLabel) \(installed.description) is incompatible with app " +
|
||||||
"\(expected.description); rebuild the app bundle."
|
"\(expected.description); rebuild the app bundle."
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import path from "node:path";
|
|||||||
|
|
||||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
|
||||||
import { CronService } from "./service.js";
|
import { CronService } from "./service.js";
|
||||||
|
|
||||||
const noopLogger = {
|
const noopLogger = {
|
||||||
@ -78,6 +79,68 @@ describe("CronService", () => {
|
|||||||
await store.cleanup();
|
await store.cleanup();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("wakeMode now waits for heartbeat completion when available", async () => {
|
||||||
|
const store = await makeStorePath();
|
||||||
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
const requestHeartbeatNow = vi.fn();
|
||||||
|
|
||||||
|
let now = 0;
|
||||||
|
const nowMs = () => {
|
||||||
|
now += 10;
|
||||||
|
return now;
|
||||||
|
};
|
||||||
|
|
||||||
|
let resolveHeartbeat: ((res: HeartbeatRunResult) => void) | null = null;
|
||||||
|
const runHeartbeatOnce = vi.fn(
|
||||||
|
async () =>
|
||||||
|
await new Promise<HeartbeatRunResult>((resolve) => {
|
||||||
|
resolveHeartbeat = resolve;
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const cron = new CronService({
|
||||||
|
storePath: store.storePath,
|
||||||
|
cronEnabled: true,
|
||||||
|
log: noopLogger,
|
||||||
|
nowMs,
|
||||||
|
enqueueSystemEvent,
|
||||||
|
requestHeartbeatNow,
|
||||||
|
runHeartbeatOnce,
|
||||||
|
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
|
||||||
|
});
|
||||||
|
|
||||||
|
await cron.start();
|
||||||
|
const job = await cron.add({
|
||||||
|
name: "wakeMode now waits",
|
||||||
|
enabled: true,
|
||||||
|
schedule: { kind: "at", atMs: 1 },
|
||||||
|
sessionTarget: "main",
|
||||||
|
wakeMode: "now",
|
||||||
|
payload: { kind: "systemEvent", text: "hello" },
|
||||||
|
});
|
||||||
|
|
||||||
|
const runPromise = cron.run(job.id, "force");
|
||||||
|
for (let i = 0; i < 10; i++) {
|
||||||
|
if (runHeartbeatOnce.mock.calls.length > 0) break;
|
||||||
|
// Let the locked() chain progress.
|
||||||
|
await Promise.resolve();
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(runHeartbeatOnce).toHaveBeenCalledTimes(1);
|
||||||
|
expect(requestHeartbeatNow).not.toHaveBeenCalled();
|
||||||
|
expect(enqueueSystemEvent).toHaveBeenCalledWith("hello");
|
||||||
|
expect(job.state.runningAtMs).toBeTypeOf("number");
|
||||||
|
|
||||||
|
resolveHeartbeat?.({ status: "ran", durationMs: 123 });
|
||||||
|
await runPromise;
|
||||||
|
|
||||||
|
expect(job.state.lastStatus).toBe("ok");
|
||||||
|
expect(job.state.lastDurationMs).toBeGreaterThan(0);
|
||||||
|
|
||||||
|
cron.stop();
|
||||||
|
await store.cleanup();
|
||||||
|
});
|
||||||
|
|
||||||
it("runs an isolated job and posts summary to main", async () => {
|
it("runs an isolated job and posts summary to main", async () => {
|
||||||
const store = await makeStorePath();
|
const store = await makeStorePath();
|
||||||
const enqueueSystemEvent = vi.fn();
|
const enqueueSystemEvent = vi.fn();
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import crypto from "node:crypto";
|
import crypto from "node:crypto";
|
||||||
|
|
||||||
|
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
|
||||||
import { truncateUtf16Safe } from "../utils.js";
|
import { truncateUtf16Safe } from "../utils.js";
|
||||||
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
import { migrateLegacyCronPayload } from "./payload-migration.js";
|
||||||
import { computeNextRunAtMs } from "./schedule.js";
|
import { computeNextRunAtMs } from "./schedule.js";
|
||||||
@ -37,6 +38,9 @@ export type CronServiceDeps = {
|
|||||||
cronEnabled: boolean;
|
cronEnabled: boolean;
|
||||||
enqueueSystemEvent: (text: string) => void;
|
enqueueSystemEvent: (text: string) => void;
|
||||||
requestHeartbeatNow: (opts?: { reason?: string }) => void;
|
requestHeartbeatNow: (opts?: { reason?: string }) => void;
|
||||||
|
runHeartbeatOnce?: (opts?: {
|
||||||
|
reason?: string;
|
||||||
|
}) => Promise<HeartbeatRunResult>;
|
||||||
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{
|
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<{
|
||||||
status: "ok" | "error" | "skipped";
|
status: "ok" | "error" | "skipped";
|
||||||
summary?: string;
|
summary?: string;
|
||||||
@ -45,6 +49,10 @@ export type CronServiceDeps = {
|
|||||||
onEvent?: (evt: CronEvent) => void;
|
onEvent?: (evt: CronEvent) => void;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type CronServiceDepsInternal = Omit<CronServiceDeps, "nowMs"> & {
|
||||||
|
nowMs: () => number;
|
||||||
|
};
|
||||||
|
|
||||||
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
|
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
|
||||||
const MAX_TIMEOUT_MS = 2 ** 31 - 1;
|
const MAX_TIMEOUT_MS = 2 ** 31 - 1;
|
||||||
|
|
||||||
@ -99,8 +107,7 @@ function normalizePayloadToSystemText(payload: CronPayload) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export class CronService {
|
export class CronService {
|
||||||
private readonly deps: Required<Omit<CronServiceDeps, "onEvent">> &
|
private readonly deps: CronServiceDepsInternal;
|
||||||
Pick<CronServiceDeps, "onEvent">;
|
|
||||||
private store: CronStoreFile | null = null;
|
private store: CronStoreFile | null = null;
|
||||||
private timer: NodeJS.Timeout | null = null;
|
private timer: NodeJS.Timeout | null = null;
|
||||||
private running = false;
|
private running = false;
|
||||||
@ -111,7 +118,6 @@ export class CronService {
|
|||||||
this.deps = {
|
this.deps = {
|
||||||
...deps,
|
...deps,
|
||||||
nowMs: deps.nowMs ?? (() => Date.now()),
|
nowMs: deps.nowMs ?? (() => Date.now()),
|
||||||
onEvent: deps.onEvent,
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -514,10 +520,42 @@ export class CronService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.deps.enqueueSystemEvent(text);
|
this.deps.enqueueSystemEvent(text);
|
||||||
if (job.wakeMode === "now") {
|
if (job.wakeMode === "now" && this.deps.runHeartbeatOnce) {
|
||||||
|
const reason = `cron:${job.id}`;
|
||||||
|
const delay = (ms: number) =>
|
||||||
|
new Promise<void>((resolve) => setTimeout(resolve, ms));
|
||||||
|
const maxWaitMs = 2 * 60_000;
|
||||||
|
const waitStartedAt = this.deps.nowMs();
|
||||||
|
|
||||||
|
let heartbeatResult: HeartbeatRunResult;
|
||||||
|
for (;;) {
|
||||||
|
heartbeatResult = await this.deps.runHeartbeatOnce({ reason });
|
||||||
|
if (
|
||||||
|
heartbeatResult.status !== "skipped" ||
|
||||||
|
heartbeatResult.reason !== "requests-in-flight"
|
||||||
|
) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (this.deps.nowMs() - waitStartedAt > maxWaitMs) {
|
||||||
|
heartbeatResult = {
|
||||||
|
status: "skipped",
|
||||||
|
reason: "timeout waiting for main lane to become idle",
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
await delay(250);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (heartbeatResult.status === "ran") {
|
||||||
|
await finish("ok", undefined, text);
|
||||||
|
} else if (heartbeatResult.status === "skipped")
|
||||||
|
await finish("skipped", heartbeatResult.reason, text);
|
||||||
|
else await finish("error", heartbeatResult.reason, text);
|
||||||
|
} else {
|
||||||
|
// wakeMode is "next-heartbeat" or runHeartbeatOnce not available
|
||||||
this.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
|
this.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
|
||||||
|
await finish("ok", undefined, text);
|
||||||
}
|
}
|
||||||
await finish("ok", undefined, text);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -19,10 +19,12 @@ installGatewayTestHooks();
|
|||||||
describe("gateway server models + voicewake", () => {
|
describe("gateway server models + voicewake", () => {
|
||||||
const setTempHome = (homeDir: string) => {
|
const setTempHome = (homeDir: string) => {
|
||||||
const prevHome = process.env.HOME;
|
const prevHome = process.env.HOME;
|
||||||
|
const prevStateDir = process.env.CLAWDBOT_STATE_DIR;
|
||||||
const prevUserProfile = process.env.USERPROFILE;
|
const prevUserProfile = process.env.USERPROFILE;
|
||||||
const prevHomeDrive = process.env.HOMEDRIVE;
|
const prevHomeDrive = process.env.HOMEDRIVE;
|
||||||
const prevHomePath = process.env.HOMEPATH;
|
const prevHomePath = process.env.HOMEPATH;
|
||||||
process.env.HOME = homeDir;
|
process.env.HOME = homeDir;
|
||||||
|
process.env.CLAWDBOT_STATE_DIR = path.join(homeDir, ".clawdbot");
|
||||||
process.env.USERPROFILE = homeDir;
|
process.env.USERPROFILE = homeDir;
|
||||||
if (process.platform === "win32") {
|
if (process.platform === "win32") {
|
||||||
const parsed = path.parse(homeDir);
|
const parsed = path.parse(homeDir);
|
||||||
@ -35,6 +37,11 @@ describe("gateway server models + voicewake", () => {
|
|||||||
} else {
|
} else {
|
||||||
process.env.HOME = prevHome;
|
process.env.HOME = prevHome;
|
||||||
}
|
}
|
||||||
|
if (prevStateDir === undefined) {
|
||||||
|
delete process.env.CLAWDBOT_STATE_DIR;
|
||||||
|
} else {
|
||||||
|
process.env.CLAWDBOT_STATE_DIR = prevStateDir;
|
||||||
|
}
|
||||||
if (prevUserProfile === undefined) {
|
if (prevUserProfile === undefined) {
|
||||||
delete process.env.USERPROFILE;
|
delete process.env.USERPROFILE;
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -61,7 +61,10 @@ import { startNodeBridgeServer } from "../infra/bridge/server.js";
|
|||||||
import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js";
|
import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js";
|
||||||
import { GatewayLockError } from "../infra/gateway-lock.js";
|
import { GatewayLockError } from "../infra/gateway-lock.js";
|
||||||
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
|
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
|
||||||
import { startHeartbeatRunner } from "../infra/heartbeat-runner.js";
|
import {
|
||||||
|
runHeartbeatOnce,
|
||||||
|
startHeartbeatRunner,
|
||||||
|
} from "../infra/heartbeat-runner.js";
|
||||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||||
import { getMachineDisplayName } from "../infra/machine-name.js";
|
import { getMachineDisplayName } from "../infra/machine-name.js";
|
||||||
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
|
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
|
||||||
@ -715,6 +718,14 @@ export async function startGatewayServer(
|
|||||||
enqueueSystemEvent(text, { sessionKey: resolveMainSessionKey(cfg) });
|
enqueueSystemEvent(text, { sessionKey: resolveMainSessionKey(cfg) });
|
||||||
},
|
},
|
||||||
requestHeartbeatNow,
|
requestHeartbeatNow,
|
||||||
|
runHeartbeatOnce: async (opts) => {
|
||||||
|
const runtimeConfig = loadConfig();
|
||||||
|
return await runHeartbeatOnce({
|
||||||
|
cfg: runtimeConfig,
|
||||||
|
reason: opts?.reason,
|
||||||
|
deps: { ...deps, runtime: defaultRuntime },
|
||||||
|
});
|
||||||
|
},
|
||||||
runIsolatedAgentJob: async ({ job, message }) => {
|
runIsolatedAgentJob: async ({ job, message }) => {
|
||||||
const runtimeConfig = loadConfig();
|
const runtimeConfig = loadConfig();
|
||||||
return await runCronIsolatedAgentTurn({
|
return await runCronIsolatedAgentTurn({
|
||||||
|
|||||||
@ -238,6 +238,7 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
|
||||||
|
let mediaGroupProcessing: Promise<void> = Promise.resolve();
|
||||||
|
|
||||||
const cfg = opts.config ?? loadConfig();
|
const cfg = opts.config ?? loadConfig();
|
||||||
const account = resolveTelegramAccount({
|
const account = resolveTelegramAccount({
|
||||||
@ -1228,14 +1229,24 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
|||||||
existing.messages.push({ msg, ctx });
|
existing.messages.push({ msg, ctx });
|
||||||
existing.timer = setTimeout(async () => {
|
existing.timer = setTimeout(async () => {
|
||||||
mediaGroupBuffer.delete(mediaGroupId);
|
mediaGroupBuffer.delete(mediaGroupId);
|
||||||
await processMediaGroup(existing);
|
mediaGroupProcessing = mediaGroupProcessing
|
||||||
|
.then(async () => {
|
||||||
|
await processMediaGroup(existing);
|
||||||
|
})
|
||||||
|
.catch(() => undefined);
|
||||||
|
await mediaGroupProcessing;
|
||||||
}, MEDIA_GROUP_TIMEOUT_MS);
|
}, MEDIA_GROUP_TIMEOUT_MS);
|
||||||
} else {
|
} else {
|
||||||
const entry: MediaGroupEntry = {
|
const entry: MediaGroupEntry = {
|
||||||
messages: [{ msg, ctx }],
|
messages: [{ msg, ctx }],
|
||||||
timer: setTimeout(async () => {
|
timer: setTimeout(async () => {
|
||||||
mediaGroupBuffer.delete(mediaGroupId);
|
mediaGroupBuffer.delete(mediaGroupId);
|
||||||
await processMediaGroup(entry);
|
mediaGroupProcessing = mediaGroupProcessing
|
||||||
|
.then(async () => {
|
||||||
|
await processMediaGroup(entry);
|
||||||
|
})
|
||||||
|
.catch(() => undefined);
|
||||||
|
await mediaGroupProcessing;
|
||||||
}, MEDIA_GROUP_TIMEOUT_MS),
|
}, MEDIA_GROUP_TIMEOUT_MS),
|
||||||
};
|
};
|
||||||
mediaGroupBuffer.set(mediaGroupId, entry);
|
mediaGroupBuffer.set(mediaGroupId, entry);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user