diff --git a/src/config/sessions/per-session-store.ts b/src/config/sessions/per-session-store.ts new file mode 100644 index 000000000..48ba3c0b2 --- /dev/null +++ b/src/config/sessions/per-session-store.ts @@ -0,0 +1,137 @@ +/** + * Per-session metadata storage to eliminate lock contention. + * + * Instead of storing all session metadata in a single sessions.json file + * (which requires a global lock), each session gets its own .meta.json file. + * This allows parallel updates without blocking. + */ + +import fs from "node:fs/promises"; +import path from "node:path"; +import JSON5 from "json5"; +import type { SessionEntry } from "./types.js"; +import { resolveSessionTranscriptsDirForAgent } from "./paths.js"; + +const META_SUFFIX = ".meta.json"; + +/** + * Get the path to a session's metadata file. + */ +export function getSessionMetaPath(sessionId: string, agentId?: string): string { + const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId); + return path.join(sessionsDir, `${sessionId}${META_SUFFIX}`); +} + +/** + * Load session metadata from per-session file. + * Returns undefined if the file doesn't exist. + */ +export async function loadSessionMeta( + sessionId: string, + agentId?: string, +): Promise { + const metaPath = getSessionMetaPath(sessionId, agentId); + try { + const content = await fs.readFile(metaPath, "utf-8"); + const entry = JSON5.parse(content) as SessionEntry; + return entry; + } catch (err) { + const code = (err as { code?: string }).code; + if (code === "ENOENT") return undefined; + throw err; + } +} + +/** + * Save session metadata to per-session file. + * Uses atomic write (write to temp, then rename) to prevent corruption. + */ +export async function saveSessionMeta( + sessionId: string, + entry: SessionEntry, + agentId?: string, +): Promise { + const metaPath = getSessionMetaPath(sessionId, agentId); + const dir = path.dirname(metaPath); + await fs.mkdir(dir, { recursive: true }); + + // Atomic write: write to temp file, then rename + const tempPath = `${metaPath}.tmp.${process.pid}.${Date.now()}`; + const content = JSON.stringify(entry, null, 2); + + try { + await fs.writeFile(tempPath, content, "utf-8"); + await fs.rename(tempPath, metaPath); + } catch (err) { + // Clean up temp file on error + await fs.unlink(tempPath).catch(() => {}); + throw err; + } +} + +/** + * Update session metadata atomically. + * Reads current state, applies patch, and writes back. + * No lock needed since we use atomic writes and per-session files. + */ +export async function updateSessionMeta( + sessionId: string, + patch: Partial, + agentId?: string, +): Promise { + const existing = await loadSessionMeta(sessionId, agentId); + const updatedAt = Date.now(); + const merged: SessionEntry = { + ...existing, + ...patch, + sessionId, + updatedAt, + }; + await saveSessionMeta(sessionId, merged, agentId); + return merged; +} + +/** + * Delete session metadata file. + */ +export async function deleteSessionMeta(sessionId: string, agentId?: string): Promise { + const metaPath = getSessionMetaPath(sessionId, agentId); + await fs.unlink(metaPath).catch((err) => { + if ((err as { code?: string }).code !== "ENOENT") throw err; + }); +} + +/** + * List all session metadata files in the sessions directory. + * Returns an array of session IDs. + */ +export async function listSessionMetas(agentId?: string): Promise { + const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId); + try { + const files = await fs.readdir(sessionsDir); + return files.filter((f) => f.endsWith(META_SUFFIX)).map((f) => f.slice(0, -META_SUFFIX.length)); + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") return []; + throw err; + } +} + +/** + * Load all session metadata from per-session files. + * This is used for backwards compatibility and for building the session index. + */ +export async function loadAllSessionMetas(agentId?: string): Promise> { + const sessionIds = await listSessionMetas(agentId); + const entries: Record = {}; + + await Promise.all( + sessionIds.map(async (sessionId) => { + const entry = await loadSessionMeta(sessionId, agentId); + if (entry) { + entries[sessionId] = entry; + } + }), + ); + + return entries; +} diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 0f36e0ebb..0feb72691 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -342,17 +342,102 @@ export async function updateSessionStoreEntry(params: { update: (entry: SessionEntry) => Promise | null>; }): Promise { const { storePath, sessionKey, update } = params; - return await withSessionStoreLock(storePath, async () => { - const store = loadSessionStore(storePath); - const existing = store[sessionKey]; - if (!existing) return null; - const patch = await update(existing); - if (!patch) return existing; - const next = mergeSessionEntry(existing, patch); - store[sessionKey] = next; - await saveSessionStoreUnlocked(storePath, store); - return next; - }); + + // Fast path: read the store without locking to get the session entry + // The store is cached and TTL-validated, so this is cheap + const store = loadSessionStore(storePath); + const existing = store[sessionKey]; + if (!existing) return null; + + // Get the sessionId for per-session file access + const sessionId = existing.sessionId; + if (!sessionId) { + // Fallback to locked update for legacy entries without sessionId + return await withSessionStoreLock(storePath, async () => { + const freshStore = loadSessionStore(storePath, { skipCache: true }); + const freshExisting = freshStore[sessionKey]; + if (!freshExisting) return null; + const patch = await update(freshExisting); + if (!patch) return freshExisting; + const next = mergeSessionEntry(freshExisting, patch); + freshStore[sessionKey] = next; + await saveSessionStoreUnlocked(storePath, freshStore); + return next; + }); + } + + // Compute the patch + const patch = await update(existing); + if (!patch) return existing; + + // Merge and create the updated entry + const next = mergeSessionEntry(existing, patch); + + // Write to per-session meta file (no global lock needed) + const { updateSessionMeta } = await import("./per-session-store.js"); + const agentId = extractAgentIdFromStorePath(storePath); + await updateSessionMeta(sessionId, next, agentId); + + // Update the in-memory cache so subsequent reads see the update + store[sessionKey] = next; + invalidateSessionStoreCache(storePath); + + // Async background sync to sessions.json (debounced, best-effort) + debouncedSyncToSessionsJson(storePath, sessionKey, next); + + return next; +} + +// Helper to extract agentId from store path +function extractAgentIdFromStorePath(storePath: string): string | undefined { + // storePath is like: ~/.openclaw/agents/{agentId}/sessions/sessions.json + const match = storePath.match(/agents\/([^/]+)\/sessions/); + return match?.[1]; +} + +// Debounced sync to sessions.json to keep it in sync (background, best-effort) +const pendingSyncs = new Map(); +let syncTimer: NodeJS.Timeout | null = null; + +function debouncedSyncToSessionsJson( + storePath: string, + sessionKey: string, + entry: SessionEntry, +): void { + const key = `${storePath}::${sessionKey}`; + pendingSyncs.set(key, { sessionKey, entry }); + + if (syncTimer) return; // Already scheduled + + syncTimer = setTimeout(async () => { + syncTimer = null; + const toSync = new Map(pendingSyncs); + pendingSyncs.clear(); + + // Group by storePath + const byStore = new Map>(); + for (const [key, value] of toSync) { + const [sp] = key.split("::"); + const list = byStore.get(sp) ?? []; + list.push(value); + byStore.set(sp, list); + } + + // Batch update each store + for (const [sp, entries] of byStore) { + try { + await withSessionStoreLock(sp, async () => { + const store = loadSessionStore(sp, { skipCache: true }); + for (const { sessionKey: sk, entry: e } of entries) { + store[sk] = e; + } + await saveSessionStoreUnlocked(sp, store); + }); + } catch { + // Best-effort sync, ignore errors + } + } + }, 5000); // 5 second debounce } export async function recordSessionMetaFromInbound(params: {