Hook sessions (e.g., from Gmail hooks) are now automatically cleaned up after a configurable TTL.

Changes:
- Add a `hooks.sessionTtlMs` config option (default: 24 hours)
- Add a `cleanupStaleHookSessions()` function that deletes stale hook sessions
- Run the cleanup hourly via the existing maintenance interval
- Only affect sessions whose keys start with "hook:"
- Delete sessions and their transcripts once the TTL expires

Config example:

```yaml
hooks:
  sessionTtlMs: 86400000 # 24 hours (default)
  # Set to 0 to disable cleanup
```

This prevents hook sessions from accumulating indefinitely while still leaving time for debugging (sessions are kept for 24 hours by default).
130 lines
4.7 KiB
TypeScript
130 lines
4.7 KiB
TypeScript
import type { HealthSummary } from "../commands/health.js";
|
|
import { loadConfig } from "../config/config.js";
|
|
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
|
|
import { cleanupStaleHookSessions } from "./hooks-session-cleanup.js";
|
|
import { setBroadcastHealthUpdate } from "./server/health-state.js";
|
|
import type { ChatRunEntry } from "./server-chat.js";
|
|
import {
|
|
DEDUPE_MAX,
|
|
DEDUPE_TTL_MS,
|
|
HEALTH_REFRESH_INTERVAL_MS,
|
|
TICK_INTERVAL_MS,
|
|
} from "./server-constants.js";
|
|
import type { DedupeEntry } from "./server-shared.js";
|
|
import { formatError } from "./server-utils.js";
|
|
|
|
/** Period of the maintenance sweep (dedupe pruning, run expiry, hook-session cleanup). */
const MAINTENANCE_SWEEP_INTERVAL_MS = 60_000;

/** How long an aborted run id (and its buffered output) is remembered before being dropped. */
const ABORTED_RUN_TTL_MS = 60 * 60_000;

/** Target cadence for stale hook-session cleanup. */
const HOOK_SESSION_CLEANUP_INTERVAL_MS = 60 * 60_000;

/**
 * Number of maintenance sweeps between hook-session cleanups. Derived from the
 * sweep period so the cleanup stays roughly hourly if that period changes.
 */
const HOOK_SESSION_CLEANUP_EVERY_N_SWEEPS = Math.max(
  1,
  Math.round(HOOK_SESSION_CLEANUP_INTERVAL_MS / MAINTENANCE_SWEEP_INTERVAL_MS),
);

/**
 * Prune the request dedupe cache in place.
 *
 * First drops entries whose `ts` is more than `DEDUPE_TTL_MS` before `now`.
 * If the cache is still larger than `DEDUPE_MAX`, it then evicts the oldest
 * remaining entries (by `ts`) until exactly `DEDUPE_MAX` are left.
 *
 * @param dedupe - cache to prune (mutated)
 * @param now - current time in ms, the same clock as `DedupeEntry.ts`
 */
function pruneDedupeCache(dedupe: Map<string, DedupeEntry>, now: number): void {
  // Deleting the current key while iterating a Map is well-defined in JS.
  for (const [key, entry] of dedupe) {
    if (now - entry.ts > DEDUPE_TTL_MS) dedupe.delete(key);
  }

  // Compute the overflow once. Re-evaluating `dedupe.size - DEDUPE_MAX` as the
  // loop bound shrinks it with every delete and evicts only about half the excess.
  const excess = dedupe.size - DEDUPE_MAX;
  if (excess <= 0) return;

  const oldestFirst = [...dedupe.entries()].sort((a, b) => a[1].ts - b[1].ts);
  for (const [key] of oldestFirst.slice(0, excess)) {
    dedupe.delete(key);
  }
}

/**
 * Start the gateway's background timers.
 *
 * - Registers a health-update callback that broadcasts each snapshot as a
 *   `"health"` event (tagged with the current presence/health versions) and
 *   forwards it to all subscribed nodes.
 * - Every `TICK_INTERVAL_MS`, sends a keepalive `"tick"` event (`{ ts }`),
 *   marked `dropIfSlow` for broadcast clients.
 * - Every `HEALTH_REFRESH_INTERVAL_MS`, refreshes the health snapshot with
 *   probing. One refresh is also fired immediately so the first client does
 *   not wait for a snapshot.
 * - Every minute, runs a maintenance sweep that:
 *   - prunes the dedupe cache;
 *   - aborts chat runs whose `expiresAtMs` has passed (stop reason `"timeout"`);
 *   - forgets runs that were aborted more than an hour ago;
 *   - roughly hourly, starts a best-effort cleanup of stale hook sessions.
 *
 * @returns the interval handles, so the caller can `clearInterval` them on shutdown.
 */
export function startGatewayMaintenanceTimers(params: {
  broadcast: (
    event: string,
    payload: unknown,
    opts?: {
      dropIfSlow?: boolean;
      stateVersion?: { presence?: number; health?: number };
    },
  ) => void;
  nodeSendToAllSubscribed: (event: string, payload: unknown) => void;
  getPresenceVersion: () => number;
  getHealthVersion: () => number;
  refreshGatewayHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
  logHealth: { error: (msg: string) => void };
  dedupe: Map<string, DedupeEntry>;
  chatAbortControllers: Map<string, ChatAbortControllerEntry>;
  chatRunState: { abortedRuns: Map<string, number> };
  chatRunBuffers: Map<string, string>;
  chatDeltaSentAt: Map<string, number>;
  removeChatRun: (
    sessionId: string,
    clientRunId: string,
    sessionKey?: string,
  ) => ChatRunEntry | undefined;
  agentRunSeq: Map<string, number>;
  nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
}): {
  tickInterval: ReturnType<typeof setInterval>;
  healthInterval: ReturnType<typeof setInterval>;
  dedupeCleanup: ReturnType<typeof setInterval>;
} {
  setBroadcastHealthUpdate((snap: HealthSummary) => {
    params.broadcast("health", snap, {
      stateVersion: {
        presence: params.getPresenceVersion(),
        health: params.getHealthVersion(),
      },
    });
    params.nodeSendToAllSubscribed("health", snap);
  });

  // periodic keepalive
  const tickInterval = setInterval(() => {
    const payload = { ts: Date.now() };
    params.broadcast("tick", payload, { dropIfSlow: true });
    params.nodeSendToAllSubscribed("tick", payload);
  }, TICK_INTERVAL_MS);

  // periodic health refresh to keep cached snapshot warm
  const healthInterval = setInterval(() => {
    void params
      .refreshGatewayHealthSnapshot({ probe: true })
      .catch((err) => params.logHealth.error(`refresh failed: ${formatError(err)}`));
  }, HEALTH_REFRESH_INTERVAL_MS);

  // Prime cache so first client gets a snapshot without waiting.
  void params
    .refreshGatewayHealthSnapshot({ probe: true })
    .catch((err) => params.logHealth.error(`initial refresh failed: ${formatError(err)}`));

  // Counts sweeps since the last hook-session cleanup. The cleanup piggybacks on
  // the sweep rather than owning its own timer, so the returned handles are
  // still all the caller has to clear.
  let sweepsSinceHookCleanup = 0;

  // maintenance sweep: dedupe cache, expired/aborted chat runs, hook sessions
  const dedupeCleanup = setInterval(() => {
    const now = Date.now();

    pruneDedupeCache(params.dedupe, now);

    for (const [runId, entry] of params.chatAbortControllers) {
      if (now <= entry.expiresAtMs) continue;
      abortChatRunById(
        {
          chatAbortControllers: params.chatAbortControllers,
          chatRunBuffers: params.chatRunBuffers,
          chatDeltaSentAt: params.chatDeltaSentAt,
          chatAbortedRuns: params.chatRunState.abortedRuns,
          removeChatRun: params.removeChatRun,
          agentRunSeq: params.agentRunSeq,
          broadcast: params.broadcast,
          nodeSendToSession: params.nodeSendToSession,
        },
        { runId, sessionKey: entry.sessionKey, stopReason: "timeout" },
      );
    }

    // Forget runs aborted more than ABORTED_RUN_TTL_MS ago, along with any
    // buffered output and delta timestamps still held under the same run id.
    for (const [runId, abortedAt] of params.chatRunState.abortedRuns) {
      if (now - abortedAt <= ABORTED_RUN_TTL_MS) continue;
      params.chatRunState.abortedRuns.delete(runId);
      params.chatRunBuffers.delete(runId);
      params.chatDeltaSentAt.delete(runId);
    }

    // Hook session TTL cleanup (roughly hourly).
    sweepsSinceHookCleanup++;
    if (sweepsSinceHookCleanup >= HOOK_SESSION_CLEANUP_EVERY_N_SWEEPS) {
      sweepsSinceHookCleanup = 0;
      // loadConfig() is synchronous. Calling it inside the promise chain means a
      // config read/parse failure becomes a handled rejection instead of an
      // exception thrown out of this timer callback.
      void Promise.resolve()
        .then(() => cleanupStaleHookSessions({ cfg: loadConfig() }))
        .catch(() => {
          // Best-effort cleanup; the next scheduled run retries.
          // NOTE(review): cleanupStaleHookSessions is presumed to log its own
          // failures — confirm. Config-load failures are not logged here.
        });
    }
  }, MAINTENANCE_SWEEP_INTERVAL_MS);

  return { tickInterval, healthInterval, dedupeCleanup };
}
|