Merge 3c2f882053 into 4583f88626
This commit is contained in:
commit
a804179639
@ -4,6 +4,7 @@ import os from "node:os";
|
|||||||
import path from "node:path";
|
import path from "node:path";
|
||||||
import WebSocket from "ws";
|
import WebSocket from "ws";
|
||||||
|
|
||||||
|
import { registerChild } from "../infra/child-registry.js";
|
||||||
import { ensurePortAvailable } from "../infra/ports.js";
|
import { ensurePortAvailable } from "../infra/ports.js";
|
||||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||||
import { CONFIG_DIR } from "../utils.js";
|
import { CONFIG_DIR } from "../utils.js";
|
||||||
@ -263,6 +264,9 @@ export async function launchClawdChrome(
|
|||||||
}
|
}
|
||||||
|
|
||||||
const proc = spawnOnce();
|
const proc = spawnOnce();
|
||||||
|
// Register with child registry (managedExternally: true because browser has its own close logic)
|
||||||
|
// Note: Only register the main proc, NOT the bootstrap process (which is short-lived)
|
||||||
|
registerChild("chrome-browser", proc, { managedExternally: true });
|
||||||
// Wait for CDP to come up.
|
// Wait for CDP to come up.
|
||||||
const readyDeadline = Date.now() + 15_000;
|
const readyDeadline = Date.now() + 15_000;
|
||||||
while (Date.now() < readyDeadline) {
|
while (Date.now() < readyDeadline) {
|
||||||
|
|||||||
48
src/cli/gateway-cli/backoff.test.ts
Normal file
48
src/cli/gateway-cli/backoff.test.ts
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
// src/cli/gateway-cli/backoff.test.ts
|
||||||
|
import { describe, it, expect } from "vitest";
|
||||||
|
import { calculateBackoffMs, applyJitter } from "./backoff.js";
|
||||||
|
|
||||||
|
describe("calculateBackoffMs", () => {
|
||||||
|
it("returns 0 for zero consecutive failures", () => {
|
||||||
|
expect(calculateBackoffMs(0)).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns 2000ms for first failure", () => {
|
||||||
|
expect(calculateBackoffMs(1)).toBe(2000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns 4000ms for second failure", () => {
|
||||||
|
expect(calculateBackoffMs(2)).toBe(4000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns 32000ms for fifth failure", () => {
|
||||||
|
expect(calculateBackoffMs(5)).toBe(32000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("caps at 60000ms for high failure counts", () => {
|
||||||
|
expect(calculateBackoffMs(10)).toBe(60000);
|
||||||
|
expect(calculateBackoffMs(100)).toBe(60000);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("applyJitter", () => {
|
||||||
|
it("returns 0 for 0 input", () => {
|
||||||
|
expect(applyJitter(0)).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns value within +/- 10% of input", () => {
|
||||||
|
const input = 10000;
|
||||||
|
const minExpected = Math.floor(input * 0.9);
|
||||||
|
const maxExpected = Math.ceil(input * 1.1);
|
||||||
|
for (let i = 0; i < 100; i++) {
|
||||||
|
const result = applyJitter(input);
|
||||||
|
expect(result).toBeGreaterThanOrEqual(minExpected);
|
||||||
|
expect(result).toBeLessThanOrEqual(maxExpected);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns an integer", () => {
|
||||||
|
const result = applyJitter(2000);
|
||||||
|
expect(Number.isInteger(result)).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
30
src/cli/gateway-cli/backoff.ts
Normal file
30
src/cli/gateway-cli/backoff.ts
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
// src/cli/gateway-cli/backoff.ts
|
||||||
|
|
||||||
|
const BASE_BACKOFF_MS = 2000;
|
||||||
|
const MAX_BACKOFF_MS = 60_000;
|
||||||
|
const JITTER_FACTOR = 0.1; // +/- 10%
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate exponential backoff based on consecutive failures.
|
||||||
|
* Formula: min(BASE * 2^(failures-1), MAX)
|
||||||
|
*
|
||||||
|
* @param consecutiveFailures - Number of consecutive failures (0 = no backoff)
|
||||||
|
* @returns Backoff duration in milliseconds
|
||||||
|
*/
|
||||||
|
export function calculateBackoffMs(consecutiveFailures: number): number {
|
||||||
|
if (consecutiveFailures <= 0) return 0;
|
||||||
|
const exponential = BASE_BACKOFF_MS * Math.pow(2, consecutiveFailures - 1);
|
||||||
|
return Math.min(exponential, MAX_BACKOFF_MS);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Apply random jitter to a backoff value to prevent thundering herd.
|
||||||
|
*
|
||||||
|
* @param baseMs - Base backoff in milliseconds
|
||||||
|
* @returns Backoff with +/- 10% jitter applied, rounded to integer
|
||||||
|
*/
|
||||||
|
export function applyJitter(baseMs: number): number {
|
||||||
|
if (baseMs <= 0) return 0;
|
||||||
|
const jitter = (Math.random() - 0.5) * 2 * JITTER_FACTOR * baseMs;
|
||||||
|
return Math.round(baseMs + jitter);
|
||||||
|
}
|
||||||
109
src/cli/gateway-cli/crash-tracker.test.ts
Normal file
109
src/cli/gateway-cli/crash-tracker.test.ts
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
// src/cli/gateway-cli/crash-tracker.test.ts
|
||||||
|
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
||||||
|
import {
|
||||||
|
recordCrash,
|
||||||
|
getRecentCrashes,
|
||||||
|
getCrashesInLastHour,
|
||||||
|
clearCrashes,
|
||||||
|
classifyError,
|
||||||
|
} from "./crash-tracker.js";
|
||||||
|
|
||||||
|
describe("crash-tracker", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
clearCrashes();
|
||||||
|
vi.useFakeTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.useRealTimers();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("records a crash with timestamp", () => {
|
||||||
|
vi.setSystemTime(new Date("2026-01-29T12:00:00Z"));
|
||||||
|
recordCrash({
|
||||||
|
errorType: "fetch_failed",
|
||||||
|
errorMessage: "ECONNREFUSED",
|
||||||
|
uptimeMs: 5000,
|
||||||
|
backoffMs: 2000,
|
||||||
|
consecutiveFailures: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
const crashes = getRecentCrashes();
|
||||||
|
expect(crashes).toHaveLength(1);
|
||||||
|
expect(crashes[0].timestamp).toBe(Date.now());
|
||||||
|
expect(crashes[0].errorType).toBe("fetch_failed");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("limits to MAX_CRASH_HISTORY entries", () => {
|
||||||
|
for (let i = 0; i < 25; i++) {
|
||||||
|
recordCrash({
|
||||||
|
errorType: "network_error",
|
||||||
|
errorMessage: `Error ${i}`,
|
||||||
|
uptimeMs: 0,
|
||||||
|
backoffMs: 2000,
|
||||||
|
consecutiveFailures: i + 1,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const crashes = getRecentCrashes();
|
||||||
|
expect(crashes).toHaveLength(20);
|
||||||
|
expect(crashes[0].errorMessage).toBe("Error 5"); // First 5 were dropped
|
||||||
|
});
|
||||||
|
|
||||||
|
it("counts crashes in last hour correctly", () => {
|
||||||
|
vi.setSystemTime(new Date("2026-01-29T12:00:00Z"));
|
||||||
|
recordCrash({
|
||||||
|
errorType: "fetch_failed",
|
||||||
|
errorMessage: "a",
|
||||||
|
uptimeMs: 0,
|
||||||
|
backoffMs: 2000,
|
||||||
|
consecutiveFailures: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.setSystemTime(new Date("2026-01-29T12:30:00Z"));
|
||||||
|
recordCrash({
|
||||||
|
errorType: "fetch_failed",
|
||||||
|
errorMessage: "b",
|
||||||
|
uptimeMs: 0,
|
||||||
|
backoffMs: 4000,
|
||||||
|
consecutiveFailures: 2,
|
||||||
|
});
|
||||||
|
|
||||||
|
// At 13:29:59, "a" is 89 min old (outside), "b" is 59 min old (inside)
|
||||||
|
vi.setSystemTime(new Date("2026-01-29T13:29:59Z"));
|
||||||
|
expect(getCrashesInLastHour()).toBe(1); // Only "b" is within last hour
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("classifyError", () => {
|
||||||
|
it("classifies fetch failed errors", () => {
|
||||||
|
expect(classifyError(new Error("TypeError: fetch failed"))).toBe("fetch_failed");
|
||||||
|
expect(classifyError(new Error("connect ECONNREFUSED 127.0.0.1:443"))).toBe("fetch_failed");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("classifies network errors", () => {
|
||||||
|
expect(classifyError(new Error("read ECONNRESET"))).toBe("network_error");
|
||||||
|
expect(classifyError(new Error("connect ETIMEDOUT"))).toBe("network_error");
|
||||||
|
expect(classifyError(new Error("network unreachable"))).toBe("network_error");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("classifies startup errors", () => {
|
||||||
|
expect(classifyError(new Error("startup failed: missing config"))).toBe("startup_error");
|
||||||
|
expect(classifyError(new Error("init error: bad credentials"))).toBe("startup_error");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("defaults to runtime_error for unrecognized errors", () => {
|
||||||
|
expect(classifyError(new Error("something went wrong"))).toBe("runtime_error");
|
||||||
|
expect(classifyError(new Error("unexpected condition"))).toBe("runtime_error");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles null/undefined safely", () => {
|
||||||
|
expect(classifyError(null)).toBe("unknown");
|
||||||
|
expect(classifyError(undefined)).toBe("unknown");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles non-Error objects", () => {
|
||||||
|
expect(classifyError("string error")).toBe("runtime_error");
|
||||||
|
expect(classifyError({ message: "object error" })).toBe("runtime_error");
|
||||||
|
});
|
||||||
|
});
|
||||||
102
src/cli/gateway-cli/crash-tracker.ts
Normal file
102
src/cli/gateway-cli/crash-tracker.ts
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
// src/cli/gateway-cli/crash-tracker.ts
|
||||||
|
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||||
|
import { isTransientNetworkError } from "../../infra/unhandled-rejections.js";
|
||||||
|
|
||||||
|
const log = createSubsystemLogger("gateway");
|
||||||
|
|
||||||
|
export type CrashErrorType =
|
||||||
|
| "fetch_failed"
|
||||||
|
| "network_error"
|
||||||
|
| "startup_error"
|
||||||
|
| "runtime_error"
|
||||||
|
| "unknown";
|
||||||
|
|
||||||
|
export type CrashRecord = {
|
||||||
|
timestamp: number;
|
||||||
|
errorType: CrashErrorType;
|
||||||
|
errorMessage: string;
|
||||||
|
uptimeMs: number;
|
||||||
|
backoffMs: number;
|
||||||
|
consecutiveFailures: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
const MAX_CRASH_HISTORY = 20;
|
||||||
|
const recentCrashes: CrashRecord[] = [];
|
||||||
|
|
||||||
|
export function recordCrash(record: Omit<CrashRecord, "timestamp">): void {
|
||||||
|
const full: CrashRecord = { ...record, timestamp: Date.now() };
|
||||||
|
recentCrashes.push(full);
|
||||||
|
if (recentCrashes.length > MAX_CRASH_HISTORY) {
|
||||||
|
recentCrashes.shift();
|
||||||
|
}
|
||||||
|
|
||||||
|
log.error("gateway_crash", {
|
||||||
|
errorType: record.errorType,
|
||||||
|
errorMessage: record.errorMessage,
|
||||||
|
uptimeMs: record.uptimeMs,
|
||||||
|
backoffMs: record.backoffMs,
|
||||||
|
consecutiveFailures: record.consecutiveFailures,
|
||||||
|
crashesInLastHour: getCrashesInLastHour(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getRecentCrashes(): readonly CrashRecord[] {
|
||||||
|
return recentCrashes;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getCrashesInLastHour(): number {
|
||||||
|
const oneHourAgo = Date.now() - 3600_000;
|
||||||
|
return recentCrashes.filter((c) => c.timestamp > oneHourAgo).length;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function clearCrashes(): void {
|
||||||
|
recentCrashes.length = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Network error patterns to match in error messages
|
||||||
|
const FETCH_FAILED_PATTERNS = ["fetch failed", "econnrefused"];
|
||||||
|
const NETWORK_ERROR_PATTERNS = [
|
||||||
|
"econnreset",
|
||||||
|
"etimedout",
|
||||||
|
"enotfound",
|
||||||
|
"ehostunreach",
|
||||||
|
"enetunreach",
|
||||||
|
"network unreachable",
|
||||||
|
"socket hang up",
|
||||||
|
];
|
||||||
|
|
||||||
|
function getErrorMessage(err: unknown): string {
|
||||||
|
if (err instanceof Error) return err.message;
|
||||||
|
if (typeof err === "string") return err;
|
||||||
|
if (err && typeof err === "object" && "message" in err && typeof err.message === "string") {
|
||||||
|
return err.message;
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
export function classifyError(err: unknown): CrashErrorType {
|
||||||
|
if (!err) return "unknown";
|
||||||
|
|
||||||
|
const message = getErrorMessage(err).toLowerCase();
|
||||||
|
|
||||||
|
// Use existing transient network detection for consistency
|
||||||
|
if (isTransientNetworkError(err)) {
|
||||||
|
if (FETCH_FAILED_PATTERNS.some((p) => message.includes(p))) {
|
||||||
|
return "fetch_failed";
|
||||||
|
}
|
||||||
|
return "network_error";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also check message patterns for errors without proper error codes
|
||||||
|
if (FETCH_FAILED_PATTERNS.some((p) => message.includes(p))) {
|
||||||
|
return "fetch_failed";
|
||||||
|
}
|
||||||
|
if (NETWORK_ERROR_PATTERNS.some((p) => message.includes(p))) {
|
||||||
|
return "network_error";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message.includes("startup") || message.includes("init")) {
|
||||||
|
return "startup_error";
|
||||||
|
}
|
||||||
|
return "runtime_error";
|
||||||
|
}
|
||||||
@ -4,8 +4,12 @@ import {
|
|||||||
consumeGatewaySigusr1RestartAuthorization,
|
consumeGatewaySigusr1RestartAuthorization,
|
||||||
isGatewaySigusr1RestartExternallyAllowed,
|
isGatewaySigusr1RestartExternallyAllowed,
|
||||||
} from "../../infra/restart.js";
|
} from "../../infra/restart.js";
|
||||||
|
import { isTransientNetworkError } from "../../infra/unhandled-rejections.js";
|
||||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||||
import type { defaultRuntime } from "../../runtime.js";
|
import type { defaultRuntime } from "../../runtime.js";
|
||||||
|
import { calculateBackoffMs, applyJitter } from "./backoff.js";
|
||||||
|
import { recordCrash, classifyError } from "./crash-tracker.js";
|
||||||
|
import { killAllChildrenSync } from "../../infra/child-registry.js";
|
||||||
|
|
||||||
const gatewayLog = createSubsystemLogger("gateway");
|
const gatewayLog = createSubsystemLogger("gateway");
|
||||||
|
|
||||||
@ -18,7 +22,7 @@ export async function runGatewayLoop(params: {
|
|||||||
const lock = await acquireGatewayLock();
|
const lock = await acquireGatewayLock();
|
||||||
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
|
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
|
||||||
let shuttingDown = false;
|
let shuttingDown = false;
|
||||||
let restartResolver: (() => void) | null = null;
|
let restartResolver: ((reason: { isUserInitiated: boolean }) => void) | null = null;
|
||||||
|
|
||||||
const cleanupSignals = () => {
|
const cleanupSignals = () => {
|
||||||
process.removeListener("SIGTERM", onSigterm);
|
process.removeListener("SIGTERM", onSigterm);
|
||||||
@ -54,7 +58,7 @@ export async function runGatewayLoop(params: {
|
|||||||
server = null;
|
server = null;
|
||||||
if (isRestart) {
|
if (isRestart) {
|
||||||
shuttingDown = false;
|
shuttingDown = false;
|
||||||
restartResolver?.();
|
restartResolver?.({ isUserInitiated: action === "restart" });
|
||||||
} else {
|
} else {
|
||||||
cleanupSignals();
|
cleanupSignals();
|
||||||
params.runtime.exit(0);
|
params.runtime.exit(0);
|
||||||
@ -87,15 +91,83 @@ export async function runGatewayLoop(params: {
|
|||||||
process.on("SIGINT", onSigint);
|
process.on("SIGINT", onSigint);
|
||||||
process.on("SIGUSR1", onSigusr1);
|
process.on("SIGUSR1", onSigusr1);
|
||||||
|
|
||||||
|
// Register exit handler for crash scenarios (sync only - can't await in 'exit' handler)
|
||||||
|
process.on("exit", () => {
|
||||||
|
killAllChildrenSync();
|
||||||
|
});
|
||||||
|
|
||||||
|
let consecutiveFailures = 0;
|
||||||
|
const STABILITY_THRESHOLD_MS = 60_000;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required).
|
// Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required).
|
||||||
// SIGTERM/SIGINT still exit after a graceful shutdown.
|
// SIGTERM/SIGINT still exit after a graceful shutdown.
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
server = await params.start();
|
// Calculate and apply backoff with jitter
|
||||||
await new Promise<void>((resolve) => {
|
const baseBackoffMs = calculateBackoffMs(consecutiveFailures);
|
||||||
|
const backoffMs = applyJitter(baseBackoffMs);
|
||||||
|
|
||||||
|
if (backoffMs > 0) {
|
||||||
|
gatewayLog.warn(
|
||||||
|
`Restarting gateway in ${backoffMs}ms after failure (attempt ${consecutiveFailures + 1})`,
|
||||||
|
);
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, backoffMs));
|
||||||
|
}
|
||||||
|
|
||||||
|
const startAttemptMs = Date.now();
|
||||||
|
|
||||||
|
try {
|
||||||
|
server = await params.start();
|
||||||
|
} catch (err) {
|
||||||
|
// Only retry transient network errors; rethrow fatal/config errors
|
||||||
|
if (!isTransientNetworkError(err)) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
gatewayLog.error(`Gateway startup failed (transient): ${String(err)}`);
|
||||||
|
recordCrash({
|
||||||
|
errorType: classifyError(err),
|
||||||
|
errorMessage: err instanceof Error ? err.message : String(err),
|
||||||
|
uptimeMs: 0,
|
||||||
|
backoffMs,
|
||||||
|
consecutiveFailures: consecutiveFailures + 1,
|
||||||
|
});
|
||||||
|
consecutiveFailures++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Server started successfully - wait for restart signal
|
||||||
|
const restartReason = await new Promise<{ isUserInitiated: boolean }>((resolve) => {
|
||||||
restartResolver = resolve;
|
restartResolver = resolve;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const uptimeMs = Date.now() - startAttemptMs;
|
||||||
|
|
||||||
|
// Determine backoff reset behavior based on uptime and restart type
|
||||||
|
if (restartReason.isUserInitiated) {
|
||||||
|
// User-initiated restart (SIGUSR1): no backoff
|
||||||
|
consecutiveFailures = 0;
|
||||||
|
} else if (uptimeMs >= STABILITY_THRESHOLD_MS) {
|
||||||
|
// Crashed after stable uptime: reset to minimal backoff
|
||||||
|
recordCrash({
|
||||||
|
errorType: "runtime_error",
|
||||||
|
errorMessage: "crashed after stable uptime",
|
||||||
|
uptimeMs,
|
||||||
|
backoffMs: calculateBackoffMs(1),
|
||||||
|
consecutiveFailures: 1,
|
||||||
|
});
|
||||||
|
consecutiveFailures = 1;
|
||||||
|
} else {
|
||||||
|
// Crashed during startup or early runtime: increment backoff
|
||||||
|
recordCrash({
|
||||||
|
errorType: "runtime_error",
|
||||||
|
errorMessage: "crashed during early runtime",
|
||||||
|
uptimeMs,
|
||||||
|
backoffMs: calculateBackoffMs(consecutiveFailures + 1),
|
||||||
|
consecutiveFailures: consecutiveFailures + 1,
|
||||||
|
});
|
||||||
|
consecutiveFailures++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
await lock?.release();
|
await lock?.release();
|
||||||
|
|||||||
@ -3,6 +3,7 @@ import type { WebSocketServer } from "ws";
|
|||||||
import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js";
|
import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js";
|
||||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||||
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
import { stopGmailWatcher } from "../hooks/gmail-watcher.js";
|
||||||
|
import { killAllChildren, killAllChildrenSync } from "../infra/child-registry.js";
|
||||||
import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||||
import type { PluginServicesHandle } from "../plugins/services.js";
|
import type { PluginServicesHandle } from "../plugins/services.js";
|
||||||
|
|
||||||
@ -124,5 +125,11 @@ export function createGatewayCloseHandler(params: {
|
|||||||
httpServer.close((err) => (err ? reject(err) : resolve())),
|
httpServer.close((err) => (err ? reject(err) : resolve())),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Phase 3: Kill any orphaned children not stopped by their own managers
|
||||||
|
await killAllChildren("SIGTERM", { excludeManaged: true, timeoutMs: 2000 });
|
||||||
|
|
||||||
|
// Phase 4: Force cleanup of any remaining children
|
||||||
|
killAllChildrenSync();
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,6 +8,7 @@
|
|||||||
import { type ChildProcess, spawn } from "node:child_process";
|
import { type ChildProcess, spawn } from "node:child_process";
|
||||||
import { hasBinary } from "../agents/skills.js";
|
import { hasBinary } from "../agents/skills.js";
|
||||||
import type { MoltbotConfig } from "../config/config.js";
|
import type { MoltbotConfig } from "../config/config.js";
|
||||||
|
import { registerChild } from "../infra/child-registry.js";
|
||||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||||
import { runCommandWithTimeout } from "../process/exec.js";
|
import { runCommandWithTimeout } from "../process/exec.js";
|
||||||
import {
|
import {
|
||||||
@ -73,6 +74,9 @@ function spawnGogServe(cfg: GmailHookRuntimeConfig): ChildProcess {
|
|||||||
detached: false,
|
detached: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Register with child registry (managedExternally: true because we have our own stop logic)
|
||||||
|
registerChild("gog-gmail-watcher", child, { managedExternally: true });
|
||||||
|
|
||||||
child.stdout?.on("data", (data: Buffer) => {
|
child.stdout?.on("data", (data: Buffer) => {
|
||||||
const line = data.toString().trim();
|
const line = data.toString().trim();
|
||||||
if (line) log.info(`[gog] ${line}`);
|
if (line) log.info(`[gog] ${line}`);
|
||||||
|
|||||||
136
src/infra/child-registry.test.ts
Normal file
136
src/infra/child-registry.test.ts
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
// src/infra/child-registry.test.ts
|
||||||
|
import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
|
||||||
|
import { spawn, type ChildProcess } from "node:child_process";
|
||||||
|
import {
|
||||||
|
registerChild,
|
||||||
|
unregisterChild,
|
||||||
|
killAllChildren,
|
||||||
|
killAllChildrenSync,
|
||||||
|
getRegisteredChildren,
|
||||||
|
clearRegistry,
|
||||||
|
} from "./child-registry.js";
|
||||||
|
|
||||||
|
describe("child-registry", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
clearRegistry();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
clearRegistry();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("registers a child process with PID", () => {
|
||||||
|
const mockProc = {
|
||||||
|
pid: 12345,
|
||||||
|
killed: false,
|
||||||
|
exitCode: null,
|
||||||
|
signalCode: null,
|
||||||
|
kill: vi.fn(),
|
||||||
|
on: vi.fn(),
|
||||||
|
once: vi.fn(),
|
||||||
|
} as unknown as ChildProcess;
|
||||||
|
|
||||||
|
registerChild("test-child", mockProc);
|
||||||
|
|
||||||
|
const children = getRegisteredChildren();
|
||||||
|
expect(children).toHaveLength(1);
|
||||||
|
expect(children[0].pid).toBe(12345);
|
||||||
|
expect(children[0].name).toBe("test-child");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not register if no PID", () => {
|
||||||
|
const mockProc = {
|
||||||
|
pid: undefined,
|
||||||
|
on: vi.fn(),
|
||||||
|
} as unknown as ChildProcess;
|
||||||
|
|
||||||
|
registerChild("no-pid", mockProc);
|
||||||
|
|
||||||
|
expect(getRegisteredChildren()).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("respects managedExternally flag", () => {
|
||||||
|
const mockProc = {
|
||||||
|
pid: 11111,
|
||||||
|
killed: false,
|
||||||
|
exitCode: null,
|
||||||
|
signalCode: null,
|
||||||
|
kill: vi.fn(),
|
||||||
|
on: vi.fn(),
|
||||||
|
once: vi.fn(),
|
||||||
|
} as unknown as ChildProcess;
|
||||||
|
|
||||||
|
registerChild("managed", mockProc, { managedExternally: true });
|
||||||
|
|
||||||
|
const children = getRegisteredChildren();
|
||||||
|
expect(children[0].managedExternally).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("unregisters a child by PID", () => {
|
||||||
|
const mockProc = {
|
||||||
|
pid: 22222,
|
||||||
|
killed: false,
|
||||||
|
exitCode: null,
|
||||||
|
signalCode: null,
|
||||||
|
kill: vi.fn(),
|
||||||
|
on: vi.fn(),
|
||||||
|
once: vi.fn(),
|
||||||
|
} as unknown as ChildProcess;
|
||||||
|
|
||||||
|
registerChild("to-remove", mockProc);
|
||||||
|
expect(getRegisteredChildren()).toHaveLength(1);
|
||||||
|
|
||||||
|
unregisterChild(22222);
|
||||||
|
expect(getRegisteredChildren()).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("killAllChildrenSync sends SIGKILL to all children", () => {
|
||||||
|
const killFn = vi.fn();
|
||||||
|
const mockProc = {
|
||||||
|
pid: 33333,
|
||||||
|
killed: false,
|
||||||
|
exitCode: null,
|
||||||
|
signalCode: null,
|
||||||
|
kill: killFn,
|
||||||
|
on: vi.fn(),
|
||||||
|
once: vi.fn(),
|
||||||
|
} as unknown as ChildProcess;
|
||||||
|
|
||||||
|
registerChild("to-kill", mockProc);
|
||||||
|
killAllChildrenSync();
|
||||||
|
|
||||||
|
expect(killFn).toHaveBeenCalledWith("SIGKILL");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips already-dead processes", () => {
|
||||||
|
const killFn = vi.fn();
|
||||||
|
const mockProc = {
|
||||||
|
pid: 44444,
|
||||||
|
killed: false,
|
||||||
|
exitCode: 0, // Already exited
|
||||||
|
signalCode: null,
|
||||||
|
kill: killFn,
|
||||||
|
on: vi.fn(),
|
||||||
|
once: vi.fn(),
|
||||||
|
} as unknown as ChildProcess;
|
||||||
|
|
||||||
|
registerChild("already-dead", mockProc);
|
||||||
|
killAllChildrenSync();
|
||||||
|
|
||||||
|
expect(killFn).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("child-registry integration", () => {
|
||||||
|
it("kills a real spawned process", async () => {
|
||||||
|
const proc = spawn("sleep", ["60"], { detached: false });
|
||||||
|
registerChild("sleep-test", proc);
|
||||||
|
|
||||||
|
expect(getRegisteredChildren()).toHaveLength(1);
|
||||||
|
|
||||||
|
await killAllChildren("SIGTERM", { timeoutMs: 1000 });
|
||||||
|
|
||||||
|
// Process should be killed
|
||||||
|
expect(proc.killed || proc.exitCode !== null || proc.signalCode !== null).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
153
src/infra/child-registry.ts
Normal file
153
src/infra/child-registry.ts
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
// src/infra/child-registry.ts
|
||||||
|
import type { ChildProcess } from "node:child_process";
|
||||||
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||||
|
|
||||||
|
const log = createSubsystemLogger("child-registry");
|
||||||
|
|
||||||
|
type ChildEntry = {
|
||||||
|
name: string;
|
||||||
|
process: ChildProcess;
|
||||||
|
managedExternally: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
|
const children = new Map<number, ChildEntry>();
|
||||||
|
|
||||||
|
export function registerChild(
|
||||||
|
name: string,
|
||||||
|
proc: ChildProcess,
|
||||||
|
opts?: { managedExternally?: boolean },
|
||||||
|
): void {
|
||||||
|
if (!proc.pid) {
|
||||||
|
log.warn(`Cannot register child "${name}": no PID (spawn may have failed)`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
children.set(proc.pid, {
|
||||||
|
name,
|
||||||
|
process: proc,
|
||||||
|
managedExternally: opts?.managedExternally ?? false,
|
||||||
|
});
|
||||||
|
|
||||||
|
const cleanup = () => {
|
||||||
|
if (proc.pid) children.delete(proc.pid);
|
||||||
|
};
|
||||||
|
proc.on("exit", cleanup);
|
||||||
|
proc.on("error", cleanup);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function unregisterChild(pid: number): void {
|
||||||
|
children.delete(pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function killAllChildren(
|
||||||
|
signal: NodeJS.Signals = "SIGTERM",
|
||||||
|
opts?: { excludeManaged?: boolean; timeoutMs?: number },
|
||||||
|
): Promise<void> {
|
||||||
|
const timeoutMs = opts?.timeoutMs ?? (signal === "SIGKILL" ? 500 : 3000);
|
||||||
|
const excludeManaged = opts?.excludeManaged ?? false;
|
||||||
|
const promises: Promise<void>[] = [];
|
||||||
|
|
||||||
|
// Copy entries to avoid mutation during iteration
|
||||||
|
const entries = [...children.entries()];
|
||||||
|
for (const [pid, entry] of entries) {
|
||||||
|
const { name, process: proc, managedExternally } = entry;
|
||||||
|
|
||||||
|
if (excludeManaged && managedExternally) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (proc.killed || proc.exitCode !== null || proc.signalCode !== null) {
|
||||||
|
children.delete(pid);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info(`Killing child process: ${name} (pid=${pid}) with ${signal}`);
|
||||||
|
promises.push(killWithTimeout(pid, proc, signal, timeoutMs, name));
|
||||||
|
}
|
||||||
|
|
||||||
|
await Promise.all(promises);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function killWithTimeout(
|
||||||
|
pid: number,
|
||||||
|
proc: ChildProcess,
|
||||||
|
signal: NodeJS.Signals,
|
||||||
|
timeoutMs: number,
|
||||||
|
name: string,
|
||||||
|
): Promise<void> {
|
||||||
|
if (proc.exitCode !== null || proc.signalCode !== null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
proc.kill(signal);
|
||||||
|
} catch (err) {
|
||||||
|
const errnoErr = err as NodeJS.ErrnoException;
|
||||||
|
if (errnoErr.code !== "ESRCH") {
|
||||||
|
log.warn(`Failed to send ${signal} to ${name}: ${errnoErr.message}`);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeoutMs <= 0) return;
|
||||||
|
|
||||||
|
await Promise.race([
|
||||||
|
new Promise<void>((resolve) => proc.once("exit", resolve)),
|
||||||
|
new Promise<void>((resolve) => setTimeout(resolve, timeoutMs)),
|
||||||
|
]);
|
||||||
|
|
||||||
|
if (proc.exitCode === null && proc.signalCode === null) {
|
||||||
|
if (signal !== "SIGKILL") {
|
||||||
|
log.warn(`${name} did not exit after ${timeoutMs}ms; sending SIGKILL`);
|
||||||
|
try {
|
||||||
|
proc.kill("SIGKILL");
|
||||||
|
} catch {
|
||||||
|
/* ignore */
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise<void>((resolve) => setTimeout(resolve, 500));
|
||||||
|
|
||||||
|
if (proc.exitCode === null && proc.signalCode === null) {
|
||||||
|
log.warn(`Process ${name} (pid=${pid}) did not respond to SIGKILL; proceeding anyway`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.warn(
|
||||||
|
`Process ${name} (pid=${pid}) did not respond to SIGKILL after ${timeoutMs}ms; proceeding anyway`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
children.delete(pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function killAllChildrenSync(): void {
|
||||||
|
// Copy entries to avoid mutation during iteration
|
||||||
|
const entries = [...children.entries()];
|
||||||
|
for (const [pid, { name, process: proc }] of entries) {
|
||||||
|
if (proc.exitCode !== null || proc.signalCode !== null) continue;
|
||||||
|
try {
|
||||||
|
// Use console.error since logging may be unreliable during process exit
|
||||||
|
console.error(`[child-registry] Force-killing child process: ${name} (pid=${pid})`);
|
||||||
|
proc.kill("SIGKILL");
|
||||||
|
} catch {
|
||||||
|
/* ignore */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
children.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getRegisteredChildren(): Array<{
|
||||||
|
pid: number;
|
||||||
|
name: string;
|
||||||
|
managedExternally: boolean;
|
||||||
|
}> {
|
||||||
|
return [...children.entries()].map(([pid, entry]) => ({
|
||||||
|
pid,
|
||||||
|
name: entry.name,
|
||||||
|
managedExternally: entry.managedExternally,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
export function clearRegistry(): void {
|
||||||
|
children.clear();
|
||||||
|
}
|
||||||
@ -1,6 +1,7 @@
|
|||||||
import { spawn } from "node:child_process";
|
import { spawn } from "node:child_process";
|
||||||
import net from "node:net";
|
import net from "node:net";
|
||||||
|
|
||||||
|
import { registerChild } from "./child-registry.js";
|
||||||
import { ensurePortAvailable } from "./ports.js";
|
import { ensurePortAvailable } from "./ports.js";
|
||||||
|
|
||||||
export type SshParsedTarget = {
|
export type SshParsedTarget = {
|
||||||
@ -145,6 +146,8 @@ export async function startSshPortForward(opts: {
|
|||||||
const child = spawn("/usr/bin/ssh", args, {
|
const child = spawn("/usr/bin/ssh", args, {
|
||||||
stdio: ["ignore", "ignore", "pipe"],
|
stdio: ["ignore", "ignore", "pipe"],
|
||||||
});
|
});
|
||||||
|
// Register with child registry (managedExternally: true because tunnel has its own stop logic)
|
||||||
|
registerChild("ssh-tunnel", child, { managedExternally: true });
|
||||||
child.stderr?.setEncoding("utf8");
|
child.stderr?.setEncoding("utf8");
|
||||||
child.stderr?.on("data", (chunk) => {
|
child.stderr?.on("data", (chunk) => {
|
||||||
const lines = String(chunk)
|
const lines = String(chunk)
|
||||||
|
|||||||
@ -127,3 +127,63 @@ describe("isTransientNetworkError", () => {
|
|||||||
expect(isTransientNetworkError(error)).toBe(false);
|
expect(isTransientNetworkError(error)).toBe(false);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("isTransientNetworkError - expanded patterns", () => {
|
||||||
|
it("recognizes EAI_NODATA as transient", () => {
|
||||||
|
const err = { code: "EAI_NODATA", message: "DNS lookup failed" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recognizes EAI_NONAME as transient", () => {
|
||||||
|
const err = { code: "EAI_NONAME", message: "DNS name not found" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recognizes HTTP 502 as transient", () => {
|
||||||
|
const err = { status: 502, message: "Bad Gateway" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recognizes HTTP 503 as transient", () => {
|
||||||
|
const err = { statusCode: 503, message: "Service Unavailable" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recognizes HTTP 429 rate limit as transient", () => {
|
||||||
|
const err = { status: 429, message: "Too Many Requests" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recognizes socket closed message as transient", () => {
|
||||||
|
const err = new Error("socket closed unexpectedly");
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recognizes client network socket disconnected as transient", () => {
|
||||||
|
const err = new Error("Client network socket disconnected before secure TLS connection");
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("recognizes TLS certificate errors as transient", () => {
|
||||||
|
const err1 = { code: "CERT_HAS_EXPIRED", message: "certificate has expired" };
|
||||||
|
expect(isTransientNetworkError(err1)).toBe(true);
|
||||||
|
|
||||||
|
const err2 = { code: "ERR_TLS_CERT_ALTNAME_INVALID", message: "Hostname mismatch" };
|
||||||
|
expect(isTransientNetworkError(err2)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not recognize HTTP 400 as transient", () => {
|
||||||
|
const err = { status: 400, message: "Bad Request" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not recognize HTTP 401 as transient", () => {
|
||||||
|
const err = { status: 401, message: "Unauthorized" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("handles string status codes", () => {
|
||||||
|
const err = { status: "503", message: "Service Unavailable" };
|
||||||
|
expect(isTransientNetworkError(err)).toBe(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@ -28,6 +28,10 @@ const TRANSIENT_NETWORK_CODES = new Set([
|
|||||||
"EHOSTUNREACH",
|
"EHOSTUNREACH",
|
||||||
"ENETUNREACH",
|
"ENETUNREACH",
|
||||||
"EAI_AGAIN",
|
"EAI_AGAIN",
|
||||||
|
"EAI_NODATA",
|
||||||
|
"EAI_NONAME",
|
||||||
|
"CERT_HAS_EXPIRED",
|
||||||
|
"ERR_TLS_CERT_ALTNAME_INVALID",
|
||||||
"UND_ERR_CONNECT_TIMEOUT",
|
"UND_ERR_CONNECT_TIMEOUT",
|
||||||
"UND_ERR_DNS_RESOLVE_FAILED",
|
"UND_ERR_DNS_RESOLVE_FAILED",
|
||||||
"UND_ERR_CONNECT",
|
"UND_ERR_CONNECT",
|
||||||
@ -99,6 +103,34 @@ export function isTransientNetworkError(err: unknown): boolean {
|
|||||||
return err.errors.some((e) => isTransientNetworkError(e));
|
return err.errors.some((e) => isTransientNetworkError(e));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Message-based fallback detection
|
||||||
|
const message = err instanceof Error ? err.message?.toLowerCase() : "";
|
||||||
|
if (
|
||||||
|
message.includes("fetch failed") ||
|
||||||
|
message.includes("network error") ||
|
||||||
|
message.includes("socket hang up") ||
|
||||||
|
message.includes("socket closed") ||
|
||||||
|
message.includes("client network socket disconnected")
|
||||||
|
) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for transient HTTP error responses (handle both number and string status codes)
|
||||||
|
const statusRaw =
|
||||||
|
(err as { status?: unknown }).status ?? (err as { statusCode?: unknown }).statusCode;
|
||||||
|
const status =
|
||||||
|
typeof statusRaw === "number"
|
||||||
|
? statusRaw
|
||||||
|
: typeof statusRaw === "string"
|
||||||
|
? parseInt(statusRaw, 10)
|
||||||
|
: NaN;
|
||||||
|
if (
|
||||||
|
Number.isFinite(status) &&
|
||||||
|
(status === 429 || status === 502 || status === 503 || status === 504)
|
||||||
|
) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import { spawn } from "node:child_process";
|
import { spawn } from "node:child_process";
|
||||||
import type { RuntimeEnv } from "../runtime.js";
|
import type { RuntimeEnv } from "../runtime.js";
|
||||||
|
import { registerChild } from "../infra/child-registry.js";
|
||||||
|
|
||||||
export type SignalDaemonOpts = {
|
export type SignalDaemonOpts = {
|
||||||
cliPath: string;
|
cliPath: string;
|
||||||
@ -52,6 +53,10 @@ export function spawnSignalDaemon(opts: SignalDaemonOpts): SignalDaemonHandle {
|
|||||||
const child = spawn(opts.cliPath, args, {
|
const child = spawn(opts.cliPath, args, {
|
||||||
stdio: ["ignore", "pipe", "pipe"],
|
stdio: ["ignore", "pipe", "pipe"],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Register with child registry (managedExternally: true because signal daemon has its own lifecycle)
|
||||||
|
registerChild("signal-cli-daemon", child, { managedExternally: true });
|
||||||
|
|
||||||
const log = opts.runtime?.log ?? (() => {});
|
const log = opts.runtime?.log ?? (() => {});
|
||||||
const error = opts.runtime?.error ?? (() => {});
|
const error = opts.runtime?.error ?? (() => {});
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user