diff --git a/src/agents/session-write-lock.test.ts b/src/agents/session-write-lock.test.ts index 27e793d5a..4c5d5978e 100644 --- a/src/agents/session-write-lock.test.ts +++ b/src/agents/session-write-lock.test.ts @@ -3,7 +3,11 @@ import os from "node:os"; import path from "node:path"; import { describe, expect, it } from "vitest"; -import { __testing, acquireSessionWriteLock } from "./session-write-lock.js"; +import { + __testing, + acquireSessionWriteLock, + releaseAllSessionWriteLocks, +} from "./session-write-lock.js"; describe("acquireSessionWriteLock", () => { it("reuses locks across symlinked session paths", async () => { @@ -159,4 +163,150 @@ describe("acquireSessionWriteLock", () => { expect(process.listeners("SIGINT")).toContain(keepAlive); process.off("SIGINT", keepAlive); }); + + it("reclaims lock with same PID but different nonce (stale from prior boot)", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-lock-nonce-")); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + + // Simulate a lock left by a previous boot iteration: same PID, different nonce + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: process.pid, + nonce: "stale-nonce-from-previous-boot", + createdAt: new Date().toISOString(), + }), + "utf8", + ); + + // Should reclaim the lock despite same PID and fresh createdAt + const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 2000 }); + const raw = await fs.readFile(lockPath, "utf8"); + const payload = JSON.parse(raw) as { pid: number; nonce: string }; + + expect(payload.pid).toBe(process.pid); + expect(payload.nonce).toBe(__testing.instanceNonce); + await lock.release(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("does not reclaim lock with matching nonce (held by current boot)", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-lock-nonce-current-")); + try { + const sessionFile = path.join(root, "sessions.json"); + + // Acquire normally — writes current nonce + const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + + // A second acquire on a different normalized path pointing to the same + // file should reuse the in-memory held lock (already tested elsewhere), + // but if we manually simulate a contention scenario by NOT going through + // the in-memory path, the nonce check should NOT treat our own lock as + // stale. We verify indirectly: the lock file should contain our nonce. + const lockPath = `${sessionFile}.lock`; + const raw = await fs.readFile(lockPath, "utf8"); + const payload = JSON.parse(raw) as { pid: number; nonce: string }; + + expect(payload.nonce).toBe(__testing.instanceNonce); + await lock.release(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("falls back to pid+staleMs for locks without a nonce (backward compat)", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-lock-no-nonce-")); + try { + const sessionFile = path.join(root, "sessions.json"); + const lockPath = `${sessionFile}.lock`; + + // Old-format lock: no nonce, same PID, recent timestamp + // This should NOT be reclaimed (backward compat: no nonce → skip nonce check) + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: process.pid, + createdAt: new Date().toISOString(), + }), + "utf8", + ); + + // With a short timeout this should fail — lock appears valid (same PID, alive, not stale) + await expect( + acquireSessionWriteLock({ sessionFile, timeoutMs: 200, staleMs: 60_000 }), + ).rejects.toThrow(/locked/); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("rotates instance nonce on releaseAllSessionWriteLocks", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-lock-nonce-rotate-")); + try { + const sessionFile = path.join(root, "sessions.json"); + + // Acquire a lock — captures current nonce + const nonceBefore = __testing.instanceNonce; + await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); + + // Release all — should rotate the nonce + await releaseAllSessionWriteLocks(); + const nonceAfter = __testing.instanceNonce; + + expect(nonceAfter).not.toBe(nonceBefore); + + // A stale lock file with the OLD nonce should now be reclaimable + const lockPath = `${sessionFile}.lock`; + await fs.writeFile( + lockPath, + JSON.stringify({ + pid: process.pid, + nonce: nonceBefore, + createdAt: new Date().toISOString(), + }), + "utf8", + ); + + const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 2000 }); + const raw = await fs.readFile(lockPath, "utf8"); + const payload = JSON.parse(raw) as { pid: number; nonce: string }; + expect(payload.nonce).toBe(nonceAfter); + await lock.release(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); + + it("releaseAllSessionWriteLocks removes all held locks", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "moltbot-lock-release-all-")); + try { + const sessionA = path.join(root, "a.json"); + const sessionB = path.join(root, "b.json"); + const lockA = `${sessionA}.lock`; + const lockB = `${sessionB}.lock`; + + await acquireSessionWriteLock({ sessionFile: sessionA, timeoutMs: 500 }); + await acquireSessionWriteLock({ sessionFile: sessionB, timeoutMs: 500 }); + + // Both lock files should exist + await expect(fs.access(lockA)).resolves.toBeUndefined(); + await expect(fs.access(lockB)).resolves.toBeUndefined(); + + await releaseAllSessionWriteLocks(); + + // Both lock files should be removed + await expect(fs.access(lockA)).rejects.toThrow(); + await expect(fs.access(lockB)).rejects.toThrow(); + + // Should be able to re-acquire after release + const lock = await acquireSessionWriteLock({ sessionFile: sessionA, timeoutMs: 500 }); + await lock.release(); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } + }); }); diff --git a/src/agents/session-write-lock.ts b/src/agents/session-write-lock.ts index 82a2428da..3080b0148 100644 --- a/src/agents/session-write-lock.ts +++ b/src/agents/session-write-lock.ts @@ -1,10 +1,28 @@ +import { randomBytes } from "node:crypto"; import fsSync from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; +/** + * Instance nonce — regenerated each time the gateway server starts within + * the process. ESM modules are cached for the process lifetime, so this + * must be mutable and explicitly reset via `resetInstanceNonce()` during + * shutdown. After an in-process restart (SIGUSR1), lock files written by + * the previous server iteration carry a stale nonce, letting us detect + * them even when the PID hasn't changed (common in containers where + * PID = 1). + */ +let instanceNonce: string = randomBytes(12).toString("hex"); + +function resetInstanceNonce(): void { + instanceNonce = randomBytes(12).toString("hex"); +} + type LockFilePayload = { pid: number; createdAt: string; + /** Instance nonce — absent in lock files written by older versions. */ + nonce?: string; }; type HeldLock = { @@ -93,7 +111,11 @@ async function readLockPayload(lockPath: string): Promise; if (typeof parsed.pid !== "number") return null; if (typeof parsed.createdAt !== "string") return null; - return { pid: parsed.pid, createdAt: parsed.createdAt }; + return { + pid: parsed.pid, + createdAt: parsed.createdAt, + nonce: typeof parsed.nonce === "string" ? parsed.nonce : undefined, + }; } catch { return null; } @@ -144,7 +166,11 @@ export async function acquireSessionWriteLock(params: { try { const handle = await fs.open(lockPath, "wx"); await handle.writeFile( - JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }, null, 2), + JSON.stringify( + { pid: process.pid, nonce: instanceNonce, createdAt: new Date().toISOString() }, + null, + 2, + ), "utf8", ); HELD_LOCKS.set(normalizedSessionFile, { count: 1, handle, lockPath }); @@ -166,7 +192,18 @@ export async function acquireSessionWriteLock(params: { const createdAt = payload?.createdAt ? Date.parse(payload.createdAt) : NaN; const stale = !Number.isFinite(createdAt) || Date.now() - createdAt > staleMs; const alive = payload?.pid ? isAlive(payload.pid) : false; - if (stale || !alive) { + + // Nonce mismatch: lock was written by a previous server iteration of the + // same process (e.g. PID 1 in a container after SIGUSR1). The old + // iteration is gone even though the PID is still alive, so treat the + // lock as stale. If the lock has no nonce (written by an older version) + // we fall through to the existing pid+staleMs checks for backward compat. + const nonceMismatch = + payload?.nonce !== undefined && + payload.nonce !== instanceNonce && + payload.pid === process.pid; + + if (stale || !alive || nonceMismatch) { await fs.rm(lockPath, { force: true }); continue; } @@ -181,8 +218,41 @@ export async function acquireSessionWriteLock(params: { throw new Error(`session file locked (timeout ${timeoutMs}ms): ${owner} ${lockPath}`); } +/** + * Release all held session write locks. + * + * Call this during gateway shutdown (e.g., before an in-process SIGUSR1 + * restart) to ensure on-disk `.lock` files are removed. Without this, + * locks owned by PID 1 survive an in-process restart because + * `isAlive(1)` returns true for the same process, making the lock + * appear valid for up to `staleMs` (30 min). + */ +export async function releaseAllSessionWriteLocks(): Promise { + for (const [sessionFile, held] of HELD_LOCKS) { + try { + await held.handle.close(); + } catch { + // Best effort - handle may already be closed. + } + try { + await fs.rm(held.lockPath, { force: true }); + } catch { + // Best effort - file may already be removed. + } + HELD_LOCKS.delete(sessionFile); + } + // Rotate the instance nonce AFTER all locks are cleaned up, so that any + // lock files that survive this cleanup (e.g. due to fs errors) are + // detectable as stale by the next server iteration. Rotating before + // cleanup would let a concurrent acquirer see the old nonce as stale + // and reclaim a lock we haven't finished releasing yet. + resetInstanceNonce(); +} export const __testing = { cleanupSignals: [...CLEANUP_SIGNALS], handleTerminationSignal, releaseAllLocksSync, + get instanceNonce() { + return instanceNonce; + }, }; diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index da9f5a39e..dd2996fb5 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -1,5 +1,6 @@ import type { Server as HttpServer } from "node:http"; import type { WebSocketServer } from "ws"; +import { releaseAllSessionWriteLocks } from "../agents/session-write-lock.js"; import type { CanvasHostHandler, CanvasHostServer } from "../canvas-host/server.js"; import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js"; import { stopGmailWatcher } from "../hooks/gmail-watcher.js"; @@ -96,6 +97,20 @@ export function createGatewayCloseHandler(params: { } } params.chatRunState.clear(); + + // Release all held session write locks before restarting. + // Without this, in-process restarts (SIGUSR1) leave on-disk .lock files + // owned by PID 1, which appear valid to the new server iteration because + // isAlive(1) returns true (same process). This blocks session access for + // up to staleMs (30 min) until the lock expires. + try { + await releaseAllSessionWriteLocks(); + } catch { + // Best effort — don't let lock cleanup failure prevent shutdown. + // The instance nonce mechanism will detect surviving stale locks + // on the next server iteration. + } + for (const c of params.clients) { try { c.socket.close(1012, "service restart");