Merge e0ff05c4ea into 09be5d45d5
This commit is contained in:
commit
76055cf9fd
137
src/config/sessions/per-session-store.ts
Normal file
137
src/config/sessions/per-session-store.ts
Normal file
@ -0,0 +1,137 @@
|
|||||||
|
/**
|
||||||
|
* Per-session metadata storage to eliminate lock contention.
|
||||||
|
*
|
||||||
|
* Instead of storing all session metadata in a single sessions.json file
|
||||||
|
* (which requires a global lock), each session gets its own .meta.json file.
|
||||||
|
* This allows parallel updates without blocking.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import fs from "node:fs/promises";
|
||||||
|
import path from "node:path";
|
||||||
|
import JSON5 from "json5";
|
||||||
|
import type { SessionEntry } from "./types.js";
|
||||||
|
import { resolveSessionTranscriptsDirForAgent } from "./paths.js";
|
||||||
|
|
||||||
|
const META_SUFFIX = ".meta.json";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the path to a session's metadata file.
|
||||||
|
*/
|
||||||
|
export function getSessionMetaPath(sessionId: string, agentId?: string): string {
|
||||||
|
const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId);
|
||||||
|
return path.join(sessionsDir, `${sessionId}${META_SUFFIX}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load session metadata from per-session file.
|
||||||
|
* Returns undefined if the file doesn't exist.
|
||||||
|
*/
|
||||||
|
export async function loadSessionMeta(
|
||||||
|
sessionId: string,
|
||||||
|
agentId?: string,
|
||||||
|
): Promise<SessionEntry | undefined> {
|
||||||
|
const metaPath = getSessionMetaPath(sessionId, agentId);
|
||||||
|
try {
|
||||||
|
const content = await fs.readFile(metaPath, "utf-8");
|
||||||
|
const entry = JSON5.parse(content) as SessionEntry;
|
||||||
|
return entry;
|
||||||
|
} catch (err) {
|
||||||
|
const code = (err as { code?: string }).code;
|
||||||
|
if (code === "ENOENT") return undefined;
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save session metadata to per-session file.
|
||||||
|
* Uses atomic write (write to temp, then rename) to prevent corruption.
|
||||||
|
*/
|
||||||
|
export async function saveSessionMeta(
|
||||||
|
sessionId: string,
|
||||||
|
entry: SessionEntry,
|
||||||
|
agentId?: string,
|
||||||
|
): Promise<void> {
|
||||||
|
const metaPath = getSessionMetaPath(sessionId, agentId);
|
||||||
|
const dir = path.dirname(metaPath);
|
||||||
|
await fs.mkdir(dir, { recursive: true });
|
||||||
|
|
||||||
|
// Atomic write: write to temp file, then rename
|
||||||
|
const tempPath = `${metaPath}.tmp.${process.pid}.${Date.now()}`;
|
||||||
|
const content = JSON.stringify(entry, null, 2);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await fs.writeFile(tempPath, content, "utf-8");
|
||||||
|
await fs.rename(tempPath, metaPath);
|
||||||
|
} catch (err) {
|
||||||
|
// Clean up temp file on error
|
||||||
|
await fs.unlink(tempPath).catch(() => {});
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update session metadata atomically.
|
||||||
|
* Reads current state, applies patch, and writes back.
|
||||||
|
* No lock needed since we use atomic writes and per-session files.
|
||||||
|
*/
|
||||||
|
export async function updateSessionMeta(
|
||||||
|
sessionId: string,
|
||||||
|
patch: Partial<SessionEntry>,
|
||||||
|
agentId?: string,
|
||||||
|
): Promise<SessionEntry> {
|
||||||
|
const existing = await loadSessionMeta(sessionId, agentId);
|
||||||
|
const updatedAt = Date.now();
|
||||||
|
const merged: SessionEntry = {
|
||||||
|
...existing,
|
||||||
|
...patch,
|
||||||
|
sessionId,
|
||||||
|
updatedAt,
|
||||||
|
};
|
||||||
|
await saveSessionMeta(sessionId, merged, agentId);
|
||||||
|
return merged;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete session metadata file.
|
||||||
|
*/
|
||||||
|
export async function deleteSessionMeta(sessionId: string, agentId?: string): Promise<void> {
|
||||||
|
const metaPath = getSessionMetaPath(sessionId, agentId);
|
||||||
|
await fs.unlink(metaPath).catch((err) => {
|
||||||
|
if ((err as { code?: string }).code !== "ENOENT") throw err;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List all session metadata files in the sessions directory.
|
||||||
|
* Returns an array of session IDs.
|
||||||
|
*/
|
||||||
|
export async function listSessionMetas(agentId?: string): Promise<string[]> {
|
||||||
|
const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId);
|
||||||
|
try {
|
||||||
|
const files = await fs.readdir(sessionsDir);
|
||||||
|
return files.filter((f) => f.endsWith(META_SUFFIX)).map((f) => f.slice(0, -META_SUFFIX.length));
|
||||||
|
} catch (err) {
|
||||||
|
if ((err as { code?: string }).code === "ENOENT") return [];
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load all session metadata from per-session files.
|
||||||
|
* This is used for backwards compatibility and for building the session index.
|
||||||
|
*/
|
||||||
|
export async function loadAllSessionMetas(agentId?: string): Promise<Record<string, SessionEntry>> {
|
||||||
|
const sessionIds = await listSessionMetas(agentId);
|
||||||
|
const entries: Record<string, SessionEntry> = {};
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
sessionIds.map(async (sessionId) => {
|
||||||
|
const entry = await loadSessionMeta(sessionId, agentId);
|
||||||
|
if (entry) {
|
||||||
|
entries[sessionId] = entry;
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
return entries;
|
||||||
|
}
|
||||||
@ -342,17 +342,102 @@ export async function updateSessionStoreEntry(params: {
|
|||||||
update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
|
update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
|
||||||
}): Promise<SessionEntry | null> {
|
}): Promise<SessionEntry | null> {
|
||||||
const { storePath, sessionKey, update } = params;
|
const { storePath, sessionKey, update } = params;
|
||||||
return await withSessionStoreLock(storePath, async () => {
|
|
||||||
const store = loadSessionStore(storePath);
|
// Fast path: read the store without locking to get the session entry
|
||||||
const existing = store[sessionKey];
|
// The store is cached and TTL-validated, so this is cheap
|
||||||
if (!existing) return null;
|
const store = loadSessionStore(storePath);
|
||||||
const patch = await update(existing);
|
const existing = store[sessionKey];
|
||||||
if (!patch) return existing;
|
if (!existing) return null;
|
||||||
const next = mergeSessionEntry(existing, patch);
|
|
||||||
store[sessionKey] = next;
|
// Get the sessionId for per-session file access
|
||||||
await saveSessionStoreUnlocked(storePath, store);
|
const sessionId = existing.sessionId;
|
||||||
return next;
|
if (!sessionId) {
|
||||||
});
|
// Fallback to locked update for legacy entries without sessionId
|
||||||
|
return await withSessionStoreLock(storePath, async () => {
|
||||||
|
const freshStore = loadSessionStore(storePath, { skipCache: true });
|
||||||
|
const freshExisting = freshStore[sessionKey];
|
||||||
|
if (!freshExisting) return null;
|
||||||
|
const patch = await update(freshExisting);
|
||||||
|
if (!patch) return freshExisting;
|
||||||
|
const next = mergeSessionEntry(freshExisting, patch);
|
||||||
|
freshStore[sessionKey] = next;
|
||||||
|
await saveSessionStoreUnlocked(storePath, freshStore);
|
||||||
|
return next;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the patch
|
||||||
|
const patch = await update(existing);
|
||||||
|
if (!patch) return existing;
|
||||||
|
|
||||||
|
// Merge and create the updated entry
|
||||||
|
const next = mergeSessionEntry(existing, patch);
|
||||||
|
|
||||||
|
// Write to per-session meta file (no global lock needed)
|
||||||
|
const { updateSessionMeta } = await import("./per-session-store.js");
|
||||||
|
const agentId = extractAgentIdFromStorePath(storePath);
|
||||||
|
await updateSessionMeta(sessionId, next, agentId);
|
||||||
|
|
||||||
|
// Update the in-memory cache so subsequent reads see the update
|
||||||
|
store[sessionKey] = next;
|
||||||
|
invalidateSessionStoreCache(storePath);
|
||||||
|
|
||||||
|
// Async background sync to sessions.json (debounced, best-effort)
|
||||||
|
debouncedSyncToSessionsJson(storePath, sessionKey, next);
|
||||||
|
|
||||||
|
return next;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper to extract agentId from store path
|
||||||
|
function extractAgentIdFromStorePath(storePath: string): string | undefined {
|
||||||
|
// storePath is like: ~/.openclaw/agents/{agentId}/sessions/sessions.json
|
||||||
|
const match = storePath.match(/agents\/([^/]+)\/sessions/);
|
||||||
|
return match?.[1];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debounced sync to sessions.json to keep it in sync (background, best-effort)
|
||||||
|
const pendingSyncs = new Map<string, { sessionKey: string; entry: SessionEntry }>();
|
||||||
|
let syncTimer: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
|
function debouncedSyncToSessionsJson(
|
||||||
|
storePath: string,
|
||||||
|
sessionKey: string,
|
||||||
|
entry: SessionEntry,
|
||||||
|
): void {
|
||||||
|
const key = `${storePath}::${sessionKey}`;
|
||||||
|
pendingSyncs.set(key, { sessionKey, entry });
|
||||||
|
|
||||||
|
if (syncTimer) return; // Already scheduled
|
||||||
|
|
||||||
|
syncTimer = setTimeout(async () => {
|
||||||
|
syncTimer = null;
|
||||||
|
const toSync = new Map(pendingSyncs);
|
||||||
|
pendingSyncs.clear();
|
||||||
|
|
||||||
|
// Group by storePath
|
||||||
|
const byStore = new Map<string, Array<{ sessionKey: string; entry: SessionEntry }>>();
|
||||||
|
for (const [key, value] of toSync) {
|
||||||
|
const [sp] = key.split("::");
|
||||||
|
const list = byStore.get(sp) ?? [];
|
||||||
|
list.push(value);
|
||||||
|
byStore.set(sp, list);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Batch update each store
|
||||||
|
for (const [sp, entries] of byStore) {
|
||||||
|
try {
|
||||||
|
await withSessionStoreLock(sp, async () => {
|
||||||
|
const store = loadSessionStore(sp, { skipCache: true });
|
||||||
|
for (const { sessionKey: sk, entry: e } of entries) {
|
||||||
|
store[sk] = e;
|
||||||
|
}
|
||||||
|
await saveSessionStoreUnlocked(sp, store);
|
||||||
|
});
|
||||||
|
} catch {
|
||||||
|
// Best-effort sync, ignore errors
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 5000); // 5 second debounce
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function recordSessionMetaFromInbound(params: {
|
export async function recordSessionMetaFromInbound(params: {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user