diff --git a/src/auto-reply/reply/agent-runner.memory-flush.test.ts b/src/auto-reply/reply/agent-runner.memory-flush.test.ts index 9d1b86178..0b6b2a803 100644 --- a/src/auto-reply/reply/agent-runner.memory-flush.test.ts +++ b/src/auto-reply/reply/agent-runner.memory-flush.test.ts @@ -10,9 +10,15 @@ import type { FollowupRun, QueueSettings } from "./queue.js"; import { createMockTypingController } from "./test-helpers.js"; const runEmbeddedPiAgentMock = vi.fn(); +const runCliAgentMock = vi.fn(); type EmbeddedRunParams = { prompt?: string; + extraSystemPrompt?: string; + onAgentEvent?: (evt: { + stream?: string; + data?: { phase?: string; willRetry?: boolean }; + }) => void; }; vi.mock("../../agents/model-fallback.js", () => ({ @@ -31,6 +37,10 @@ vi.mock("../../agents/model-fallback.js", () => ({ }), })); +vi.mock("../../agents/cli-runner.js", () => ({ + runCliAgent: (params: unknown) => runCliAgentMock(params), +})); + vi.mock("../../agents/pi-embedded.js", () => ({ queueEmbeddedPiMessage: vi.fn().mockReturnValue(false), runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), @@ -65,6 +75,7 @@ function createBaseRun(params: { storePath: string; sessionEntry: Record; config?: Record; + runOverrides?: Partial; }) { const typing = createMockTypingController(); const sessionCtx = { @@ -79,6 +90,8 @@ function createBaseRun(params: { summaryLine: "hello", enqueuedAt: Date.now(), run: { + agentId: "main", + agentDir: "/tmp/agent", sessionId: "session", sessionKey: "main", messageProvider: "whatsapp", @@ -100,17 +113,23 @@ function createBaseRun(params: { blockReplyBreak: "message_end", }, } as unknown as FollowupRun; + const run = { + ...followupRun.run, + ...params.runOverrides, + config: params.config ?? followupRun.run.config, + }; return { typing, sessionCtx, resolvedQueue, - followupRun, + followupRun: { ...followupRun, run }, }; } describe("runReplyAgent memory flush", () => { it("runs a memory flush turn and updates session metadata", async () => { + runEmbeddedPiAgentMock.mockReset(); const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); const storePath = path.join(tmp, "sessions.json"); const sessionKey = "main"; @@ -177,7 +196,282 @@ describe("runReplyAgent memory flush", () => { expect(stored[sessionKey].memoryFlushCompactionCount).toBe(1); }); + it("skips memory flush when disabled in config", async () => { + runEmbeddedPiAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockImplementation( + async (_params: EmbeddedRunParams) => ({ + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }), + ); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: { + agents: { + defaults: { compaction: { memoryFlush: { enabled: false } } }, + }, + }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); + const call = runEmbeddedPiAgentMock.mock.calls[0]?.[0] as + | { prompt?: string } + | undefined; + expect(call?.prompt).toBe("hello"); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].memoryFlushAt).toBeUndefined(); + }); + + it("skips memory flush for CLI providers", async () => { + runEmbeddedPiAgentMock.mockReset(); + runCliAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + runEmbeddedPiAgentMock.mockImplementation( + async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }, + ); + runCliAgentMock.mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + runOverrides: { provider: "codex-cli" }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(runCliAgentMock).toHaveBeenCalledTimes(1); + const call = runCliAgentMock.mock.calls[0]?.[0] as + | { prompt?: string } + | undefined; + expect(call?.prompt).toBe("hello"); + expect(runEmbeddedPiAgentMock).not.toHaveBeenCalled(); + }); + + it("uses configured prompts for memory flush runs", async () => { + runEmbeddedPiAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array = []; + runEmbeddedPiAgentMock.mockImplementation( + async (params: EmbeddedRunParams) => { + calls.push(params); + if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { + return { payloads: [], meta: {} }; + } + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }, + ); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: { + agents: { + defaults: { + compaction: { + memoryFlush: { + prompt: "Write notes.", + systemPrompt: "Flush memory now.", + }, + }, + }, + }, + }, + runOverrides: { extraSystemPrompt: "extra system" }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const flushCall = calls[0]; + expect(flushCall?.prompt).toContain("Write notes."); + expect(flushCall?.prompt).toContain("NO_REPLY"); + expect(flushCall?.extraSystemPrompt).toContain("extra system"); + expect(flushCall?.extraSystemPrompt).toContain("Flush memory now."); + expect(flushCall?.extraSystemPrompt).toContain("NO_REPLY"); + expect(calls[1]?.prompt).toBe("hello"); + }); + + it("skips memory flush after a prior flush in the same compaction cycle", async () => { + runEmbeddedPiAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 2, + memoryFlushCompactionCount: 2, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + runEmbeddedPiAgentMock.mockImplementation( + async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }, + ); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(calls.map((call) => call.prompt)).toEqual(["hello"]); + }); + it("skips memory flush when the sandbox workspace is read-only", async () => { + runEmbeddedPiAgentMock.mockReset(); const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); const storePath = path.join(tmp, "sessions.json"); const sessionKey = "main"; @@ -243,4 +537,134 @@ describe("runReplyAgent memory flush", () => { const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); expect(stored[sessionKey].memoryFlushAt).toBeUndefined(); }); + + it("skips memory flush when the sandbox workspace is none", async () => { + runEmbeddedPiAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + const calls: Array<{ prompt?: string }> = []; + runEmbeddedPiAgentMock.mockImplementation( + async (params: EmbeddedRunParams) => { + calls.push({ prompt: params.prompt }); + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }, + ); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config: { + agents: { + defaults: { + sandbox: { mode: "all", workspaceAccess: "none" }, + }, + }, + }, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + expect(calls.map((call) => call.prompt)).toEqual(["hello"]); + }); + + it("increments compaction count when flush compaction completes", async () => { + runEmbeddedPiAgentMock.mockReset(); + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-flush-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 80_000, + compactionCount: 1, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockImplementation( + async (params: EmbeddedRunParams) => { + if (params.prompt === DEFAULT_MEMORY_FLUSH_PROMPT) { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false }, + }); + return { payloads: [], meta: {} }; + } + return { + payloads: [{ text: "ok" }], + meta: { agentMeta: { usage: { input: 1, output: 1 } } }, + }; + }, + ); + + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 100_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].compactionCount).toBe(2); + expect(stored[sessionKey].memoryFlushCompactionCount).toBe(2); + }); }); diff --git a/src/auto-reply/reply/memory-flush.test.ts b/src/auto-reply/reply/memory-flush.test.ts index 8f8aae2fb..a9d056cb3 100644 --- a/src/auto-reply/reply/memory-flush.test.ts +++ b/src/auto-reply/reply/memory-flush.test.ts @@ -56,6 +56,17 @@ describe("shouldRunMemoryFlush", () => { ).toBe(false); }); + it("skips when entry is missing", () => { + expect( + shouldRunMemoryFlush({ + entry: undefined, + contextWindowTokens: 16_000, + reserveTokensFloor: 1_000, + softThresholdTokens: DEFAULT_MEMORY_FLUSH_SOFT_TOKENS, + }), + ).toBe(false); + }); + it("skips when under threshold", () => { expect( shouldRunMemoryFlush({ @@ -67,6 +78,17 @@ describe("shouldRunMemoryFlush", () => { ).toBe(false); }); + it("triggers at the threshold boundary", () => { + expect( + shouldRunMemoryFlush({ + entry: { totalTokens: 85 }, + contextWindowTokens: 100, + reserveTokensFloor: 10, + softThresholdTokens: 5, + }), + ).toBe(true); + }); + it("skips when already flushed for current compaction count", () => { expect( shouldRunMemoryFlush({