fix: repair unpaired tool calls when loading sessions
When a session is loaded with assistant tool calls that never received results (e.g., due to exec timeout, crash, or interruption), the session would become stuck and unresponsive. This fix adds repair logic during session initialization that: - Detects unpaired tool calls in the loaded session history - Injects synthetic error tool results for missing results - Marks the session as unflushed so repairs are persisted This ensures sessions can always recover from incomplete tool executions instead of remaining permanently stuck. Fixes group chat sessions getting stuck when exec commands time out.
This commit is contained in:
parent
4583f88626
commit
0e4ae1b173
202
src/agents/pi-embedded-runner/session-manager-init.test.ts
Normal file
202
src/agents/pi-embedded-runner/session-manager-init.test.ts
Normal file
@ -0,0 +1,202 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
|
||||
// Mock logger before importing the module
|
||||
vi.mock("../../logger.js", () => ({
|
||||
logDebug: vi.fn(),
|
||||
logWarn: vi.fn(),
|
||||
}));
|
||||
|
||||
import { prepareSessionManagerForRun } from "./session-manager-init.js";
|
||||
import { logWarn } from "../../logger.js";
|
||||
|
||||
describe("prepareSessionManagerForRun", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("tool use/result repair on load", () => {
|
||||
it("should inject synthetic tool results for unpaired tool calls", async () => {
|
||||
const sessionManager = {
|
||||
sessionId: "test-session",
|
||||
flushed: true,
|
||||
fileEntries: [
|
||||
{ type: "session", id: "test-session", cwd: "/test" },
|
||||
{
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "Hello" }],
|
||||
},
|
||||
},
|
||||
{
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "text", text: "Let me check that" },
|
||||
{
|
||||
type: "toolCall",
|
||||
id: "tool-123",
|
||||
name: "exec",
|
||||
arguments: { command: "find /Users -name '*.txt'" },
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
// Missing toolResult for tool-123!
|
||||
],
|
||||
byId: new Map(),
|
||||
labelsById: new Map(),
|
||||
leafId: null,
|
||||
};
|
||||
|
||||
await prepareSessionManagerForRun({
|
||||
sessionManager,
|
||||
sessionFile: "/tmp/test-session.jsonl",
|
||||
hadSessionFile: true,
|
||||
sessionId: "test-session",
|
||||
cwd: "/test",
|
||||
});
|
||||
|
||||
// Should have added a synthetic tool result
|
||||
const messages = sessionManager.fileEntries
|
||||
.filter((e: any) => e.type === "message")
|
||||
.map((e: any) => e.message);
|
||||
|
||||
const toolResults = messages.filter((m: any) => m.role === "toolResult");
|
||||
expect(toolResults.length).toBe(1);
|
||||
expect(toolResults[0].toolCallId).toBe("tool-123");
|
||||
expect(toolResults[0].isError).toBe(true);
|
||||
expect(toolResults[0].content[0].text).toContain("missing tool result");
|
||||
|
||||
// Should have logged a warning
|
||||
expect(logWarn).toHaveBeenCalledWith(expect.stringContaining("Repaired session transcript"));
|
||||
|
||||
// Should mark session as unflushed so repairs are persisted
|
||||
expect(sessionManager.flushed).toBe(false);
|
||||
});
|
||||
|
||||
it("should not modify session if all tool calls have results", async () => {
|
||||
const sessionManager = {
|
||||
sessionId: "test-session",
|
||||
flushed: true,
|
||||
fileEntries: [
|
||||
{ type: "session", id: "test-session", cwd: "/test" },
|
||||
{
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "Hello" }],
|
||||
},
|
||||
},
|
||||
{
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [
|
||||
{
|
||||
type: "toolCall",
|
||||
id: "tool-123",
|
||||
name: "exec",
|
||||
arguments: { command: "ls" },
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
{
|
||||
type: "message",
|
||||
message: {
|
||||
role: "toolResult",
|
||||
toolCallId: "tool-123",
|
||||
toolName: "exec",
|
||||
content: [{ type: "text", text: "file1.txt\nfile2.txt" }],
|
||||
isError: false,
|
||||
},
|
||||
},
|
||||
],
|
||||
byId: new Map(),
|
||||
labelsById: new Map(),
|
||||
leafId: null,
|
||||
};
|
||||
|
||||
await prepareSessionManagerForRun({
|
||||
sessionManager,
|
||||
sessionFile: "/tmp/test-session.jsonl",
|
||||
hadSessionFile: true,
|
||||
sessionId: "test-session",
|
||||
cwd: "/test",
|
||||
});
|
||||
|
||||
// Should not have logged a warning
|
||||
expect(logWarn).not.toHaveBeenCalled();
|
||||
|
||||
// Session should remain flushed
|
||||
expect(sessionManager.flushed).toBe(true);
|
||||
});
|
||||
|
||||
it("should handle multiple unpaired tool calls", async () => {
|
||||
const sessionManager = {
|
||||
sessionId: "test-session",
|
||||
flushed: true,
|
||||
fileEntries: [
|
||||
{ type: "session", id: "test-session", cwd: "/test" },
|
||||
{
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "toolCall", id: "tool-1", name: "exec" },
|
||||
{ type: "toolCall", id: "tool-2", name: "read" },
|
||||
],
|
||||
},
|
||||
},
|
||||
// Missing both tool results!
|
||||
],
|
||||
byId: new Map(),
|
||||
labelsById: new Map(),
|
||||
leafId: null,
|
||||
};
|
||||
|
||||
await prepareSessionManagerForRun({
|
||||
sessionManager,
|
||||
sessionFile: "/tmp/test-session.jsonl",
|
||||
hadSessionFile: true,
|
||||
sessionId: "test-session",
|
||||
cwd: "/test",
|
||||
});
|
||||
|
||||
const messages = sessionManager.fileEntries
|
||||
.filter((e: any) => e.type === "message")
|
||||
.map((e: any) => e.message);
|
||||
|
||||
const toolResults = messages.filter((m: any) => m.role === "toolResult");
|
||||
expect(toolResults.length).toBe(2);
|
||||
|
||||
const ids = toolResults.map((r: any) => r.toolCallId);
|
||||
expect(ids).toContain("tool-1");
|
||||
expect(ids).toContain("tool-2");
|
||||
});
|
||||
|
||||
it("should not repair new sessions without existing files", async () => {
|
||||
const sessionManager = {
|
||||
sessionId: "test-session",
|
||||
flushed: false,
|
||||
fileEntries: [{ type: "session", id: undefined, cwd: undefined }],
|
||||
byId: new Map(),
|
||||
labelsById: new Map(),
|
||||
leafId: null,
|
||||
};
|
||||
|
||||
await prepareSessionManagerForRun({
|
||||
sessionManager,
|
||||
sessionFile: "/tmp/new-session.jsonl",
|
||||
hadSessionFile: false,
|
||||
sessionId: "new-session",
|
||||
cwd: "/test",
|
||||
});
|
||||
|
||||
// Should not have logged anything
|
||||
expect(logWarn).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -1,5 +1,8 @@
|
||||
import fs from "node:fs/promises";
|
||||
|
||||
import { repairToolUseResultPairing } from "../session-transcript-repair.js";
|
||||
import { logDebug, logWarn } from "../../logger.js";
|
||||
|
||||
type SessionHeaderEntry = { type: "session"; id?: string; cwd?: string };
|
||||
type SessionMessageEntry = { type: "message"; message?: { role?: string } };
|
||||
|
||||
@ -50,4 +53,62 @@ export async function prepareSessionManagerForRun(params: {
|
||||
sm.leafId = null;
|
||||
sm.flushed = false;
|
||||
}
|
||||
|
||||
// Repair any unpaired tool calls (e.g., from crashed/timed-out sessions).
|
||||
// This prevents sessions from getting stuck when tool calls never received results.
|
||||
if (params.hadSessionFile && hasAssistant) {
|
||||
repairSessionToolPairing(sm, params.sessionFile);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Repair unpaired tool calls in a loaded session by injecting synthetic error results.
|
||||
* This handles cases where:
|
||||
* - A tool execution timed out or crashed before returning
|
||||
* - The session was interrupted mid-execution
|
||||
* - Tool results were lost due to storage issues
|
||||
*/
|
||||
function repairSessionToolPairing(
|
||||
sm: {
|
||||
fileEntries: Array<SessionHeaderEntry | SessionMessageEntry | { type: string }>;
|
||||
flushed: boolean;
|
||||
},
|
||||
sessionFile: string,
|
||||
): void {
|
||||
// Extract messages from fileEntries
|
||||
const messages = sm.fileEntries
|
||||
.filter((e): e is SessionMessageEntry => e.type === "message")
|
||||
.map((e) => e.message)
|
||||
.filter((m): m is NonNullable<typeof m> => m != null);
|
||||
|
||||
if (messages.length === 0) return;
|
||||
|
||||
const report = repairToolUseResultPairing(messages as any);
|
||||
|
||||
if (
|
||||
report.added.length > 0 ||
|
||||
report.droppedDuplicateCount > 0 ||
|
||||
report.droppedOrphanCount > 0
|
||||
) {
|
||||
logWarn(
|
||||
`Repaired session transcript: file=${sessionFile} ` +
|
||||
`added=${report.added.length} droppedDuplicates=${report.droppedDuplicateCount} ` +
|
||||
`droppedOrphans=${report.droppedOrphanCount}`,
|
||||
);
|
||||
|
||||
// Rebuild fileEntries with repaired messages (nonMessages includes header)
|
||||
const nonMessages = sm.fileEntries.filter((e) => e.type !== "message");
|
||||
|
||||
sm.fileEntries = [
|
||||
...nonMessages,
|
||||
...report.messages.map((msg) => ({ type: "message" as const, message: msg })),
|
||||
];
|
||||
|
||||
// Mark as unflushed so the repairs get persisted
|
||||
sm.flushed = false;
|
||||
|
||||
logDebug(
|
||||
`Session repair details: added synthetic results for tool calls: ${report.added.map((r) => (r as any).toolCallId).join(", ")}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user