diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index 7886ed242..8062d25ea 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -57,81 +57,193 @@ const withSessionStore = async ( } }; describe("gateway server chat", () => { - test("handles history, abort, idempotency, and ordering flows", { timeout: 60_000 }, async () => { - const tempDirs: string[] = []; - const { server, ws } = await startServerWithClient(); - const spy = vi.mocked(agentCommand); - const resetSpy = () => { - spy.mockReset(); - spy.mockResolvedValue(undefined); - }; - try { - await connectOk(ws); - await withSessionStore( - tempDirs, - { main: { sessionId: "sess-main", updatedAt: Date.now() } }, - async (historyDir) => { - const bigText = "x".repeat(200_000); - const largeLines: string[] = []; - for (let i = 0; i < 40; i += 1) { - largeLines.push( - JSON.stringify({ - message: { - role: "user", - content: [{ type: "text", text: `${i}:${bigText}` }], - timestamp: Date.now() + i, - }, - }), + const timeoutMs = process.platform === "win32" ? 120_000 : 60_000; + test( + "handles history, abort, idempotency, and ordering flows", + { timeout: timeoutMs }, + async () => { + const tempDirs: string[] = []; + const { server, ws } = await startServerWithClient(); + const spy = vi.mocked(agentCommand); + const resetSpy = () => { + spy.mockReset(); + spy.mockResolvedValue(undefined); + }; + try { + await connectOk(ws); + await withSessionStore( + tempDirs, + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + async (historyDir) => { + const bigText = "x".repeat(200_000); + const largeLines: string[] = []; + for (let i = 0; i < 40; i += 1) { + largeLines.push( + JSON.stringify({ + message: { + role: "user", + content: [{ type: "text", text: `${i}:${bigText}` }], + timestamp: Date.now() + i, + }, + }), + ); + } + await fs.writeFile( + path.join(historyDir, "sess-main.jsonl"), + largeLines.join("\n"), + "utf-8", ); - } - await fs.writeFile( - path.join(historyDir, "sess-main.jsonl"), - largeLines.join("\n"), - "utf-8", - ); - const cappedRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", { - sessionKey: "main", - limit: 1000, - }); - expect(cappedRes.ok).toBe(true); - const cappedMsgs = cappedRes.payload?.messages ?? []; - const bytes = Buffer.byteLength(JSON.stringify(cappedMsgs), "utf8"); - expect(bytes).toBeLessThanOrEqual(6 * 1024 * 1024); - expect(cappedMsgs.length).toBeLessThan(60); - }, - ); - await withSessionStore( - tempDirs, - { - main: { - sessionId: "sess-main", - updatedAt: Date.now(), - lastChannel: "whatsapp", - lastTo: "+1555", + const cappedRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", { + sessionKey: "main", + limit: 1000, + }); + expect(cappedRes.ok).toBe(true); + const cappedMsgs = cappedRes.payload?.messages ?? []; + const bytes = Buffer.byteLength(JSON.stringify(cappedMsgs), "utf8"); + expect(bytes).toBeLessThanOrEqual(6 * 1024 * 1024); + expect(cappedMsgs.length).toBeLessThan(60); }, - }, - async () => { - const routeRes = await rpcReq(ws, "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-route", - }); - expect(routeRes.ok).toBe(true); - const stored = JSON.parse( - await fs.readFile(testState.sessionStorePath as string, "utf-8"), - ) as Record; - expect(stored["agent:main:main"]?.lastChannel).toBe("whatsapp"); - expect(stored["agent:main:main"]?.lastTo).toBe("+1555"); - }, - ); - await withSessionStore( - tempDirs, - { main: { sessionId: "sess-main", updatedAt: Date.now() } }, - async () => { - resetSpy(); - let abortInFlight: Promise | undefined; - try { - const callsBefore = spy.mock.calls.length; + ); + await withSessionStore( + tempDirs, + { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + lastChannel: "whatsapp", + lastTo: "+1555", + }, + }, + async () => { + const routeRes = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-route", + }); + expect(routeRes.ok).toBe(true); + const stored = JSON.parse( + await fs.readFile(testState.sessionStorePath as string, "utf-8"), + ) as Record; + expect(stored["agent:main:main"]?.lastChannel).toBe("whatsapp"); + expect(stored["agent:main:main"]?.lastTo).toBe("+1555"); + }, + ); + await withSessionStore( + tempDirs, + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + async () => { + resetSpy(); + let abortInFlight: Promise | undefined; + try { + const callsBefore = spy.mock.calls.length; + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + const sendResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-abort-1", + 8000, + ); + const abortResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "abort-1", + 8000, + ); + const abortedEventP = onceMessage( + ws, + (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted", + 8000, + ); + abortInFlight = Promise.allSettled([sendResP, abortResP, abortedEventP]); + sendReq(ws, "send-abort-1", "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-abort-1", + timeoutMs: 30_000, + }); + const sendRes = await sendResP; + expect(sendRes.ok).toBe(true); + await new Promise((resolve, reject) => { + const deadline = Date.now() + 1000; + const tick = () => { + if (spy.mock.calls.length > callsBefore) return resolve(); + if (Date.now() > deadline) + return reject(new Error("timeout waiting for agentCommand")); + setTimeout(tick, 5); + }; + tick(); + }); + sendReq(ws, "abort-1", "chat.abort", { + sessionKey: "main", + runId: "idem-abort-1", + }); + const abortRes = await abortResP; + expect(abortRes.ok).toBe(true); + const evt = await abortedEventP; + expect(evt.payload?.runId).toBe("idem-abort-1"); + expect(evt.payload?.sessionKey).toBe("main"); + } finally { + await abortInFlight; + } + }, + ); + await withSessionStore( + tempDirs, + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + async () => { + sessionStoreSaveDelayMs.value = 120; + resetSpy(); + try { + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + const abortedEventP = onceMessage( + ws, + (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted", + ); + const sendResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-abort-save-1", + ); + sendReq(ws, "send-abort-save-1", "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-abort-save-1", + timeoutMs: 30_000, + }); + const abortResP = onceMessage(ws, (o) => o.type === "res" && o.id === "abort-save-1"); + sendReq(ws, "abort-save-1", "chat.abort", { + sessionKey: "main", + runId: "idem-abort-save-1", + }); + const abortRes = await abortResP; + expect(abortRes.ok).toBe(true); + const sendRes = await sendResP; + expect(sendRes.ok).toBe(true); + const evt = await abortedEventP; + expect(evt.payload?.runId).toBe("idem-abort-save-1"); + expect(evt.payload?.sessionKey).toBe("main"); + } finally { + sessionStoreSaveDelayMs.value = 0; + } + }, + ); + await withSessionStore( + tempDirs, + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + async () => { + resetSpy(); + const callsBeforeStop = spy.mock.calls.length; spy.mockImplementationOnce(async (opts) => { const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; await new Promise((resolve) => { @@ -140,383 +252,284 @@ describe("gateway server chat", () => { signal.addEventListener("abort", () => resolve(), { once: true }); }); }); - const sendResP = onceMessage( + const stopSendResP = onceMessage( ws, - (o) => o.type === "res" && o.id === "send-abort-1", + (o) => o.type === "res" && o.id === "send-stop-1", 8000, ); - const abortResP = onceMessage(ws, (o) => o.type === "res" && o.id === "abort-1", 8000); - const abortedEventP = onceMessage( + sendReq(ws, "send-stop-1", "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-stop-run", + }); + const stopSendRes = await stopSendResP; + expect(stopSendRes.ok).toBe(true); + await waitFor(() => spy.mock.calls.length > callsBeforeStop); + const abortedStopEventP = onceMessage( ws, - (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted", + (o) => + o.type === "event" && + o.event === "chat" && + o.payload?.state === "aborted" && + o.payload?.runId === "idem-stop-run", 8000, ); - abortInFlight = Promise.allSettled([sendResP, abortResP, abortedEventP]); - sendReq(ws, "send-abort-1", "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-abort-1", - timeoutMs: 30_000, - }); - const sendRes = await sendResP; - expect(sendRes.ok).toBe(true); - await new Promise((resolve, reject) => { - const deadline = Date.now() + 1000; - const tick = () => { - if (spy.mock.calls.length > callsBefore) return resolve(); - if (Date.now() > deadline) - return reject(new Error("timeout waiting for agentCommand")); - setTimeout(tick, 5); - }; - tick(); - }); - sendReq(ws, "abort-1", "chat.abort", { - sessionKey: "main", - runId: "idem-abort-1", - }); - const abortRes = await abortResP; - expect(abortRes.ok).toBe(true); - const evt = await abortedEventP; - expect(evt.payload?.runId).toBe("idem-abort-1"); - expect(evt.payload?.sessionKey).toBe("main"); - } finally { - await abortInFlight; - } - }, - ); - await withSessionStore( - tempDirs, - { main: { sessionId: "sess-main", updatedAt: Date.now() } }, - async () => { - sessionStoreSaveDelayMs.value = 120; - resetSpy(); - try { - spy.mockImplementationOnce(async (opts) => { - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; - await new Promise((resolve) => { - if (!signal) return resolve(); - if (signal.aborted) return resolve(); - signal.addEventListener("abort", () => resolve(), { once: true }); - }); - }); - const abortedEventP = onceMessage( + const stopResP = onceMessage( ws, - (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted", + (o) => o.type === "res" && o.id === "send-stop-2", + 8000, ); - const sendResP = onceMessage( - ws, - (o) => o.type === "res" && o.id === "send-abort-save-1", - ); - sendReq(ws, "send-abort-save-1", "chat.send", { + sendReq(ws, "send-stop-2", "chat.send", { sessionKey: "main", - message: "hello", - idempotencyKey: "idem-abort-save-1", - timeoutMs: 30_000, + message: "/stop", + idempotencyKey: "idem-stop-req", }); - const abortResP = onceMessage(ws, (o) => o.type === "res" && o.id === "abort-save-1"); - sendReq(ws, "abort-save-1", "chat.abort", { - sessionKey: "main", - runId: "idem-abort-save-1", - }); - const abortRes = await abortResP; - expect(abortRes.ok).toBe(true); - const sendRes = await sendResP; - expect(sendRes.ok).toBe(true); - const evt = await abortedEventP; - expect(evt.payload?.runId).toBe("idem-abort-save-1"); - expect(evt.payload?.sessionKey).toBe("main"); - } finally { - sessionStoreSaveDelayMs.value = 0; - } - }, - ); - await withSessionStore( - tempDirs, - { main: { sessionId: "sess-main", updatedAt: Date.now() } }, - async () => { - resetSpy(); - const callsBeforeStop = spy.mock.calls.length; - spy.mockImplementationOnce(async (opts) => { - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; - await new Promise((resolve) => { - if (!signal) return resolve(); - if (signal.aborted) return resolve(); - signal.addEventListener("abort", () => resolve(), { once: true }); - }); - }); - const stopSendResP = onceMessage( - ws, - (o) => o.type === "res" && o.id === "send-stop-1", - 8000, - ); - sendReq(ws, "send-stop-1", "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-stop-run", - }); - const stopSendRes = await stopSendResP; - expect(stopSendRes.ok).toBe(true); - await waitFor(() => spy.mock.calls.length > callsBeforeStop); - const abortedStopEventP = onceMessage( - ws, - (o) => - o.type === "event" && - o.event === "chat" && - o.payload?.state === "aborted" && - o.payload?.runId === "idem-stop-run", - 8000, - ); - const stopResP = onceMessage(ws, (o) => o.type === "res" && o.id === "send-stop-2", 8000); - sendReq(ws, "send-stop-2", "chat.send", { - sessionKey: "main", - message: "/stop", - idempotencyKey: "idem-stop-req", - }); - const stopRes = await stopResP; - expect(stopRes.ok).toBe(true); - const stopEvt = await abortedStopEventP; - expect(stopEvt.payload?.sessionKey).toBe("main"); - expect(spy.mock.calls.length).toBe(callsBeforeStop + 1); - }, - ); - resetSpy(); - let resolveRun: (() => void) | undefined; - const runDone = new Promise((resolve) => { - resolveRun = resolve; - }); - spy.mockImplementationOnce(async () => { - await runDone; - }); - const started = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-status-1", - }); - expect(started.ok).toBe(true); - expect(started.payload?.status).toBe("started"); - const inFlightRes = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-status-1", - }); - expect(inFlightRes.ok).toBe(true); - expect(inFlightRes.payload?.status).toBe("in_flight"); - resolveRun?.(); - let completed = false; - for (let i = 0; i < 50; i++) { - const again = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { + const stopRes = await stopResP; + expect(stopRes.ok).toBe(true); + const stopEvt = await abortedStopEventP; + expect(stopEvt.payload?.sessionKey).toBe("main"); + expect(spy.mock.calls.length).toBe(callsBeforeStop + 1); + }, + ); + resetSpy(); + let resolveRun: (() => void) | undefined; + const runDone = new Promise((resolve) => { + resolveRun = resolve; + }); + spy.mockImplementationOnce(async () => { + await runDone; + }); + const started = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { sessionKey: "main", message: "hello", idempotencyKey: "idem-status-1", }); - if (again.ok && again.payload?.status === "ok") { - completed = true; - break; - } - await new Promise((r) => setTimeout(r, 10)); - } - expect(completed).toBe(true); - resetSpy(); - spy.mockImplementationOnce(async (opts) => { - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; - await new Promise((resolve) => { - if (!signal) return resolve(); - if (signal.aborted) return resolve(); - signal.addEventListener("abort", () => resolve(), { once: true }); + expect(started.ok).toBe(true); + expect(started.payload?.status).toBe("started"); + const inFlightRes = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-status-1", }); - }); - const abortedEventP = onceMessage( - ws, - (o) => - o.type === "event" && - o.event === "chat" && - o.payload?.state === "aborted" && - o.payload?.runId === "idem-abort-all-1", - ); - const startedAbortAll = await rpcReq(ws, "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-abort-all-1", - }); - expect(startedAbortAll.ok).toBe(true); - const abortRes = await rpcReq<{ - ok?: boolean; - aborted?: boolean; - runIds?: string[]; - }>(ws, "chat.abort", { sessionKey: "main" }); - expect(abortRes.ok).toBe(true); - expect(abortRes.payload?.aborted).toBe(true); - expect(abortRes.payload?.runIds ?? []).toContain("idem-abort-all-1"); - await abortedEventP; - const noDeltaP = onceMessage( - ws, - (o) => - o.type === "event" && - o.event === "chat" && - (o.payload?.state === "delta" || o.payload?.state === "final") && - o.payload?.runId === "idem-abort-all-1", - 250, - ); - emitAgentEvent({ - runId: "idem-abort-all-1", - stream: "assistant", - data: { text: "should be suppressed" }, - }); - emitAgentEvent({ - runId: "idem-abort-all-1", - stream: "lifecycle", - data: { phase: "end" }, - }); - await expect(noDeltaP).rejects.toThrow(/timeout/i); - await withSessionStore(tempDirs, {}, async () => { - const abortUnknown = await rpcReq<{ + expect(inFlightRes.ok).toBe(true); + expect(inFlightRes.payload?.status).toBe("in_flight"); + resolveRun?.(); + let completed = false; + for (let i = 0; i < 50; i++) { + const again = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-status-1", + }); + if (again.ok && again.payload?.status === "ok") { + completed = true; + break; + } + await new Promise((r) => setTimeout(r, 10)); + } + expect(completed).toBe(true); + resetSpy(); + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + const abortedEventP = onceMessage( + ws, + (o) => + o.type === "event" && + o.event === "chat" && + o.payload?.state === "aborted" && + o.payload?.runId === "idem-abort-all-1", + ); + const startedAbortAll = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-abort-all-1", + }); + expect(startedAbortAll.ok).toBe(true); + const abortRes = await rpcReq<{ ok?: boolean; aborted?: boolean; - }>(ws, "chat.abort", { sessionKey: "main", runId: "missing-run" }); - expect(abortUnknown.ok).toBe(true); - expect(abortUnknown.payload?.aborted).toBe(false); - }); - await withSessionStore( - tempDirs, - { main: { sessionId: "sess-main", updatedAt: Date.now() } }, - async () => { - resetSpy(); - let agentStartedResolve: (() => void) | undefined; - const agentStartedP = new Promise((resolve) => { - agentStartedResolve = resolve; - }); - spy.mockImplementationOnce(async (opts) => { - agentStartedResolve?.(); - const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; - await new Promise((resolve) => { - if (!signal) return resolve(); - if (signal.aborted) return resolve(); - signal.addEventListener("abort", () => resolve(), { once: true }); + runIds?: string[]; + }>(ws, "chat.abort", { sessionKey: "main" }); + expect(abortRes.ok).toBe(true); + expect(abortRes.payload?.aborted).toBe(true); + expect(abortRes.payload?.runIds ?? []).toContain("idem-abort-all-1"); + await abortedEventP; + const noDeltaP = onceMessage( + ws, + (o) => + o.type === "event" && + o.event === "chat" && + (o.payload?.state === "delta" || o.payload?.state === "final") && + o.payload?.runId === "idem-abort-all-1", + 250, + ); + emitAgentEvent({ + runId: "idem-abort-all-1", + stream: "assistant", + data: { text: "should be suppressed" }, + }); + emitAgentEvent({ + runId: "idem-abort-all-1", + stream: "lifecycle", + data: { phase: "end" }, + }); + await expect(noDeltaP).rejects.toThrow(/timeout/i); + await withSessionStore(tempDirs, {}, async () => { + const abortUnknown = await rpcReq<{ + ok?: boolean; + aborted?: boolean; + }>(ws, "chat.abort", { sessionKey: "main", runId: "missing-run" }); + expect(abortUnknown.ok).toBe(true); + expect(abortUnknown.payload?.aborted).toBe(false); + }); + await withSessionStore( + tempDirs, + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + async () => { + resetSpy(); + let agentStartedResolve: (() => void) | undefined; + const agentStartedP = new Promise((resolve) => { + agentStartedResolve = resolve; }); - }); - const sendResP = onceMessage( - ws, - (o) => o.type === "res" && o.id === "send-mismatch-1", - 10_000, - ); - sendReq(ws, "send-mismatch-1", "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-mismatch-1", - timeoutMs: 30_000, - }); - await agentStartedP; - const abortMismatch = await rpcReq(ws, "chat.abort", { - sessionKey: "other", - runId: "idem-mismatch-1", - }); - expect(abortMismatch.ok).toBe(false); - expect(abortMismatch.error?.code).toBe("INVALID_REQUEST"); - const abortMismatch2 = await rpcReq(ws, "chat.abort", { - sessionKey: "main", - runId: "idem-mismatch-1", - }); - expect(abortMismatch2.ok).toBe(true); - const sendRes = await sendResP; - expect(sendRes.ok).toBe(true); - }, - ); - await withSessionStore( - tempDirs, - { main: { sessionId: "sess-main", updatedAt: Date.now() } }, - async () => { - resetSpy(); - spy.mockResolvedValueOnce(undefined); - sendReq(ws, "send-complete-1", "chat.send", { - sessionKey: "main", - message: "hello", - idempotencyKey: "idem-complete-1", - timeoutMs: 30_000, - }); - const sendCompleteRes = await onceMessage( - ws, - (o) => o.type === "res" && o.id === "send-complete-1", - ); - expect(sendCompleteRes.ok).toBe(true); - let completedRun = false; - for (let i = 0; i < 50; i++) { - const again = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { + spy.mockImplementationOnce(async (opts) => { + agentStartedResolve?.(); + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + const sendResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-mismatch-1", + 10_000, + ); + sendReq(ws, "send-mismatch-1", "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-mismatch-1", + timeoutMs: 30_000, + }); + await agentStartedP; + const abortMismatch = await rpcReq(ws, "chat.abort", { + sessionKey: "other", + runId: "idem-mismatch-1", + }); + expect(abortMismatch.ok).toBe(false); + expect(abortMismatch.error?.code).toBe("INVALID_REQUEST"); + const abortMismatch2 = await rpcReq(ws, "chat.abort", { + sessionKey: "main", + runId: "idem-mismatch-1", + }); + expect(abortMismatch2.ok).toBe(true); + const sendRes = await sendResP; + expect(sendRes.ok).toBe(true); + }, + ); + await withSessionStore( + tempDirs, + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + async () => { + resetSpy(); + spy.mockResolvedValueOnce(undefined); + sendReq(ws, "send-complete-1", "chat.send", { sessionKey: "main", message: "hello", idempotencyKey: "idem-complete-1", timeoutMs: 30_000, }); - if (again.ok && again.payload?.status === "ok") { - completedRun = true; - break; + const sendCompleteRes = await onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-complete-1", + ); + expect(sendCompleteRes.ok).toBe(true); + let completedRun = false; + for (let i = 0; i < 50; i++) { + const again = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-complete-1", + timeoutMs: 30_000, + }); + if (again.ok && again.payload?.status === "ok") { + completedRun = true; + break; + } + await new Promise((r) => setTimeout(r, 10)); } - await new Promise((r) => setTimeout(r, 10)); - } - expect(completedRun).toBe(true); - const abortCompleteRes = await rpcReq(ws, "chat.abort", { - sessionKey: "main", - runId: "idem-complete-1", - }); - expect(abortCompleteRes.ok).toBe(true); - expect(abortCompleteRes.payload?.aborted).toBe(false); - }, - ); - await withSessionStore( - tempDirs, - { main: { sessionId: "sess-main", updatedAt: Date.now() } }, - async () => { - const res1 = await rpcReq(ws, "chat.send", { - sessionKey: "main", - message: "first", - idempotencyKey: "idem-1", - }); - expect(res1.ok).toBe(true); - const res2 = await rpcReq(ws, "chat.send", { - sessionKey: "main", - message: "second", - idempotencyKey: "idem-2", - }); - expect(res2.ok).toBe(true); - const final1P = onceMessage( - ws, - (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final", - 8000, - ); - emitAgentEvent({ - runId: "idem-1", - stream: "lifecycle", - data: { phase: "end" }, - }); - const final1 = await final1P; - const run1 = - final1.payload && typeof final1.payload === "object" - ? (final1.payload as { runId?: string }).runId - : undefined; - expect(run1).toBe("idem-1"); - const final2P = onceMessage( - ws, - (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final", - 8000, - ); - emitAgentEvent({ - runId: "idem-2", - stream: "lifecycle", - data: { phase: "end" }, - }); - const final2 = await final2P; - const run2 = - final2.payload && typeof final2.payload === "object" - ? (final2.payload as { runId?: string }).runId - : undefined; - expect(run2).toBe("idem-2"); - }, - ); - } finally { - testState.sessionStorePath = undefined; - sessionStoreSaveDelayMs.value = 0; - ws.close(); - await server.close(); - await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true }))); - } - }); + expect(completedRun).toBe(true); + const abortCompleteRes = await rpcReq(ws, "chat.abort", { + sessionKey: "main", + runId: "idem-complete-1", + }); + expect(abortCompleteRes.ok).toBe(true); + expect(abortCompleteRes.payload?.aborted).toBe(false); + }, + ); + await withSessionStore( + tempDirs, + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + async () => { + const res1 = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "first", + idempotencyKey: "idem-1", + }); + expect(res1.ok).toBe(true); + const res2 = await rpcReq(ws, "chat.send", { + sessionKey: "main", + message: "second", + idempotencyKey: "idem-2", + }); + expect(res2.ok).toBe(true); + const final1P = onceMessage( + ws, + (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final", + 8000, + ); + emitAgentEvent({ + runId: "idem-1", + stream: "lifecycle", + data: { phase: "end" }, + }); + const final1 = await final1P; + const run1 = + final1.payload && typeof final1.payload === "object" + ? (final1.payload as { runId?: string }).runId + : undefined; + expect(run1).toBe("idem-1"); + const final2P = onceMessage( + ws, + (o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final", + 8000, + ); + emitAgentEvent({ + runId: "idem-2", + stream: "lifecycle", + data: { phase: "end" }, + }); + const final2 = await final2P; + const run2 = + final2.payload && typeof final2.payload === "object" + ? (final2.payload as { runId?: string }).runId + : undefined; + expect(run2).toBe("idem-2"); + }, + ); + } finally { + testState.sessionStorePath = undefined; + sessionStoreSaveDelayMs.value = 0; + ws.close(); + await server.close(); + await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true }))); + } + }, + ); });