From 0e4ae1b173b44ed81aaf3e7ea7908885280ba698 Mon Sep 17 00:00:00 2001 From: Bheem Date: Wed, 28 Jan 2026 20:44:11 -0600 Subject: [PATCH] fix: repair unpaired tool calls when loading sessions When a session is loaded with assistant tool calls that never received results (e.g., due to exec timeout, crash, or interruption), the session would become stuck and unresponsive. This fix adds repair logic during session initialization that: - Detects unpaired tool calls in the loaded session history - Injects synthetic error tool results for missing results - Marks the session as unflushed so repairs are persisted This ensures sessions can always recover from incomplete tool executions instead of remaining permanently stuck. Fixes group chat sessions getting stuck when exec commands time out. --- .../session-manager-init.test.ts | 202 ++++++++++++++++++ .../session-manager-init.ts | 61 ++++++ 2 files changed, 263 insertions(+) create mode 100644 src/agents/pi-embedded-runner/session-manager-init.test.ts diff --git a/src/agents/pi-embedded-runner/session-manager-init.test.ts b/src/agents/pi-embedded-runner/session-manager-init.test.ts new file mode 100644 index 000000000..3d10cbf30 --- /dev/null +++ b/src/agents/pi-embedded-runner/session-manager-init.test.ts @@ -0,0 +1,202 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +// Mock logger before importing the module +vi.mock("../../logger.js", () => ({ + logDebug: vi.fn(), + logWarn: vi.fn(), +})); + +import { prepareSessionManagerForRun } from "./session-manager-init.js"; +import { logWarn } from "../../logger.js"; + +describe("prepareSessionManagerForRun", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + describe("tool use/result repair on load", () => { + it("should inject synthetic tool results for unpaired tool calls", async () => { + const sessionManager = { + sessionId: "test-session", + flushed: true, + fileEntries: [ + { type: "session", id: "test-session", cwd: "/test" }, + { + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "Hello" }], + }, + }, + { + type: "message", + message: { + role: "assistant", + content: [ + { type: "text", text: "Let me check that" }, + { + type: "toolCall", + id: "tool-123", + name: "exec", + arguments: { command: "find /Users -name '*.txt'" }, + }, + ], + }, + }, + // Missing toolResult for tool-123! + ], + byId: new Map(), + labelsById: new Map(), + leafId: null, + }; + + await prepareSessionManagerForRun({ + sessionManager, + sessionFile: "/tmp/test-session.jsonl", + hadSessionFile: true, + sessionId: "test-session", + cwd: "/test", + }); + + // Should have added a synthetic tool result + const messages = sessionManager.fileEntries + .filter((e: any) => e.type === "message") + .map((e: any) => e.message); + + const toolResults = messages.filter((m: any) => m.role === "toolResult"); + expect(toolResults.length).toBe(1); + expect(toolResults[0].toolCallId).toBe("tool-123"); + expect(toolResults[0].isError).toBe(true); + expect(toolResults[0].content[0].text).toContain("missing tool result"); + + // Should have logged a warning + expect(logWarn).toHaveBeenCalledWith(expect.stringContaining("Repaired session transcript")); + + // Should mark session as unflushed so repairs are persisted + expect(sessionManager.flushed).toBe(false); + }); + + it("should not modify session if all tool calls have results", async () => { + const sessionManager = { + sessionId: "test-session", + flushed: true, + fileEntries: [ + { type: "session", id: "test-session", cwd: "/test" }, + { + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "Hello" }], + }, + }, + { + type: "message", + message: { + role: "assistant", + content: [ + { + type: "toolCall", + id: "tool-123", + name: "exec", + arguments: { command: "ls" }, + }, + ], + }, + }, + { + type: "message", + message: { + role: "toolResult", + toolCallId: "tool-123", + toolName: "exec", + content: [{ type: "text", text: "file1.txt\nfile2.txt" }], + isError: false, + }, + }, + ], + byId: new Map(), + labelsById: new Map(), + leafId: null, + }; + + await prepareSessionManagerForRun({ + sessionManager, + sessionFile: "/tmp/test-session.jsonl", + hadSessionFile: true, + sessionId: "test-session", + cwd: "/test", + }); + + // Should not have logged a warning + expect(logWarn).not.toHaveBeenCalled(); + + // Session should remain flushed + expect(sessionManager.flushed).toBe(true); + }); + + it("should handle multiple unpaired tool calls", async () => { + const sessionManager = { + sessionId: "test-session", + flushed: true, + fileEntries: [ + { type: "session", id: "test-session", cwd: "/test" }, + { + type: "message", + message: { + role: "assistant", + content: [ + { type: "toolCall", id: "tool-1", name: "exec" }, + { type: "toolCall", id: "tool-2", name: "read" }, + ], + }, + }, + // Missing both tool results! + ], + byId: new Map(), + labelsById: new Map(), + leafId: null, + }; + + await prepareSessionManagerForRun({ + sessionManager, + sessionFile: "/tmp/test-session.jsonl", + hadSessionFile: true, + sessionId: "test-session", + cwd: "/test", + }); + + const messages = sessionManager.fileEntries + .filter((e: any) => e.type === "message") + .map((e: any) => e.message); + + const toolResults = messages.filter((m: any) => m.role === "toolResult"); + expect(toolResults.length).toBe(2); + + const ids = toolResults.map((r: any) => r.toolCallId); + expect(ids).toContain("tool-1"); + expect(ids).toContain("tool-2"); + }); + + it("should not repair new sessions without existing files", async () => { + const sessionManager = { + sessionId: "test-session", + flushed: false, + fileEntries: [{ type: "session", id: undefined, cwd: undefined }], + byId: new Map(), + labelsById: new Map(), + leafId: null, + }; + + await prepareSessionManagerForRun({ + sessionManager, + sessionFile: "/tmp/new-session.jsonl", + hadSessionFile: false, + sessionId: "new-session", + cwd: "/test", + }); + + // Should not have logged anything + expect(logWarn).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/src/agents/pi-embedded-runner/session-manager-init.ts b/src/agents/pi-embedded-runner/session-manager-init.ts index 95c699947..1fb1d561c 100644 --- a/src/agents/pi-embedded-runner/session-manager-init.ts +++ b/src/agents/pi-embedded-runner/session-manager-init.ts @@ -1,5 +1,8 @@ import fs from "node:fs/promises"; +import { repairToolUseResultPairing } from "../session-transcript-repair.js"; +import { logDebug, logWarn } from "../../logger.js"; + type SessionHeaderEntry = { type: "session"; id?: string; cwd?: string }; type SessionMessageEntry = { type: "message"; message?: { role?: string } }; @@ -50,4 +53,62 @@ export async function prepareSessionManagerForRun(params: { sm.leafId = null; sm.flushed = false; } + + // Repair any unpaired tool calls (e.g., from crashed/timed-out sessions). + // This prevents sessions from getting stuck when tool calls never received results. + if (params.hadSessionFile && hasAssistant) { + repairSessionToolPairing(sm, params.sessionFile); + } +} + +/** + * Repair unpaired tool calls in a loaded session by injecting synthetic error results. + * This handles cases where: + * - A tool execution timed out or crashed before returning + * - The session was interrupted mid-execution + * - Tool results were lost due to storage issues + */ +function repairSessionToolPairing( + sm: { + fileEntries: Array; + flushed: boolean; + }, + sessionFile: string, +): void { + // Extract messages from fileEntries + const messages = sm.fileEntries + .filter((e): e is SessionMessageEntry => e.type === "message") + .map((e) => e.message) + .filter((m): m is NonNullable => m != null); + + if (messages.length === 0) return; + + const report = repairToolUseResultPairing(messages as any); + + if ( + report.added.length > 0 || + report.droppedDuplicateCount > 0 || + report.droppedOrphanCount > 0 + ) { + logWarn( + `Repaired session transcript: file=${sessionFile} ` + + `added=${report.added.length} droppedDuplicates=${report.droppedDuplicateCount} ` + + `droppedOrphans=${report.droppedOrphanCount}`, + ); + + // Rebuild fileEntries with repaired messages (nonMessages includes header) + const nonMessages = sm.fileEntries.filter((e) => e.type !== "message"); + + sm.fileEntries = [ + ...nonMessages, + ...report.messages.map((msg) => ({ type: "message" as const, message: msg })), + ]; + + // Mark as unflushed so the repairs get persisted + sm.flushed = false; + + logDebug( + `Session repair details: added synthetic results for tool calls: ${report.added.map((r) => (r as any).toolCallId).join(", ")}`, + ); + } }