fix: per-session metadata files to eliminate lock contention
Instead of all sessions sharing a single sessions.json with a global lock, each session now gets its own .meta.json file for frequent updates. - New per-session-store.ts with atomic write functions - updateSessionStoreEntry now writes to per-session files without locking - Background debounced sync keeps sessions.json in sync Fixes #3092
This commit is contained in:
parent
da71eaebd2
commit
e0ff05c4ea
137
src/config/sessions/per-session-store.ts
Normal file
137
src/config/sessions/per-session-store.ts
Normal file
@ -0,0 +1,137 @@
|
||||
/**
|
||||
* Per-session metadata storage to eliminate lock contention.
|
||||
*
|
||||
* Instead of storing all session metadata in a single sessions.json file
|
||||
* (which requires a global lock), each session gets its own .meta.json file.
|
||||
* This allows parallel updates without blocking.
|
||||
*/
|
||||
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import JSON5 from "json5";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
import { resolveSessionTranscriptsDirForAgent } from "./paths.js";
|
||||
|
||||
const META_SUFFIX = ".meta.json";
|
||||
|
||||
/**
|
||||
* Get the path to a session's metadata file.
|
||||
*/
|
||||
export function getSessionMetaPath(sessionId: string, agentId?: string): string {
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId);
|
||||
return path.join(sessionsDir, `${sessionId}${META_SUFFIX}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Load session metadata from per-session file.
|
||||
* Returns undefined if the file doesn't exist.
|
||||
*/
|
||||
export async function loadSessionMeta(
|
||||
sessionId: string,
|
||||
agentId?: string,
|
||||
): Promise<SessionEntry | undefined> {
|
||||
const metaPath = getSessionMetaPath(sessionId, agentId);
|
||||
try {
|
||||
const content = await fs.readFile(metaPath, "utf-8");
|
||||
const entry = JSON5.parse(content) as SessionEntry;
|
||||
return entry;
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
if (code === "ENOENT") return undefined;
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save session metadata to per-session file.
|
||||
* Uses atomic write (write to temp, then rename) to prevent corruption.
|
||||
*/
|
||||
export async function saveSessionMeta(
|
||||
sessionId: string,
|
||||
entry: SessionEntry,
|
||||
agentId?: string,
|
||||
): Promise<void> {
|
||||
const metaPath = getSessionMetaPath(sessionId, agentId);
|
||||
const dir = path.dirname(metaPath);
|
||||
await fs.mkdir(dir, { recursive: true });
|
||||
|
||||
// Atomic write: write to temp file, then rename
|
||||
const tempPath = `${metaPath}.tmp.${process.pid}.${Date.now()}`;
|
||||
const content = JSON.stringify(entry, null, 2);
|
||||
|
||||
try {
|
||||
await fs.writeFile(tempPath, content, "utf-8");
|
||||
await fs.rename(tempPath, metaPath);
|
||||
} catch (err) {
|
||||
// Clean up temp file on error
|
||||
await fs.unlink(tempPath).catch(() => {});
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update session metadata atomically.
|
||||
* Reads current state, applies patch, and writes back.
|
||||
* No lock needed since we use atomic writes and per-session files.
|
||||
*/
|
||||
export async function updateSessionMeta(
|
||||
sessionId: string,
|
||||
patch: Partial<SessionEntry>,
|
||||
agentId?: string,
|
||||
): Promise<SessionEntry> {
|
||||
const existing = await loadSessionMeta(sessionId, agentId);
|
||||
const updatedAt = Date.now();
|
||||
const merged: SessionEntry = {
|
||||
...existing,
|
||||
...patch,
|
||||
sessionId,
|
||||
updatedAt,
|
||||
};
|
||||
await saveSessionMeta(sessionId, merged, agentId);
|
||||
return merged;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete session metadata file.
|
||||
*/
|
||||
export async function deleteSessionMeta(sessionId: string, agentId?: string): Promise<void> {
|
||||
const metaPath = getSessionMetaPath(sessionId, agentId);
|
||||
await fs.unlink(metaPath).catch((err) => {
|
||||
if ((err as { code?: string }).code !== "ENOENT") throw err;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* List all session metadata files in the sessions directory.
|
||||
* Returns an array of session IDs.
|
||||
*/
|
||||
export async function listSessionMetas(agentId?: string): Promise<string[]> {
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId);
|
||||
try {
|
||||
const files = await fs.readdir(sessionsDir);
|
||||
return files.filter((f) => f.endsWith(META_SUFFIX)).map((f) => f.slice(0, -META_SUFFIX.length));
|
||||
} catch (err) {
|
||||
if ((err as { code?: string }).code === "ENOENT") return [];
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Load all session metadata from per-session files.
|
||||
* This is used for backwards compatibility and for building the session index.
|
||||
*/
|
||||
export async function loadAllSessionMetas(agentId?: string): Promise<Record<string, SessionEntry>> {
|
||||
const sessionIds = await listSessionMetas(agentId);
|
||||
const entries: Record<string, SessionEntry> = {};
|
||||
|
||||
await Promise.all(
|
||||
sessionIds.map(async (sessionId) => {
|
||||
const entry = await loadSessionMeta(sessionId, agentId);
|
||||
if (entry) {
|
||||
entries[sessionId] = entry;
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
return entries;
|
||||
}
|
||||
@ -342,17 +342,102 @@ export async function updateSessionStoreEntry(params: {
|
||||
update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
|
||||
}): Promise<SessionEntry | null> {
|
||||
const { storePath, sessionKey, update } = params;
|
||||
return await withSessionStoreLock(storePath, async () => {
|
||||
const store = loadSessionStore(storePath);
|
||||
const existing = store[sessionKey];
|
||||
if (!existing) return null;
|
||||
const patch = await update(existing);
|
||||
if (!patch) return existing;
|
||||
const next = mergeSessionEntry(existing, patch);
|
||||
store[sessionKey] = next;
|
||||
await saveSessionStoreUnlocked(storePath, store);
|
||||
return next;
|
||||
});
|
||||
|
||||
// Fast path: read the store without locking to get the session entry
|
||||
// The store is cached and TTL-validated, so this is cheap
|
||||
const store = loadSessionStore(storePath);
|
||||
const existing = store[sessionKey];
|
||||
if (!existing) return null;
|
||||
|
||||
// Get the sessionId for per-session file access
|
||||
const sessionId = existing.sessionId;
|
||||
if (!sessionId) {
|
||||
// Fallback to locked update for legacy entries without sessionId
|
||||
return await withSessionStoreLock(storePath, async () => {
|
||||
const freshStore = loadSessionStore(storePath, { skipCache: true });
|
||||
const freshExisting = freshStore[sessionKey];
|
||||
if (!freshExisting) return null;
|
||||
const patch = await update(freshExisting);
|
||||
if (!patch) return freshExisting;
|
||||
const next = mergeSessionEntry(freshExisting, patch);
|
||||
freshStore[sessionKey] = next;
|
||||
await saveSessionStoreUnlocked(storePath, freshStore);
|
||||
return next;
|
||||
});
|
||||
}
|
||||
|
||||
// Compute the patch
|
||||
const patch = await update(existing);
|
||||
if (!patch) return existing;
|
||||
|
||||
// Merge and create the updated entry
|
||||
const next = mergeSessionEntry(existing, patch);
|
||||
|
||||
// Write to per-session meta file (no global lock needed)
|
||||
const { updateSessionMeta } = await import("./per-session-store.js");
|
||||
const agentId = extractAgentIdFromStorePath(storePath);
|
||||
await updateSessionMeta(sessionId, next, agentId);
|
||||
|
||||
// Update the in-memory cache so subsequent reads see the update
|
||||
store[sessionKey] = next;
|
||||
invalidateSessionStoreCache(storePath);
|
||||
|
||||
// Async background sync to sessions.json (debounced, best-effort)
|
||||
debouncedSyncToSessionsJson(storePath, sessionKey, next);
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
// Helper to extract agentId from store path
|
||||
function extractAgentIdFromStorePath(storePath: string): string | undefined {
|
||||
// storePath is like: ~/.openclaw/agents/{agentId}/sessions/sessions.json
|
||||
const match = storePath.match(/agents\/([^/]+)\/sessions/);
|
||||
return match?.[1];
|
||||
}
|
||||
|
||||
// Debounced sync to sessions.json to keep it in sync (background, best-effort)
|
||||
const pendingSyncs = new Map<string, { sessionKey: string; entry: SessionEntry }>();
|
||||
let syncTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
function debouncedSyncToSessionsJson(
|
||||
storePath: string,
|
||||
sessionKey: string,
|
||||
entry: SessionEntry,
|
||||
): void {
|
||||
const key = `${storePath}::${sessionKey}`;
|
||||
pendingSyncs.set(key, { sessionKey, entry });
|
||||
|
||||
if (syncTimer) return; // Already scheduled
|
||||
|
||||
syncTimer = setTimeout(async () => {
|
||||
syncTimer = null;
|
||||
const toSync = new Map(pendingSyncs);
|
||||
pendingSyncs.clear();
|
||||
|
||||
// Group by storePath
|
||||
const byStore = new Map<string, Array<{ sessionKey: string; entry: SessionEntry }>>();
|
||||
for (const [key, value] of toSync) {
|
||||
const [sp] = key.split("::");
|
||||
const list = byStore.get(sp) ?? [];
|
||||
list.push(value);
|
||||
byStore.set(sp, list);
|
||||
}
|
||||
|
||||
// Batch update each store
|
||||
for (const [sp, entries] of byStore) {
|
||||
try {
|
||||
await withSessionStoreLock(sp, async () => {
|
||||
const store = loadSessionStore(sp, { skipCache: true });
|
||||
for (const { sessionKey: sk, entry: e } of entries) {
|
||||
store[sk] = e;
|
||||
}
|
||||
await saveSessionStoreUnlocked(sp, store);
|
||||
});
|
||||
} catch {
|
||||
// Best-effort sync, ignore errors
|
||||
}
|
||||
}
|
||||
}, 5000); // 5 second debounce
|
||||
}
|
||||
|
||||
export async function recordSessionMetaFromInbound(params: {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user