test(gateway): consolidate server suites for speed

This commit is contained in:
Peter Steinberger 2026-01-23 06:21:35 +00:00
parent 1e6e58b23b
commit c7ca312f97
21 changed files with 1280 additions and 1598 deletions

View File

@ -1,11 +1,23 @@
import { describe, expect, it } from "vitest";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { agentCommand, getFreePort, installGatewayTestHooks } from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let enabledServer: Awaited<ReturnType<typeof startServer>>;
let enabledPort: number;
beforeAll(async () => {
enabledPort = await getFreePort();
enabledServer = await startServer(enabledPort);
});
afterAll(async () => {
await enabledServer.close({ reason: "openai http enabled suite done" });
});
async function startServerWithDefaultConfig(port: number) {
const { startGatewayServer } = await import("./server.js");
@ -82,8 +94,7 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
});
it("handles request validation and routing", async () => {
const port = await getFreePort();
const server = await startServer(port);
const port = enabledPort;
const mockAgentOnce = (payloads: Array<{ text: string }>) => {
agentCommand.mockReset();
agentCommand.mockResolvedValueOnce({ payloads } as never);
@ -330,13 +341,12 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
);
}
} finally {
await server.close({ reason: "test done" });
// shared server
}
});
it("streams SSE chunks when stream=true", async () => {
const port = await getFreePort();
const server = await startServer(port);
const port = enabledPort;
try {
{
agentCommand.mockReset();
@ -416,7 +426,7 @@ describe("OpenAI-compatible HTTP API (e2e)", () => {
expect(fallbackText).toContain("hello");
}
} finally {
await server.close({ reason: "test done" });
// shared server
}
});
});

View File

@ -1,11 +1,23 @@
import { describe, expect, it } from "vitest";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { HISTORY_CONTEXT_MARKER } from "../auto-reply/reply/history.js";
import { CURRENT_MESSAGE_MARKER } from "../auto-reply/reply/mentions.js";
import { emitAgentEvent } from "../infra/agent-events.js";
import { agentCommand, getFreePort, installGatewayTestHooks } from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let enabledServer: Awaited<ReturnType<typeof startServer>>;
let enabledPort: number;
beforeAll(async () => {
enabledPort = await getFreePort();
enabledServer = await startServer(enabledPort);
});
afterAll(async () => {
await enabledServer.close({ reason: "openresponses enabled suite done" });
});
async function startServerWithDefaultConfig(port: number) {
const { startGatewayServer } = await import("./server.js");
@ -72,7 +84,7 @@ async function ensureResponseConsumed(res: Response) {
describe("OpenResponses HTTP API (e2e)", () => {
it("rejects when disabled (default + config)", { timeout: 120_000 }, async () => {
const port = await getFreePort();
const server = await startServerWithDefaultConfig(port);
const _server = await startServerWithDefaultConfig(port);
try {
const res = await postResponses(port, {
model: "clawdbot",
@ -81,7 +93,7 @@ describe("OpenResponses HTTP API (e2e)", () => {
expect(res.status).toBe(404);
await ensureResponseConsumed(res);
} finally {
await server.close({ reason: "test done" });
// shared server
}
const disabledPort = await getFreePort();
@ -101,8 +113,7 @@ describe("OpenResponses HTTP API (e2e)", () => {
});
it("handles OpenResponses request parsing and validation", async () => {
const port = await getFreePort();
const server = await startServer(port);
const port = enabledPort;
const mockAgentOnce = (payloads: Array<{ text: string }>, meta?: unknown) => {
agentCommand.mockReset();
agentCommand.mockResolvedValueOnce({ payloads, meta } as never);
@ -406,14 +417,12 @@ describe("OpenResponses HTTP API (e2e)", () => {
);
await ensureResponseConsumed(resNoUser);
} finally {
await server.close({ reason: "test done" });
// shared server
}
});
it("streams OpenResponses SSE events", async () => {
const port = await getFreePort();
const server = await startServer(port);
const port = enabledPort;
try {
agentCommand.mockReset();
agentCommand.mockImplementationOnce(async (opts: unknown) => {
@ -489,7 +498,7 @@ describe("OpenResponses HTTP API (e2e)", () => {
expect(event.event).toBe(parsed.type);
}
} finally {
await server.close({ reason: "test done" });
// shared server
}
});
});

View File

@ -1,9 +1,10 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test, vi } from "vitest";
import { afterAll, beforeAll, describe, expect, test, vi } from "vitest";
import type { ChannelPlugin } from "../channels/plugins/types.js";
import type { PluginRegistry } from "../plugins/registry.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import {
agentCommand,
connectOk,
@ -14,7 +15,22 @@ import {
writeSessionStore,
} from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
let ws: Awaited<ReturnType<typeof startServerWithClient>>["ws"];
beforeAll(async () => {
const started = await startServerWithClient();
server = started.server;
ws = started.ws;
await connectOk(ws);
});
afterAll(async () => {
ws.close();
await server.close();
});
const registryState = vi.hoisted(() => ({
registry: {
@ -43,6 +59,11 @@ vi.mock("./server-plugins.js", async () => {
};
});
const setRegistry = (registry: PluginRegistry) => {
registryState.registry = registry;
setActivePluginRegistry(registry);
};
const BASE_IMAGE_PNG =
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+X3mIAAAAASUVORK5CYII=";
@ -142,7 +163,7 @@ const defaultRegistry = createRegistry([
describe("gateway server agent", () => {
test("agent marks implicit delivery when lastTo is stale", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
testState.allowFrom = ["+436769770569"];
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
@ -156,10 +177,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -175,14 +192,11 @@ describe("gateway server agent", () => {
expect(call.to).toBe("+1555");
expect(call.deliveryTargetMode).toBe("implicit");
expect(call.sessionId).toBe("sess-main-stale");
ws.close();
await server.close();
testState.allowFrom = undefined;
});
test("agent forwards sessionKey to agentCommand", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
@ -193,10 +207,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "agent:main:subagent:abc",
@ -211,13 +221,10 @@ describe("gateway server agent", () => {
expectChannels(call, "webchat");
expect(call.deliver).toBe(false);
expect(call.to).toBeUndefined();
ws.close();
await server.close();
});
test("agent derives sessionKey from agentId", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
testState.agentsConfig = { list: [{ id: "ops" }] };
@ -230,10 +237,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
agentId: "ops",
@ -245,16 +248,10 @@ describe("gateway server agent", () => {
const call = spy.mock.calls.at(-1)?.[0] as Record<string, unknown>;
expect(call.sessionKey).toBe("agent:ops:main");
expect(call.sessionId).toBe("sess-ops");
ws.close();
await server.close();
});
test("agent rejects unknown reply channel", async () => {
registryState.registry = defaultRegistry;
const { server, ws } = await startServerWithClient();
await connectOk(ws);
setRegistry(defaultRegistry);
const res = await rpcReq(ws, "agent", {
message: "hi",
replyChannel: "unknown-channel",
@ -265,18 +262,11 @@ describe("gateway server agent", () => {
const spy = vi.mocked(agentCommand);
expect(spy).not.toHaveBeenCalled();
ws.close();
await server.close();
});
test("agent rejects mismatched agentId and sessionKey", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
testState.agentsConfig = { list: [{ id: "ops" }] };
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
agentId: "ops",
@ -288,13 +278,10 @@ describe("gateway server agent", () => {
const spy = vi.mocked(agentCommand);
expect(spy).not.toHaveBeenCalled();
ws.close();
await server.close();
});
test("agent forwards accountId to agentCommand", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
testState.allowFrom = ["+1555"];
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
@ -309,10 +296,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -329,14 +312,11 @@ describe("gateway server agent", () => {
expect(call.accountId).toBe("kev");
const runContext = call.runContext as { accountId?: string } | undefined;
expect(runContext?.accountId).toBe("kev");
ws.close();
await server.close();
testState.allowFrom = undefined;
});
test("agent avoids lastAccountId when explicit to is provided", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
testState.allowFrom = ["+1555"];
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
@ -351,10 +331,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -369,14 +345,11 @@ describe("gateway server agent", () => {
expectChannels(call, "whatsapp");
expect(call.to).toBe("+1666");
expect(call.accountId).toBeUndefined();
ws.close();
await server.close();
testState.allowFrom = undefined;
});
test("agent keeps explicit accountId when explicit to is provided", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
testState.allowFrom = ["+1555"];
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
@ -391,10 +364,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -410,14 +379,11 @@ describe("gateway server agent", () => {
expectChannels(call, "whatsapp");
expect(call.to).toBe("+1666");
expect(call.accountId).toBe("primary");
ws.close();
await server.close();
testState.allowFrom = undefined;
});
test("agent falls back to lastAccountId for implicit delivery", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
testState.allowFrom = ["+1555"];
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
@ -432,10 +398,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -449,14 +411,11 @@ describe("gateway server agent", () => {
expectChannels(call, "whatsapp");
expect(call.to).toBe("+1555");
expect(call.accountId).toBe("kev");
ws.close();
await server.close();
testState.allowFrom = undefined;
});
test("agent forwards image attachments as images[]", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
@ -467,10 +426,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "what is in the image?",
sessionKey: "main",
@ -497,13 +452,10 @@ describe("gateway server agent", () => {
expect(images[0]?.type).toBe("image");
expect(images[0]?.mimeType).toBe("image/png");
expect(images[0]?.data).toBe(BASE_IMAGE_PNG);
ws.close();
await server.close();
});
test("agent falls back to whatsapp when delivery requested and no last channel exists", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
testState.allowFrom = ["+1555"];
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
@ -515,10 +467,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -533,14 +481,11 @@ describe("gateway server agent", () => {
expect(call.to).toBe("+1555");
expect(call.deliver).toBe(true);
expect(call.sessionId).toBe("sess-main-missing-provider");
ws.close();
await server.close();
testState.allowFrom = undefined;
});
test("agent routes main last-channel whatsapp", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
@ -553,10 +498,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -574,13 +515,10 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-main-whatsapp");
ws.close();
await server.close();
});
test("agent routes main last-channel telegram", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
@ -593,10 +531,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -613,13 +547,10 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-main");
ws.close();
await server.close();
});
test("agent routes main last-channel discord", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
@ -632,10 +563,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -652,13 +579,10 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-discord");
ws.close();
await server.close();
});
test("agent routes main last-channel slack", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
@ -671,10 +595,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -691,13 +611,10 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-slack");
ws.close();
await server.close();
});
test("agent routes main last-channel signal", async () => {
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
@ -710,10 +627,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -730,8 +643,5 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-signal");
ws.close();
await server.close();
});
});

View File

@ -1,7 +1,7 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import type { ChannelPlugin } from "../channels/plugins/types.js";
import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
@ -22,7 +22,24 @@ import {
writeSessionStore,
} from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
let ws: Awaited<ReturnType<typeof startServerWithClient>>["ws"];
let port: number;
beforeAll(async () => {
const started = await startServerWithClient();
server = started.server;
ws = started.ws;
port = started.port;
await connectOk(ws);
});
afterAll(async () => {
ws.close();
await server.close();
});
const registryState = vi.hoisted(() => ({
registry: {
@ -130,10 +147,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -150,9 +163,6 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-teams");
ws.close();
await server.close();
});
test("agent accepts channel aliases (imsg/teams)", async () => {
@ -177,10 +187,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const resIMessage = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -208,15 +214,9 @@ describe("gateway server agent", () => {
const lastTeamsCall = spy.mock.calls.at(-1)?.[0] as Record<string, unknown>;
expectChannels(lastTeamsCall, "msteams");
expect(lastTeamsCall.to).toBe("conversation:teams-abc");
ws.close();
await server.close();
});
test("agent rejects unknown channel", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -225,9 +225,6 @@ describe("gateway server agent", () => {
});
expect(res.ok).toBe(false);
expect(res.error?.code).toBe("INVALID_REQUEST");
ws.close();
await server.close();
});
test("agent ignores webchat last-channel for routing", async () => {
@ -244,10 +241,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -264,9 +257,6 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(true);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-main-webchat");
ws.close();
await server.close();
});
test("agent uses webchat for internal runs when last provider is webchat", async () => {
@ -282,10 +272,6 @@ describe("gateway server agent", () => {
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
@ -302,15 +288,9 @@ describe("gateway server agent", () => {
expect(call.deliver).toBe(false);
expect(call.bestEffortDeliver).toBe(true);
expect(call.sessionId).toBe("sess-main-webchat-internal");
ws.close();
await server.close();
});
test("agent ack response then final response", { timeout: 8000 }, async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const ackP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "ag1" && o.payload?.status === "accepted",
@ -333,15 +313,9 @@ describe("gateway server agent", () => {
expect(ack.payload.runId).toBeDefined();
expect(final.payload.runId).toBe(ack.payload.runId);
expect(final.payload.status).toBe("ok");
ws.close();
await server.close();
});
test("agent dedupes by idempotencyKey after completion", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const firstFinalP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "ag1" && o.payload?.status !== "accepted",
@ -367,9 +341,6 @@ describe("gateway server agent", () => {
);
const second = await secondP;
expect(second.payload).toEqual(firstFinal.payload);
ws.close();
await server.close();
});
test("agent dedupe survives reconnect", { timeout: 60_000 }, async () => {
@ -433,8 +404,9 @@ describe("gateway server agent", () => {
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws, {
const webchatWs = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => webchatWs.once("open", resolve));
await connectOk(webchatWs, {
client: {
id: GATEWAY_CLIENT_NAMES.WEBCHAT,
version: "1.0.0",
@ -446,7 +418,7 @@ describe("gateway server agent", () => {
registerAgentRunContext("run-auto-1", { sessionKey: "main" });
const finalChatP = onceMessage(
ws,
webchatWs,
(o) => {
if (o.type !== "event" || o.event !== "chat") return false;
const payload = o.payload as { state?: unknown; runId?: unknown } | undefined;
@ -474,7 +446,6 @@ describe("gateway server agent", () => {
expect(payload.sessionKey).toBe("main");
expect(payload.runId).toBe("run-auto-1");
ws.close();
await server.close();
webchatWs.close();
});
});

View File

@ -1,200 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test } from "vitest";
import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import {
connectOk,
installGatewayTestHooks,
onceMessage,
rpcReq,
startServerWithClient,
testState,
writeSessionStore,
} from "./test-helpers.js";
installGatewayTestHooks();
const _BASE_IMAGE_PNG =
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+X3mIAAAAASUVORK5CYII=";
function _expectChannels(call: Record<string, unknown>, channel: string) {
expect(call.channel).toBe(channel);
expect(call.messageChannel).toBe(channel);
}
describe("gateway server agent", () => {
test("agent events include sessionKey and agent.wait covers lifecycle flows", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
verboseLevel: "off",
},
},
});
const { server, ws } = await startServerWithClient();
await connectOk(ws, {
client: {
id: GATEWAY_CLIENT_NAMES.WEBCHAT,
version: "1.0.0",
platform: "test",
mode: GATEWAY_CLIENT_MODES.WEBCHAT,
},
});
registerAgentRunContext("run-tool-1", {
sessionKey: "main",
verboseLevel: "on",
});
{
const agentEvtP = onceMessage(
ws,
(o) => o.type === "event" && o.event === "agent" && o.payload?.runId === "run-tool-1",
8000,
);
emitAgentEvent({
runId: "run-tool-1",
stream: "tool",
data: { phase: "start", name: "read", toolCallId: "tool-1" },
});
const evt = await agentEvtP;
const payload =
evt.payload && typeof evt.payload === "object"
? (evt.payload as Record<string, unknown>)
: {};
expect(payload.sessionKey).toBe("main");
}
{
registerAgentRunContext("run-tool-off", { sessionKey: "agent:main:main" });
emitAgentEvent({
runId: "run-tool-off",
stream: "tool",
data: { phase: "start", name: "read", toolCallId: "tool-1" },
});
emitAgentEvent({
runId: "run-tool-off",
stream: "assistant",
data: { text: "hello" },
});
const evt = await onceMessage(
ws,
(o) => o.type === "event" && o.event === "agent" && o.payload?.runId === "run-tool-off",
8000,
);
const payload =
evt.payload && typeof evt.payload === "object"
? (evt.payload as Record<string, unknown>)
: {};
expect(payload.stream).toBe("assistant");
}
{
const waitP = rpcReq(ws, "agent.wait", {
runId: "run-wait-1",
timeoutMs: 1000,
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-1",
stream: "lifecycle",
data: { phase: "end", startedAt: 200, endedAt: 210 },
});
}, 5);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(200);
}
{
emitAgentEvent({
runId: "run-wait-early",
stream: "lifecycle",
data: { phase: "end", startedAt: 50, endedAt: 55 },
});
const res = await rpcReq(ws, "agent.wait", {
runId: "run-wait-early",
timeoutMs: 1000,
});
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(50);
}
{
const res = await rpcReq(ws, "agent.wait", {
runId: "run-wait-3",
timeoutMs: 30,
});
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("timeout");
}
{
const waitP = rpcReq(ws, "agent.wait", {
runId: "run-wait-err",
timeoutMs: 1000,
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-err",
stream: "lifecycle",
data: { phase: "error", error: "boom" },
});
}, 5);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("error");
expect(res.payload.error).toBe("boom");
}
{
const waitP = rpcReq(ws, "agent.wait", {
runId: "run-wait-start",
timeoutMs: 1000,
});
emitAgentEvent({
runId: "run-wait-start",
stream: "lifecycle",
data: { phase: "start", startedAt: 123 },
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-start",
stream: "lifecycle",
data: { phase: "end", endedAt: 456 },
});
}, 5);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(123);
expect(res.payload.endedAt).toBe(456);
}
ws.close();
await server.close();
await fs.rm(dir, { recursive: true, force: true });
testState.sessionStorePath = undefined;
});
});

View File

@ -1,44 +0,0 @@
import { describe, expect, test } from "vitest";
import {
connectOk,
installGatewayTestHooks,
rpcReq,
startServerWithClient,
testState,
} from "./test-helpers.js";
installGatewayTestHooks();
describe("gateway server agents", () => {
test("lists configured agents via agents.list RPC", async () => {
testState.agentsConfig = {
list: [
{ id: "work", name: "Work", default: true },
{ id: "home", name: "Home" },
],
};
const { ws } = await startServerWithClient();
const hello = await connectOk(ws);
expect((hello as unknown as { features?: { methods?: string[] } }).features?.methods).toEqual(
expect.arrayContaining(["agents.list"]),
);
const res = await rpcReq<{
defaultId: string;
mainKey: string;
scope: string;
agents: Array<{ id: string; name?: string }>;
}>(ws, "agents.list", {});
expect(res.ok).toBe(true);
expect(res.payload?.defaultId).toBe("work");
expect(res.payload?.mainKey).toBe("main");
expect(res.payload?.scope).toBe("per-sender");
expect(res.payload?.agents.map((agent) => agent.id)).toEqual(["work", "home", "main"]);
const work = res.payload?.agents.find((agent) => agent.id === "work");
const home = res.payload?.agents.find((agent) => agent.id === "home");
expect(work?.name).toBe("Work");
expect(home?.name).toBe("Home");
});
});

View File

@ -13,7 +13,7 @@ import {
} from "./test-helpers.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
async function waitForWsClose(ws: WebSocket, timeoutMs: number): Promise<boolean> {
if (ws.readyState === WebSocket.CLOSED) return true;

View File

@ -1,6 +1,7 @@
import { afterEach, describe, expect, test, vi } from "vitest";
import { afterAll, beforeAll, describe, expect, test, vi } from "vitest";
import type { ChannelPlugin } from "../channels/plugins/types.js";
import type { PluginRegistry } from "../plugins/registry.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import {
connectOk,
installGatewayTestHooks,
@ -10,7 +11,7 @@ import {
const loadConfigHelpers = async () => await import("../config/config.js");
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
const registryState = vi.hoisted(() => ({
registry: {
@ -131,30 +132,31 @@ const defaultRegistry = createRegistry([
},
]);
const servers: Array<Awaited<ReturnType<typeof startServerWithClient>>> = [];
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
let ws: Awaited<ReturnType<typeof startServerWithClient>>["ws"];
afterEach(async () => {
for (const { server, ws } of servers) {
try {
ws.close();
await server.close();
} catch {
/* ignore */
}
}
servers.length = 0;
await new Promise((resolve) => setTimeout(resolve, 50));
beforeAll(async () => {
setRegistry(defaultRegistry);
const started = await startServerWithClient();
server = started.server;
ws = started.ws;
await connectOk(ws);
});
afterAll(async () => {
ws.close();
await server.close();
});
function setRegistry(registry: PluginRegistry) {
registryState.registry = registry;
setActivePluginRegistry(registry);
}
describe("gateway server channels", () => {
test("channels.status returns snapshot without probe", async () => {
vi.stubEnv("TELEGRAM_BOT_TOKEN", undefined);
registryState.registry = defaultRegistry;
const result = await startServerWithClient();
servers.push(result);
const { ws } = result;
await connectOk(ws);
setRegistry(defaultRegistry);
const res = await rpcReq<{
channels?: Record<
string,
@ -181,12 +183,7 @@ describe("gateway server channels", () => {
});
test("channels.logout reports no session when missing", async () => {
registryState.registry = defaultRegistry;
const result = await startServerWithClient();
servers.push(result);
const { ws } = result;
await connectOk(ws);
setRegistry(defaultRegistry);
const res = await rpcReq<{ cleared?: boolean; channel?: string }>(ws, "channels.logout", {
channel: "whatsapp",
});
@ -197,7 +194,7 @@ describe("gateway server channels", () => {
test("channels.logout clears telegram bot token from config", async () => {
vi.stubEnv("TELEGRAM_BOT_TOKEN", undefined);
registryState.registry = defaultRegistry;
setRegistry(defaultRegistry);
const { readConfigFileSnapshot, writeConfigFile } = await loadConfigHelpers();
await writeConfigFile({
channels: {
@ -207,12 +204,6 @@ describe("gateway server channels", () => {
},
},
});
const result = await startServerWithClient();
servers.push(result);
const { ws } = result;
await connectOk(ws);
const res = await rpcReq<{
cleared?: boolean;
envToken?: boolean;

View File

@ -14,7 +14,7 @@ import {
testState,
writeSessionStore,
} from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
async function waitFor(condition: () => boolean, timeoutMs = 1500) {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
@ -38,24 +38,6 @@ const sendReq = (
}),
);
};
const withSessionStore = async <T>(
tempDirs: string[],
entries: Record<
string,
{ sessionId: string; updatedAt: number; lastChannel?: string; lastTo?: string }
>,
fn: (dir: string) => Promise<T>,
): Promise<T> => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
tempDirs.push(dir);
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({ entries });
try {
return await fn(dir);
} finally {
testState.sessionStorePath = undefined;
}
};
describe("gateway server chat", () => {
const timeoutMs = process.platform === "win32" ? 120_000 : 60_000;
test(
@ -71,226 +53,206 @@ describe("gateway server chat", () => {
};
try {
await connectOk(ws);
await withSessionStore(
tempDirs,
{ main: { sessionId: "sess-main", updatedAt: Date.now() } },
async (historyDir) => {
const bigText = "x".repeat(200_000);
const largeLines: string[] = [];
for (let i = 0; i < 40; i += 1) {
largeLines.push(
JSON.stringify({
message: {
role: "user",
content: [{ type: "text", text: `${i}:${bigText}` }],
timestamp: Date.now() + i,
},
}),
);
}
await fs.writeFile(
path.join(historyDir, "sess-main.jsonl"),
largeLines.join("\n"),
"utf-8",
);
const cappedRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", {
sessionKey: "main",
limit: 1000,
});
expect(cappedRes.ok).toBe(true);
const cappedMsgs = cappedRes.payload?.messages ?? [];
const bytes = Buffer.byteLength(JSON.stringify(cappedMsgs), "utf8");
expect(bytes).toBeLessThanOrEqual(6 * 1024 * 1024);
expect(cappedMsgs.length).toBeLessThan(60);
},
const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
tempDirs.push(sessionDir);
testState.sessionStorePath = path.join(sessionDir, "sessions.json");
const writeStore = async (
entries: Record<
string,
{ sessionId: string; updatedAt: number; lastChannel?: string; lastTo?: string }
>,
) => {
await writeSessionStore({ entries });
};
await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } });
const bigText = "x".repeat(155_000);
const largeLines: string[] = [];
for (let i = 0; i < 40; i += 1) {
largeLines.push(
JSON.stringify({
message: {
role: "user",
content: [{ type: "text", text: `${i}:${bigText}` }],
timestamp: Date.now() + i,
},
}),
);
}
await fs.writeFile(
path.join(sessionDir, "sess-main.jsonl"),
largeLines.join("\n"),
"utf-8",
);
await withSessionStore(
tempDirs,
{
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastTo: "+1555",
},
const cappedRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", {
sessionKey: "main",
limit: 1000,
});
expect(cappedRes.ok).toBe(true);
const cappedMsgs = cappedRes.payload?.messages ?? [];
const bytes = Buffer.byteLength(JSON.stringify(cappedMsgs), "utf8");
expect(bytes).toBeLessThanOrEqual(6 * 1024 * 1024);
expect(cappedMsgs.length).toBeLessThan(60);
await writeStore({
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
lastChannel: "whatsapp",
lastTo: "+1555",
},
async () => {
const routeRes = await rpcReq(ws, "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-route",
});
const routeRes = await rpcReq(ws, "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-route",
});
expect(routeRes.ok).toBe(true);
const stored = JSON.parse(
await fs.readFile(testState.sessionStorePath as string, "utf-8"),
) as Record<string, { lastChannel?: string; lastTo?: string } | undefined>;
expect(stored["agent:main:main"]?.lastChannel).toBe("whatsapp");
expect(stored["agent:main:main"]?.lastTo).toBe("+1555");
await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } });
resetSpy();
let abortInFlight: Promise<unknown> | undefined;
try {
const callsBefore = spy.mock.calls.length;
spy.mockImplementationOnce(async (opts) => {
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
expect(routeRes.ok).toBe(true);
const stored = JSON.parse(
await fs.readFile(testState.sessionStorePath as string, "utf-8"),
) as Record<string, { lastChannel?: string; lastTo?: string } | undefined>;
expect(stored["agent:main:main"]?.lastChannel).toBe("whatsapp");
expect(stored["agent:main:main"]?.lastTo).toBe("+1555");
},
);
await withSessionStore(
tempDirs,
{ main: { sessionId: "sess-main", updatedAt: Date.now() } },
async () => {
resetSpy();
let abortInFlight: Promise<unknown> | undefined;
try {
const callsBefore = spy.mock.calls.length;
spy.mockImplementationOnce(async (opts) => {
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
});
const sendResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-abort-1",
8000,
);
const abortResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "abort-1",
8000,
);
const abortedEventP = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted",
8000,
);
abortInFlight = Promise.allSettled([sendResP, abortResP, abortedEventP]);
sendReq(ws, "send-abort-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-abort-1",
timeoutMs: 30_000,
});
const sendRes = await sendResP;
expect(sendRes.ok).toBe(true);
await new Promise<void>((resolve, reject) => {
const deadline = Date.now() + 1000;
const tick = () => {
if (spy.mock.calls.length > callsBefore) return resolve();
if (Date.now() > deadline)
return reject(new Error("timeout waiting for agentCommand"));
setTimeout(tick, 5);
};
tick();
});
sendReq(ws, "abort-1", "chat.abort", {
sessionKey: "main",
runId: "idem-abort-1",
});
const abortRes = await abortResP;
expect(abortRes.ok).toBe(true);
const evt = await abortedEventP;
expect(evt.payload?.runId).toBe("idem-abort-1");
expect(evt.payload?.sessionKey).toBe("main");
} finally {
await abortInFlight;
}
},
);
await withSessionStore(
tempDirs,
{ main: { sessionId: "sess-main", updatedAt: Date.now() } },
async () => {
sessionStoreSaveDelayMs.value = 120;
resetSpy();
try {
spy.mockImplementationOnce(async (opts) => {
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
});
const abortedEventP = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted",
);
const sendResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-abort-save-1",
);
sendReq(ws, "send-abort-save-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-abort-save-1",
timeoutMs: 30_000,
});
const abortResP = onceMessage(ws, (o) => o.type === "res" && o.id === "abort-save-1");
sendReq(ws, "abort-save-1", "chat.abort", {
sessionKey: "main",
runId: "idem-abort-save-1",
});
const abortRes = await abortResP;
expect(abortRes.ok).toBe(true);
const sendRes = await sendResP;
expect(sendRes.ok).toBe(true);
const evt = await abortedEventP;
expect(evt.payload?.runId).toBe("idem-abort-save-1");
expect(evt.payload?.sessionKey).toBe("main");
} finally {
sessionStoreSaveDelayMs.value = 0;
}
},
);
await withSessionStore(
tempDirs,
{ main: { sessionId: "sess-main", updatedAt: Date.now() } },
async () => {
resetSpy();
const callsBeforeStop = spy.mock.calls.length;
spy.mockImplementationOnce(async (opts) => {
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
});
const sendResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-abort-1",
8000,
);
const abortResP = onceMessage(ws, (o) => o.type === "res" && o.id === "abort-1", 8000);
const abortedEventP = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted",
8000,
);
abortInFlight = Promise.allSettled([sendResP, abortResP, abortedEventP]);
sendReq(ws, "send-abort-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-abort-1",
timeoutMs: 30_000,
});
const sendRes = await sendResP;
expect(sendRes.ok).toBe(true);
await new Promise<void>((resolve, reject) => {
const deadline = Date.now() + 1000;
const tick = () => {
if (spy.mock.calls.length > callsBefore) return resolve();
if (Date.now() > deadline)
return reject(new Error("timeout waiting for agentCommand"));
setTimeout(tick, 5);
};
tick();
});
sendReq(ws, "abort-1", "chat.abort", {
sessionKey: "main",
runId: "idem-abort-1",
});
const abortRes = await abortResP;
expect(abortRes.ok).toBe(true);
const evt = await abortedEventP;
expect(evt.payload?.runId).toBe("idem-abort-1");
expect(evt.payload?.sessionKey).toBe("main");
} finally {
await abortInFlight;
}
await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } });
sessionStoreSaveDelayMs.value = 120;
resetSpy();
try {
spy.mockImplementationOnce(async (opts) => {
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
const stopSendResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-stop-1",
8000,
);
sendReq(ws, "send-stop-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-stop-run",
});
const stopSendRes = await stopSendResP;
expect(stopSendRes.ok).toBe(true);
await waitFor(() => spy.mock.calls.length > callsBeforeStop);
const abortedStopEventP = onceMessage(
ws,
(o) =>
o.type === "event" &&
o.event === "chat" &&
o.payload?.state === "aborted" &&
o.payload?.runId === "idem-stop-run",
8000,
);
const stopResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-stop-2",
8000,
);
sendReq(ws, "send-stop-2", "chat.send", {
sessionKey: "main",
message: "/stop",
idempotencyKey: "idem-stop-req",
});
const stopRes = await stopResP;
expect(stopRes.ok).toBe(true);
const stopEvt = await abortedStopEventP;
expect(stopEvt.payload?.sessionKey).toBe("main");
expect(spy.mock.calls.length).toBe(callsBeforeStop + 1);
},
});
const abortedEventP = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "aborted",
);
const sendResP = onceMessage(ws, (o) => o.type === "res" && o.id === "send-abort-save-1");
sendReq(ws, "send-abort-save-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-abort-save-1",
timeoutMs: 30_000,
});
const abortResP = onceMessage(ws, (o) => o.type === "res" && o.id === "abort-save-1");
sendReq(ws, "abort-save-1", "chat.abort", {
sessionKey: "main",
runId: "idem-abort-save-1",
});
const abortRes = await abortResP;
expect(abortRes.ok).toBe(true);
const sendRes = await sendResP;
expect(sendRes.ok).toBe(true);
const evt = await abortedEventP;
expect(evt.payload?.runId).toBe("idem-abort-save-1");
expect(evt.payload?.sessionKey).toBe("main");
} finally {
sessionStoreSaveDelayMs.value = 0;
}
await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } });
resetSpy();
const callsBeforeStop = spy.mock.calls.length;
spy.mockImplementationOnce(async (opts) => {
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
});
const stopSendResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-stop-1",
8000,
);
sendReq(ws, "send-stop-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-stop-run",
});
const stopSendRes = await stopSendResP;
expect(stopSendRes.ok).toBe(true);
await waitFor(() => spy.mock.calls.length > callsBeforeStop);
const abortedStopEventP = onceMessage(
ws,
(o) =>
o.type === "event" &&
o.event === "chat" &&
o.payload?.state === "aborted" &&
o.payload?.runId === "idem-stop-run",
8000,
);
const stopResP = onceMessage(ws, (o) => o.type === "res" && o.id === "send-stop-2", 8000);
sendReq(ws, "send-stop-2", "chat.send", {
sessionKey: "main",
message: "/stop",
idempotencyKey: "idem-stop-req",
});
const stopRes = await stopResP;
expect(stopRes.ok).toBe(true);
const stopEvt = await abortedStopEventP;
expect(stopEvt.payload?.sessionKey).toBe("main");
expect(spy.mock.calls.length).toBe(callsBeforeStop + 1);
resetSpy();
let resolveRun: (() => void) | undefined;
const runDone = new Promise<void>((resolve) => {
@ -315,7 +277,7 @@ describe("gateway server chat", () => {
expect(inFlightRes.payload?.status).toBe("in_flight");
resolveRun?.();
let completed = false;
for (let i = 0; i < 50; i++) {
for (let i = 0; i < 20; i++) {
const again = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", {
sessionKey: "main",
message: "hello",
@ -380,149 +342,136 @@ describe("gateway server chat", () => {
data: { phase: "end" },
});
await expect(noDeltaP).rejects.toThrow(/timeout/i);
await withSessionStore(tempDirs, {}, async () => {
const abortUnknown = await rpcReq<{
ok?: boolean;
aborted?: boolean;
}>(ws, "chat.abort", { sessionKey: "main", runId: "missing-run" });
expect(abortUnknown.ok).toBe(true);
expect(abortUnknown.payload?.aborted).toBe(false);
await writeStore({});
const abortUnknown = await rpcReq<{
ok?: boolean;
aborted?: boolean;
}>(ws, "chat.abort", { sessionKey: "main", runId: "missing-run" });
expect(abortUnknown.ok).toBe(true);
expect(abortUnknown.payload?.aborted).toBe(false);
await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } });
resetSpy();
let agentStartedResolve: (() => void) | undefined;
const agentStartedP = new Promise<void>((resolve) => {
agentStartedResolve = resolve;
});
await withSessionStore(
tempDirs,
{ main: { sessionId: "sess-main", updatedAt: Date.now() } },
async () => {
resetSpy();
let agentStartedResolve: (() => void) | undefined;
const agentStartedP = new Promise<void>((resolve) => {
agentStartedResolve = resolve;
});
spy.mockImplementationOnce(async (opts) => {
agentStartedResolve?.();
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
});
const sendResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-mismatch-1",
10_000,
);
sendReq(ws, "send-mismatch-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-mismatch-1",
timeoutMs: 30_000,
});
await agentStartedP;
const abortMismatch = await rpcReq(ws, "chat.abort", {
sessionKey: "other",
runId: "idem-mismatch-1",
});
expect(abortMismatch.ok).toBe(false);
expect(abortMismatch.error?.code).toBe("INVALID_REQUEST");
const abortMismatch2 = await rpcReq(ws, "chat.abort", {
sessionKey: "main",
runId: "idem-mismatch-1",
});
expect(abortMismatch2.ok).toBe(true);
const sendRes = await sendResP;
expect(sendRes.ok).toBe(true);
},
spy.mockImplementationOnce(async (opts) => {
agentStartedResolve?.();
const signal = (opts as { abortSignal?: AbortSignal }).abortSignal;
await new Promise<void>((resolve) => {
if (!signal) return resolve();
if (signal.aborted) return resolve();
signal.addEventListener("abort", () => resolve(), { once: true });
});
});
const sendResP = onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-mismatch-1",
10_000,
);
await withSessionStore(
tempDirs,
{ main: { sessionId: "sess-main", updatedAt: Date.now() } },
async () => {
resetSpy();
spy.mockResolvedValueOnce(undefined);
sendReq(ws, "send-complete-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-complete-1",
timeoutMs: 30_000,
});
const sendCompleteRes = await onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-complete-1",
);
expect(sendCompleteRes.ok).toBe(true);
let completedRun = false;
for (let i = 0; i < 50; i++) {
const again = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-complete-1",
timeoutMs: 30_000,
});
if (again.ok && again.payload?.status === "ok") {
completedRun = true;
break;
}
await new Promise((r) => setTimeout(r, 10));
}
expect(completedRun).toBe(true);
const abortCompleteRes = await rpcReq(ws, "chat.abort", {
sessionKey: "main",
runId: "idem-complete-1",
});
expect(abortCompleteRes.ok).toBe(true);
expect(abortCompleteRes.payload?.aborted).toBe(false);
},
sendReq(ws, "send-mismatch-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-mismatch-1",
timeoutMs: 30_000,
});
await agentStartedP;
const abortMismatch = await rpcReq(ws, "chat.abort", {
sessionKey: "other",
runId: "idem-mismatch-1",
});
expect(abortMismatch.ok).toBe(false);
expect(abortMismatch.error?.code).toBe("INVALID_REQUEST");
const abortMismatch2 = await rpcReq(ws, "chat.abort", {
sessionKey: "main",
runId: "idem-mismatch-1",
});
expect(abortMismatch2.ok).toBe(true);
const sendRes = await sendResP;
expect(sendRes.ok).toBe(true);
await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } });
resetSpy();
spy.mockResolvedValueOnce(undefined);
sendReq(ws, "send-complete-1", "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-complete-1",
timeoutMs: 30_000,
});
const sendCompleteRes = await onceMessage(
ws,
(o) => o.type === "res" && o.id === "send-complete-1",
);
await withSessionStore(
tempDirs,
{ main: { sessionId: "sess-main", updatedAt: Date.now() } },
async () => {
const res1 = await rpcReq(ws, "chat.send", {
sessionKey: "main",
message: "first",
idempotencyKey: "idem-1",
});
expect(res1.ok).toBe(true);
const res2 = await rpcReq(ws, "chat.send", {
sessionKey: "main",
message: "second",
idempotencyKey: "idem-2",
});
expect(res2.ok).toBe(true);
const final1P = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final",
8000,
);
emitAgentEvent({
runId: "idem-1",
stream: "lifecycle",
data: { phase: "end" },
});
const final1 = await final1P;
const run1 =
final1.payload && typeof final1.payload === "object"
? (final1.payload as { runId?: string }).runId
: undefined;
expect(run1).toBe("idem-1");
const final2P = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final",
8000,
);
emitAgentEvent({
runId: "idem-2",
stream: "lifecycle",
data: { phase: "end" },
});
const final2 = await final2P;
const run2 =
final2.payload && typeof final2.payload === "object"
? (final2.payload as { runId?: string }).runId
: undefined;
expect(run2).toBe("idem-2");
},
expect(sendCompleteRes.ok).toBe(true);
let completedRun = false;
for (let i = 0; i < 20; i++) {
const again = await rpcReq<{ runId?: string; status?: string }>(ws, "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-complete-1",
timeoutMs: 30_000,
});
if (again.ok && again.payload?.status === "ok") {
completedRun = true;
break;
}
await new Promise((r) => setTimeout(r, 10));
}
expect(completedRun).toBe(true);
const abortCompleteRes = await rpcReq(ws, "chat.abort", {
sessionKey: "main",
runId: "idem-complete-1",
});
expect(abortCompleteRes.ok).toBe(true);
expect(abortCompleteRes.payload?.aborted).toBe(false);
await writeStore({ main: { sessionId: "sess-main", updatedAt: Date.now() } });
const res1 = await rpcReq(ws, "chat.send", {
sessionKey: "main",
message: "first",
idempotencyKey: "idem-1",
});
expect(res1.ok).toBe(true);
const res2 = await rpcReq(ws, "chat.send", {
sessionKey: "main",
message: "second",
idempotencyKey: "idem-2",
});
expect(res2.ok).toBe(true);
const final1P = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final",
8000,
);
emitAgentEvent({
runId: "idem-1",
stream: "lifecycle",
data: { phase: "end" },
});
const final1 = await final1P;
const run1 =
final1.payload && typeof final1.payload === "object"
? (final1.payload as { runId?: string }).runId
: undefined;
expect(run1).toBe("idem-1");
const final2P = onceMessage(
ws,
(o) => o.type === "event" && o.event === "chat" && o.payload?.state === "final",
8000,
);
emitAgentEvent({
runId: "idem-2",
stream: "lifecycle",
data: { phase: "end" },
});
const final2 = await final2P;
const run2 =
final2.payload && typeof final2.payload === "object"
? (final2.payload as { runId?: string }).runId
: undefined;
expect(run2).toBe("idem-2");
} finally {
testState.sessionStorePath = undefined;
sessionStoreSaveDelayMs.value = 0;

View File

@ -1,9 +1,10 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test, vi } from "vitest";
import { afterAll, beforeAll, describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js";
import {
agentCommand,
connectOk,
@ -15,7 +16,24 @@ import {
writeSessionStore,
} from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
let ws: WebSocket;
let port: number;
beforeAll(async () => {
const started = await startServerWithClient();
server = started.server;
ws = started.ws;
port = started.port;
await connectOk(ws);
});
afterAll(async () => {
ws.close();
await server.close();
});
async function waitFor(condition: () => boolean, timeoutMs = 1500) {
const deadline = Date.now() + timeoutMs;
@ -29,12 +47,9 @@ async function waitFor(condition: () => boolean, timeoutMs = 1500) {
describe("gateway server chat", () => {
test("handles chat send and history flows", async () => {
const tempDirs: string[] = [];
const { server, ws, port } = await startServerWithClient();
let webchatWs: WebSocket | undefined;
try {
await connectOk(ws);
webchatWs = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => webchatWs?.once("open", resolve));
await connectOk(webchatWs, {
@ -240,9 +255,182 @@ describe("gateway server chat", () => {
testState.sessionStorePath = undefined;
testState.sessionConfig = undefined;
if (webchatWs) webchatWs.close();
ws.close();
await server.close();
await Promise.all(tempDirs.map((dir) => fs.rm(dir, { recursive: true, force: true })));
}
});
test("agent events include sessionKey and agent.wait covers lifecycle flows", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-"));
testState.sessionStorePath = path.join(dir, "sessions.json");
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
verboseLevel: "off",
},
},
});
const webchatWs = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => webchatWs.once("open", resolve));
await connectOk(webchatWs, {
client: {
id: GATEWAY_CLIENT_NAMES.WEBCHAT,
version: "1.0.0",
platform: "test",
mode: GATEWAY_CLIENT_MODES.WEBCHAT,
},
});
try {
registerAgentRunContext("run-tool-1", {
sessionKey: "main",
verboseLevel: "on",
});
{
const agentEvtP = onceMessage(
webchatWs,
(o) => o.type === "event" && o.event === "agent" && o.payload?.runId === "run-tool-1",
8000,
);
emitAgentEvent({
runId: "run-tool-1",
stream: "tool",
data: { phase: "start", name: "read", toolCallId: "tool-1" },
});
const evt = await agentEvtP;
const payload =
evt.payload && typeof evt.payload === "object"
? (evt.payload as Record<string, unknown>)
: {};
expect(payload.sessionKey).toBe("main");
}
{
registerAgentRunContext("run-tool-off", { sessionKey: "agent:main:main" });
emitAgentEvent({
runId: "run-tool-off",
stream: "tool",
data: { phase: "start", name: "read", toolCallId: "tool-1" },
});
emitAgentEvent({
runId: "run-tool-off",
stream: "assistant",
data: { text: "hello" },
});
const evt = await onceMessage(
webchatWs,
(o) => o.type === "event" && o.event === "agent" && o.payload?.runId === "run-tool-off",
8000,
);
const payload =
evt.payload && typeof evt.payload === "object"
? (evt.payload as Record<string, unknown>)
: {};
expect(payload.stream).toBe("assistant");
}
{
const waitP = rpcReq(webchatWs, "agent.wait", {
runId: "run-wait-1",
timeoutMs: 1000,
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-1",
stream: "lifecycle",
data: { phase: "end", startedAt: 200, endedAt: 210 },
});
}, 5);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(200);
}
{
emitAgentEvent({
runId: "run-wait-early",
stream: "lifecycle",
data: { phase: "end", startedAt: 50, endedAt: 55 },
});
const res = await rpcReq(webchatWs, "agent.wait", {
runId: "run-wait-early",
timeoutMs: 1000,
});
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(50);
}
{
const res = await rpcReq(webchatWs, "agent.wait", {
runId: "run-wait-3",
timeoutMs: 30,
});
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("timeout");
}
{
const waitP = rpcReq(webchatWs, "agent.wait", {
runId: "run-wait-err",
timeoutMs: 1000,
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-err",
stream: "lifecycle",
data: { phase: "error", error: "boom" },
});
}, 5);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("error");
expect(res.payload.error).toBe("boom");
}
{
const waitP = rpcReq(webchatWs, "agent.wait", {
runId: "run-wait-start",
timeoutMs: 1000,
});
emitAgentEvent({
runId: "run-wait-start",
stream: "lifecycle",
data: { phase: "start", startedAt: 123 },
});
setTimeout(() => {
emitAgentEvent({
runId: "run-wait-start",
stream: "lifecycle",
data: { phase: "end", endedAt: 456 },
});
}, 5);
const res = await waitP;
expect(res.ok).toBe(true);
expect(res.payload.status).toBe("ok");
expect(res.payload.startedAt).toBe(123);
expect(res.payload.endedAt).toBe(456);
}
} finally {
webchatWs.close();
await fs.rm(dir, { recursive: true, force: true });
testState.sessionStorePath = undefined;
}
});
});

View File

@ -1,4 +1,7 @@
import { describe, expect, it } from "vitest";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { resolveConfigSnapshotHash } from "../config/config.js";
@ -6,16 +9,31 @@ import {
connectOk,
installGatewayTestHooks,
onceMessage,
rpcReq,
startServerWithClient,
testState,
writeSessionStore,
} from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
let ws: Awaited<ReturnType<typeof startServerWithClient>>["ws"];
beforeAll(async () => {
const started = await startServerWithClient();
server = started.server;
ws = started.ws;
await connectOk(ws);
});
afterAll(async () => {
ws.close();
await server.close();
});
describe("gateway config.patch", () => {
it("merges patches without clobbering unrelated config", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const setId = "req-set";
ws.send(
JSON.stringify({
@ -100,15 +118,9 @@ describe("gateway config.patch", () => {
expect(get2Res.ok).toBe(true);
expect(get2Res.payload?.config?.gateway?.mode).toBe("local");
expect(get2Res.payload?.config?.channels?.telegram?.botToken).toBe("token-1");
ws.close();
await server.close();
});
it("requires base hash when config exists", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const setId = "req-set-2";
ws.send(
JSON.stringify({
@ -145,15 +157,9 @@ describe("gateway config.patch", () => {
);
expect(patchRes.ok).toBe(false);
expect(patchRes.error?.message).toContain("base hash");
ws.close();
await server.close();
});
it("requires base hash for config.set when config exists", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const setId = "req-set-3";
ws.send(
JSON.stringify({
@ -192,8 +198,108 @@ describe("gateway config.patch", () => {
);
expect(set2Res.ok).toBe(false);
expect(set2Res.error?.message).toContain("base hash");
ws.close();
await server.close();
});
});
describe("gateway server sessions", () => {
it("filters sessions by agentId", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-agents-"));
testState.sessionConfig = {
store: path.join(dir, "{agentId}", "sessions.json"),
};
testState.agentsConfig = {
list: [{ id: "home", default: true }, { id: "work" }],
};
const homeDir = path.join(dir, "home");
const workDir = path.join(dir, "work");
await fs.mkdir(homeDir, { recursive: true });
await fs.mkdir(workDir, { recursive: true });
await writeSessionStore({
storePath: path.join(homeDir, "sessions.json"),
agentId: "home",
entries: {
main: {
sessionId: "sess-home-main",
updatedAt: Date.now(),
},
"discord:group:dev": {
sessionId: "sess-home-group",
updatedAt: Date.now() - 1000,
},
},
});
await writeSessionStore({
storePath: path.join(workDir, "sessions.json"),
agentId: "work",
entries: {
main: {
sessionId: "sess-work-main",
updatedAt: Date.now(),
},
},
});
const homeSessions = await rpcReq<{
sessions: Array<{ key: string }>;
}>(ws, "sessions.list", {
includeGlobal: false,
includeUnknown: false,
agentId: "home",
});
expect(homeSessions.ok).toBe(true);
expect(homeSessions.payload?.sessions.map((s) => s.key).sort()).toEqual([
"agent:home:discord:group:dev",
"agent:home:main",
]);
const workSessions = await rpcReq<{
sessions: Array<{ key: string }>;
}>(ws, "sessions.list", {
includeGlobal: false,
includeUnknown: false,
agentId: "work",
});
expect(workSessions.ok).toBe(true);
expect(workSessions.payload?.sessions.map((s) => s.key)).toEqual(["agent:work:main"]);
});
it("resolves and patches main alias to default agent main key", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-"));
const storePath = path.join(dir, "sessions.json");
testState.sessionStorePath = storePath;
testState.agentsConfig = { list: [{ id: "ops", default: true }] };
testState.sessionConfig = { mainKey: "work" };
await writeSessionStore({
storePath,
agentId: "ops",
mainKey: "work",
entries: {
main: {
sessionId: "sess-ops-main",
updatedAt: Date.now(),
},
},
});
const resolved = await rpcReq<{ ok: true; key: string }>(ws, "sessions.resolve", {
key: "main",
});
expect(resolved.ok).toBe(true);
expect(resolved.payload?.key).toBe("agent:ops:work");
const patched = await rpcReq<{ ok: true; key: string }>(ws, "sessions.patch", {
key: "main",
thinkingLevel: "medium",
});
expect(patched.ok).toBe(true);
expect(patched.payload?.key).toBe("agent:ops:work");
const stored = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
{ thinkingLevel?: string }
>;
expect(stored["agent:ops:work"]?.thinkingLevel).toBe("medium");
expect(stored.main).toBeUndefined();
});
});

View File

@ -1,7 +1,7 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test, vi } from "vitest";
import { describe, expect, test } from "vitest";
import {
connectOk,
installGatewayTestHooks,
@ -44,7 +44,7 @@ async function waitForCronFinished(ws: { send: (data: string) => void }, jobId:
o.event === "cron" &&
o.payload?.action === "finished" &&
o.payload?.jobId === jobId,
10_000,
20_000,
);
}
@ -345,14 +345,7 @@ describe("gateway server cron", () => {
const autoJobId = typeof autoJobIdValue === "string" ? autoJobIdValue : "";
expect(autoJobId.length > 0).toBe(true);
vi.useFakeTimers();
try {
const autoFinishedP = waitForCronFinished(ws, autoJobId);
await vi.advanceTimersByTimeAsync(1000);
await autoFinishedP;
} finally {
vi.useRealTimers();
}
await waitForCronFinished(ws, autoJobId);
await waitForNonEmptyFile(path.join(dir, "cron", "runs", `${autoJobId}.jsonl`));
const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as

View File

@ -10,206 +10,150 @@ import {
waitForSystemEvent,
} from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
const resolveMainKey = () => resolveMainSessionKeyFromConfig();
describe("gateway server hooks", () => {
test("hooks wake requires auth", async () => {
test("handles auth, wake, and agent flows", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text: "Ping" }),
});
expect(res.status).toBe(401);
await server.close();
});
try {
const resNoAuth = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text: "Ping" }),
});
expect(resNoAuth.status).toBe(401);
test("hooks wake enqueues system event", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ text: "Ping", mode: "next-heartbeat" }),
});
expect(res.status).toBe(200);
const events = await waitForSystemEvent();
expect(events.some((e) => e.includes("Ping"))).toBe(true);
drainSystemEvents(resolveMainKey());
await server.close();
});
const resWake = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ text: "Ping", mode: "next-heartbeat" }),
});
expect(resWake.status).toBe(200);
const wakeEvents = await waitForSystemEvent();
expect(wakeEvents.some((e) => e.includes("Ping"))).toBe(true);
drainSystemEvents(resolveMainKey());
test("hooks agent posts summary to main", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
cronIsolatedRun.mockResolvedValueOnce({
status: "ok",
summary: "done",
});
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ message: "Do it", name: "Email" }),
});
expect(res.status).toBe(202);
const events = await waitForSystemEvent();
expect(events.some((e) => e.includes("Hook Email: done"))).toBe(true);
drainSystemEvents(resolveMainKey());
await server.close();
});
cronIsolatedRun.mockReset();
cronIsolatedRun.mockResolvedValueOnce({
status: "ok",
summary: "done",
});
const resAgent = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ message: "Do it", name: "Email" }),
});
expect(resAgent.status).toBe(202);
const agentEvents = await waitForSystemEvent();
expect(agentEvents.some((e) => e.includes("Hook Email: done"))).toBe(true);
drainSystemEvents(resolveMainKey());
test("hooks agent forwards model override", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
cronIsolatedRun.mockClear();
cronIsolatedRun.mockResolvedValueOnce({
status: "ok",
summary: "done",
});
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({
message: "Do it",
name: "Email",
model: "openai/gpt-4.1-mini",
}),
});
expect(res.status).toBe(202);
await waitForSystemEvent();
const call = cronIsolatedRun.mock.calls[0]?.[0] as {
job?: { payload?: { model?: string } };
};
expect(call?.job?.payload?.model).toBe("openai/gpt-4.1-mini");
drainSystemEvents(resolveMainKey());
await server.close();
});
cronIsolatedRun.mockReset();
cronIsolatedRun.mockResolvedValueOnce({
status: "ok",
summary: "done",
});
const resAgentModel = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({
message: "Do it",
name: "Email",
model: "openai/gpt-4.1-mini",
}),
});
expect(resAgentModel.status).toBe(202);
await waitForSystemEvent();
const call = cronIsolatedRun.mock.calls[0]?.[0] as {
job?: { payload?: { model?: string } };
};
expect(call?.job?.payload?.model).toBe("openai/gpt-4.1-mini");
drainSystemEvents(resolveMainKey());
test("hooks wake accepts query token", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/wake?token=hook-secret`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text: "Query auth" }),
});
expect(res.status).toBe(200);
const events = await waitForSystemEvent();
expect(events.some((e) => e.includes("Query auth"))).toBe(true);
drainSystemEvents(resolveMainKey());
await server.close();
});
const resQuery = await fetch(`http://127.0.0.1:${port}/hooks/wake?token=hook-secret`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text: "Query auth" }),
});
expect(resQuery.status).toBe(200);
const queryEvents = await waitForSystemEvent();
expect(queryEvents.some((e) => e.includes("Query auth"))).toBe(true);
drainSystemEvents(resolveMainKey());
test("hooks agent rejects invalid channel", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ message: "Nope", channel: "sms" }),
});
expect(res.status).toBe(400);
expect(peekSystemEvents(resolveMainKey()).length).toBe(0);
await server.close();
});
const resBadChannel = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ message: "Nope", channel: "sms" }),
});
expect(resBadChannel.status).toBe(400);
expect(peekSystemEvents(resolveMainKey()).length).toBe(0);
test("hooks wake accepts x-clawdbot-token header", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-clawdbot-token": "hook-secret",
},
body: JSON.stringify({ text: "Header auth" }),
});
expect(res.status).toBe(200);
const events = await waitForSystemEvent();
expect(events.some((e) => e.includes("Header auth"))).toBe(true);
drainSystemEvents(resolveMainKey());
await server.close();
});
const resHeader = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
"x-clawdbot-token": "hook-secret",
},
body: JSON.stringify({ text: "Header auth" }),
});
expect(resHeader.status).toBe(200);
const headerEvents = await waitForSystemEvent();
expect(headerEvents.some((e) => e.includes("Header auth"))).toBe(true);
drainSystemEvents(resolveMainKey());
test("hooks rejects non-post", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "GET",
headers: { Authorization: "Bearer hook-secret" },
});
expect(res.status).toBe(405);
await server.close();
});
const resGet = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "GET",
headers: { Authorization: "Bearer hook-secret" },
});
expect(resGet.status).toBe(405);
test("hooks wake requires text", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ text: " " }),
});
expect(res.status).toBe(400);
await server.close();
});
const resBlankText = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ text: " " }),
});
expect(resBlankText.status).toBe(400);
test("hooks agent requires message", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ message: " " }),
});
expect(res.status).toBe(400);
await server.close();
});
const resBlankMessage = await fetch(`http://127.0.0.1:${port}/hooks/agent`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: JSON.stringify({ message: " " }),
});
expect(resBlankMessage.status).toBe(400);
test("hooks rejects invalid json", async () => {
testState.hooksConfig = { enabled: true, token: "hook-secret" };
const port = await getFreePort();
const server = await startGatewayServer(port);
const res = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: "{",
});
expect(res.status).toBe(400);
await server.close();
const resBadJson = await fetch(`http://127.0.0.1:${port}/hooks/wake`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: "Bearer hook-secret",
},
body: "{",
});
expect(resBadJson.status).toBe(400);
} finally {
await server.close();
}
});
});

View File

@ -1,189 +0,0 @@
import fs from "node:fs/promises";
import { createServer } from "node:net";
import path from "node:path";
import { describe, expect, test } from "vitest";
import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js";
import { getChannelPlugin } from "../channels/plugins/index.js";
import { GatewayLockError } from "../infra/gateway-lock.js";
import type { ChannelOutboundAdapter } from "../channels/plugins/types.js";
import type { PluginRegistry } from "../plugins/registry.js";
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin } from "../test-utils/channel-plugins.js";
import {
connectOk,
getFreePort,
installGatewayTestHooks,
occupyPort,
onceMessage,
startGatewayServer,
startServerWithClient,
testState,
testTailnetIPv4,
} from "./test-helpers.js";
installGatewayTestHooks();
const whatsappOutbound: ChannelOutboundAdapter = {
deliveryMode: "direct",
sendText: async ({ deps, to, text }) => {
if (!deps?.sendWhatsApp) {
throw new Error("Missing sendWhatsApp dep");
}
return { channel: "whatsapp", ...(await deps.sendWhatsApp(to, text, {})) };
},
sendMedia: async ({ deps, to, text, mediaUrl }) => {
if (!deps?.sendWhatsApp) {
throw new Error("Missing sendWhatsApp dep");
}
return { channel: "whatsapp", ...(await deps.sendWhatsApp(to, text, { mediaUrl })) };
},
};
const whatsappPlugin = createOutboundTestPlugin({
id: "whatsapp",
outbound: whatsappOutbound,
label: "WhatsApp",
});
const createRegistry = (channels: PluginRegistry["channels"]): PluginRegistry => ({
plugins: [],
tools: [],
channels,
providers: [],
gatewayHandlers: {},
httpHandlers: [],
cliRegistrars: [],
services: [],
diagnostics: [],
});
const whatsappRegistry = createRegistry([
{
pluginId: "whatsapp",
source: "test",
plugin: whatsappPlugin,
},
]);
const emptyRegistry = createRegistry([]);
describe("gateway server misc", () => {
test("hello-ok advertises the gateway port for canvas host", async () => {
const prevToken = process.env.CLAWDBOT_GATEWAY_TOKEN;
const prevCanvasPort = process.env.CLAWDBOT_CANVAS_HOST_PORT;
process.env.CLAWDBOT_GATEWAY_TOKEN = "secret";
testTailnetIPv4.value = "100.64.0.1";
testState.gatewayBind = "lan";
const canvasPort = await getFreePort();
testState.canvasHostPort = canvasPort;
process.env.CLAWDBOT_CANVAS_HOST_PORT = String(canvasPort);
const port = await getFreePort();
const canvasHostUrl = resolveCanvasHostUrl({
canvasPort,
requestHost: `100.64.0.1:${port}`,
localAddress: "127.0.0.1",
});
expect(canvasHostUrl).toBe(`http://100.64.0.1:${canvasPort}`);
if (prevToken === undefined) {
delete process.env.CLAWDBOT_GATEWAY_TOKEN;
} else {
process.env.CLAWDBOT_GATEWAY_TOKEN = prevToken;
}
if (prevCanvasPort === undefined) {
delete process.env.CLAWDBOT_CANVAS_HOST_PORT;
} else {
process.env.CLAWDBOT_CANVAS_HOST_PORT = prevCanvasPort;
}
});
test("send dedupes by idempotencyKey", { timeout: 60_000 }, async () => {
const prevRegistry = getActivePluginRegistry() ?? emptyRegistry;
try {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
setActivePluginRegistry(whatsappRegistry);
expect(getChannelPlugin("whatsapp")).toBeDefined();
const idem = "same-key";
const res1P = onceMessage(ws, (o) => o.type === "res" && o.id === "a1");
const res2P = onceMessage(ws, (o) => o.type === "res" && o.id === "a2");
const sendReq = (id: string) =>
ws.send(
JSON.stringify({
type: "req",
id,
method: "send",
params: { to: "+15550000000", message: "hi", idempotencyKey: idem },
}),
);
sendReq("a1");
sendReq("a2");
const res1 = await res1P;
const res2 = await res2P;
expect(res1.ok).toBe(true);
expect(res2.ok).toBe(true);
expect(res1.payload).toEqual(res2.payload);
ws.close();
await server.close();
} finally {
setActivePluginRegistry(prevRegistry);
}
});
test("auto-enables configured channel plugins on startup", async () => {
const configPath = process.env.CLAWDBOT_CONFIG_PATH;
if (!configPath) throw new Error("Missing CLAWDBOT_CONFIG_PATH");
await fs.mkdir(path.dirname(configPath), { recursive: true });
await fs.writeFile(
configPath,
JSON.stringify(
{
channels: {
discord: {
token: "token-123",
},
},
},
null,
2,
),
"utf-8",
);
const port = await getFreePort();
const server = await startGatewayServer(port);
await server.close();
const updated = JSON.parse(await fs.readFile(configPath, "utf-8")) as Record<string, unknown>;
const plugins = updated.plugins as Record<string, unknown> | undefined;
const entries = plugins?.entries as Record<string, unknown> | undefined;
const discord = entries?.discord as Record<string, unknown> | undefined;
expect(discord?.enabled).toBe(true);
expect((updated.channels as Record<string, unknown> | undefined)?.discord).toMatchObject({
token: "token-123",
});
});
test("refuses to start when port already bound", async () => {
const { server: blocker, port } = await occupyPort();
await expect(startGatewayServer(port)).rejects.toBeInstanceOf(GatewayLockError);
await expect(startGatewayServer(port)).rejects.toThrow(/already listening/i);
blocker.close();
});
test("releases port after close", async () => {
const port = await getFreePort();
const server = await startGatewayServer(port);
await server.close();
const probe = createServer();
await new Promise<void>((resolve, reject) => {
probe.once("error", reject);
probe.listen(port, "127.0.0.1", () => resolve());
});
await new Promise<void>((resolve, reject) =>
probe.close((err) => (err ? reject(err) : resolve())),
);
});
});

View File

@ -1,19 +1,93 @@
import fs from "node:fs/promises";
import { createServer } from "node:net";
import os from "node:os";
import path from "node:path";
import { describe, expect, test } from "vitest";
import { afterAll, beforeAll, describe, expect, test } from "vitest";
import { WebSocket } from "ws";
import { getChannelPlugin } from "../channels/plugins/index.js";
import type { ChannelOutboundAdapter } from "../channels/plugins/types.js";
import { resolveCanvasHostUrl } from "../infra/canvas-host-url.js";
import { GatewayLockError } from "../infra/gateway-lock.js";
import type { PluginRegistry } from "../plugins/registry.js";
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
import { createOutboundTestPlugin } from "../test-utils/channel-plugins.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import {
connectOk,
getFreePort,
installGatewayTestHooks,
occupyPort,
onceMessage,
piSdkMock,
rpcReq,
startGatewayServer,
startServerWithClient,
testState,
testTailnetIPv4,
} from "./test-helpers.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
let ws: WebSocket;
let port: number;
beforeAll(async () => {
const started = await startServerWithClient();
server = started.server;
ws = started.ws;
port = started.port;
await connectOk(ws);
});
afterAll(async () => {
ws.close();
await server.close();
});
const whatsappOutbound: ChannelOutboundAdapter = {
deliveryMode: "direct",
sendText: async ({ deps, to, text }) => {
if (!deps?.sendWhatsApp) {
throw new Error("Missing sendWhatsApp dep");
}
return { channel: "whatsapp", ...(await deps.sendWhatsApp(to, text, {})) };
},
sendMedia: async ({ deps, to, text, mediaUrl }) => {
if (!deps?.sendWhatsApp) {
throw new Error("Missing sendWhatsApp dep");
}
return { channel: "whatsapp", ...(await deps.sendWhatsApp(to, text, { mediaUrl })) };
},
};
const whatsappPlugin = createOutboundTestPlugin({
id: "whatsapp",
outbound: whatsappOutbound,
label: "WhatsApp",
});
const createRegistry = (channels: PluginRegistry["channels"]): PluginRegistry => ({
plugins: [],
tools: [],
channels,
providers: [],
gatewayHandlers: {},
httpHandlers: [],
cliRegistrars: [],
services: [],
diagnostics: [],
});
const whatsappRegistry = createRegistry([
{
pluginId: "whatsapp",
source: "test",
plugin: whatsappPlugin,
},
]);
const emptyRegistry = createRegistry([]);
describe("gateway server models + voicewake", () => {
const setTempHome = (homeDir: string) => {
@ -68,9 +142,6 @@ describe("gateway server models + voicewake", () => {
const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-home-"));
const restoreHome = setTempHome(homeDir);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const initial = await rpcReq<{ triggers: string[] }>(ws, "voicewake.get");
expect(initial.ok).toBe(true);
expect(initial.payload?.triggers).toEqual(["clawd", "claude", "computer"]);
@ -104,9 +175,6 @@ describe("gateway server models + voicewake", () => {
expect(onDisk.triggers).toEqual(["hi", "there"]);
expect(typeof onDisk.updatedAtMs).toBe("number");
ws.close();
await server.close();
restoreHome();
},
);
@ -115,9 +183,6 @@ describe("gateway server models + voicewake", () => {
const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-home-"));
const restoreHome = setTempHome(homeDir);
const { server, ws, port } = await startServerWithClient();
await connectOk(ws);
const nodeWs = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => nodeWs.once("open", resolve));
const firstEventP = onceMessage<{ type: "event"; event: string; payload?: unknown }>(
@ -159,9 +224,6 @@ describe("gateway server models + voicewake", () => {
]);
nodeWs.close();
ws.close();
await server.close();
restoreHome();
});
@ -189,9 +251,6 @@ describe("gateway server models + voicewake", () => {
},
];
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res1 = await rpcReq<{
models: Array<{
id: string;
@ -241,23 +300,132 @@ describe("gateway server models + voicewake", () => {
]);
expect(piSdkMock.discoverCalls).toBe(1);
ws.close();
await server.close();
});
test("models.list rejects unknown params", async () => {
piSdkMock.enabled = true;
piSdkMock.models = [{ id: "gpt-test-a", name: "A", provider: "openai" }];
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq(ws, "models.list", { extra: true });
expect(res.ok).toBe(false);
expect(res.error?.message ?? "").toMatch(/invalid models\.list params/i);
ws.close();
await server.close();
});
});
describe("gateway server misc", () => {
test("hello-ok advertises the gateway port for canvas host", async () => {
const prevToken = process.env.CLAWDBOT_GATEWAY_TOKEN;
const prevCanvasPort = process.env.CLAWDBOT_CANVAS_HOST_PORT;
process.env.CLAWDBOT_GATEWAY_TOKEN = "secret";
testTailnetIPv4.value = "100.64.0.1";
testState.gatewayBind = "lan";
const canvasPort = await getFreePort();
testState.canvasHostPort = canvasPort;
process.env.CLAWDBOT_CANVAS_HOST_PORT = String(canvasPort);
const testPort = await getFreePort();
const canvasHostUrl = resolveCanvasHostUrl({
canvasPort,
requestHost: `100.64.0.1:${testPort}`,
localAddress: "127.0.0.1",
});
expect(canvasHostUrl).toBe(`http://100.64.0.1:${canvasPort}`);
if (prevToken === undefined) {
delete process.env.CLAWDBOT_GATEWAY_TOKEN;
} else {
process.env.CLAWDBOT_GATEWAY_TOKEN = prevToken;
}
if (prevCanvasPort === undefined) {
delete process.env.CLAWDBOT_CANVAS_HOST_PORT;
} else {
process.env.CLAWDBOT_CANVAS_HOST_PORT = prevCanvasPort;
}
});
test("send dedupes by idempotencyKey", { timeout: 60_000 }, async () => {
const prevRegistry = getActivePluginRegistry() ?? emptyRegistry;
try {
setActivePluginRegistry(whatsappRegistry);
expect(getChannelPlugin("whatsapp")).toBeDefined();
const idem = "same-key";
const res1P = onceMessage(ws, (o) => o.type === "res" && o.id === "a1");
const res2P = onceMessage(ws, (o) => o.type === "res" && o.id === "a2");
const sendReq = (id: string) =>
ws.send(
JSON.stringify({
type: "req",
id,
method: "send",
params: { to: "+15550000000", message: "hi", idempotencyKey: idem },
}),
);
sendReq("a1");
sendReq("a2");
const res1 = await res1P;
const res2 = await res2P;
expect(res1.ok).toBe(true);
expect(res2.ok).toBe(true);
expect(res1.payload).toEqual(res2.payload);
} finally {
setActivePluginRegistry(prevRegistry);
}
});
test("auto-enables configured channel plugins on startup", async () => {
const configPath = process.env.CLAWDBOT_CONFIG_PATH;
if (!configPath) throw new Error("Missing CLAWDBOT_CONFIG_PATH");
await fs.mkdir(path.dirname(configPath), { recursive: true });
await fs.writeFile(
configPath,
JSON.stringify(
{
channels: {
discord: {
token: "token-123",
},
},
},
null,
2,
),
"utf-8",
);
const autoPort = await getFreePort();
const autoServer = await startGatewayServer(autoPort);
await autoServer.close();
const updated = JSON.parse(await fs.readFile(configPath, "utf-8")) as Record<string, unknown>;
const plugins = updated.plugins as Record<string, unknown> | undefined;
const entries = plugins?.entries as Record<string, unknown> | undefined;
const discord = entries?.discord as Record<string, unknown> | undefined;
expect(discord?.enabled).toBe(true);
expect((updated.channels as Record<string, unknown> | undefined)?.discord).toMatchObject({
token: "token-123",
});
});
test("refuses to start when port already bound", async () => {
const { server: blocker, port: blockedPort } = await occupyPort();
await expect(startGatewayServer(blockedPort)).rejects.toBeInstanceOf(GatewayLockError);
await expect(startGatewayServer(blockedPort)).rejects.toThrow(/already listening/i);
blocker.close();
});
test("releases port after close", async () => {
const releasePort = await getFreePort();
const releaseServer = await startGatewayServer(releasePort);
await releaseServer.close();
const probe = createServer();
await new Promise<void>((resolve, reject) => {
probe.once("error", reject);
probe.listen(releasePort, "127.0.0.1", () => resolve());
});
await new Promise<void>((resolve, reject) =>
probe.close((err) => (err ? reject(err) : resolve())),
);
});
});

View File

@ -1,5 +1,12 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { getFreePort, installGatewayTestHooks, startGatewayServer } from "./test-helpers.js";
import {
connectOk,
getFreePort,
installGatewayTestHooks,
rpcReq,
startGatewayServer,
startServerWithClient,
} from "./test-helpers.js";
const hoisted = vi.hoisted(() => {
const cronInstances: Array<{
@ -158,7 +165,7 @@ vi.mock("./config-reload.js", () => ({
startGatewayConfigReloader: hoisted.startGatewayConfigReloader,
}));
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
describe("gateway hot reload", () => {
let prevSkipChannels: string | undefined;
@ -298,3 +305,15 @@ describe("gateway hot reload", () => {
await server.close();
});
});
describe("gateway agents", () => {
it("lists configured agents via agents.list RPC", async () => {
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const res = await rpcReq<{ agents: Array<{ id: string }> }>(ws, "agents.list", {});
expect(res.ok).toBe(true);
expect(res.payload?.agents.map((agent) => agent.id)).toContain("main");
ws.close();
await server.close();
});
});

View File

@ -1,15 +1,48 @@
import { describe, expect, test } from "vitest";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, describe, expect, test, vi } from "vitest";
import { WebSocket } from "ws";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { GatewayClient } from "./client.js";
vi.mock("../infra/update-runner.js", () => ({
runGatewayUpdate: vi.fn(async () => ({
status: "ok",
mode: "git",
root: "/repo",
steps: [],
durationMs: 12,
})),
}));
import {
connectOk,
installGatewayTestHooks,
onceMessage,
rpcReq,
startServerWithClient,
} from "./test-helpers.js";
import { GatewayClient } from "./client.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
let server: Awaited<ReturnType<typeof startServerWithClient>>["server"];
let ws: WebSocket;
let port: number;
beforeAll(async () => {
const started = await startServerWithClient();
server = started.server;
ws = started.ws;
port = started.port;
await connectOk(ws);
});
afterAll(async () => {
ws.close();
await server.close();
});
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
@ -65,11 +98,99 @@ const connectNodeClient = async (params: {
return client;
};
async function waitForSignal(check: () => boolean, timeoutMs = 2000) {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
if (check()) return;
await new Promise((resolve) => setTimeout(resolve, 10));
}
throw new Error("timeout");
}
describe("gateway role enforcement", () => {
test("enforces operator and node permissions", async () => {
const nodeWs = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => nodeWs.once("open", resolve));
try {
const eventRes = await rpcReq(ws, "node.event", { event: "test", payload: { ok: true } });
expect(eventRes.ok).toBe(false);
expect(eventRes.error?.message ?? "").toContain("unauthorized role");
const invokeRes = await rpcReq(ws, "node.invoke.result", {
id: "invoke-1",
nodeId: "node-1",
ok: true,
});
expect(invokeRes.ok).toBe(false);
expect(invokeRes.error?.message ?? "").toContain("unauthorized role");
await connectOk(nodeWs, {
role: "node",
client: {
id: GATEWAY_CLIENT_NAMES.NODE_HOST,
version: "1.0.0",
platform: "ios",
mode: GATEWAY_CLIENT_MODES.NODE,
},
commands: [],
});
const binsRes = await rpcReq<{ bins?: unknown[] }>(nodeWs, "skills.bins", {});
expect(binsRes.ok).toBe(true);
expect(Array.isArray(binsRes.payload?.bins)).toBe(true);
const statusRes = await rpcReq(nodeWs, "status", {});
expect(statusRes.ok).toBe(false);
expect(statusRes.error?.message ?? "").toContain("unauthorized role");
} finally {
nodeWs.close();
}
});
});
describe("gateway update.run", () => {
test("writes sentinel and schedules restart", async () => {
const sigusr1 = vi.fn();
process.on("SIGUSR1", sigusr1);
try {
const id = "req-update";
ws.send(
JSON.stringify({
type: "req",
id,
method: "update.run",
params: {
sessionKey: "agent:main:whatsapp:dm:+15555550123",
restartDelayMs: 0,
},
}),
);
const res = await onceMessage<{ ok: boolean; payload?: unknown }>(
ws,
(o) => o.type === "res" && o.id === id,
);
expect(res.ok).toBe(true);
await waitForSignal(() => sigusr1.mock.calls.length > 0);
expect(sigusr1).toHaveBeenCalled();
const sentinelPath = path.join(os.homedir(), ".clawdbot", "restart-sentinel.json");
const raw = await fs.readFile(sentinelPath, "utf-8");
const parsed = JSON.parse(raw) as {
payload?: { kind?: string; stats?: { mode?: string } };
};
expect(parsed.payload?.kind).toBe("update");
expect(parsed.payload?.stats?.mode).toBe("git");
} finally {
process.off("SIGUSR1", sigusr1);
}
});
});
describe("gateway node command allowlist", () => {
test("enforces command allowlists across node clients", async () => {
const { server, ws, port } = await startServerWithClient();
await connectOk(ws);
const waitForConnectedCount = async (count: number) => {
await expect
.poll(
@ -96,8 +217,12 @@ describe("gateway node command allowlist", () => {
return nodeId;
};
let systemClient: GatewayClient | undefined;
let emptyClient: GatewayClient | undefined;
let allowedClient: GatewayClient | undefined;
try {
const systemClient = await connectNodeClient({
systemClient = await connectNodeClient({
port,
commands: ["system.run"],
instanceId: "node-system-run",
@ -115,7 +240,7 @@ describe("gateway node command allowlist", () => {
systemClient.stop();
await waitForConnectedCount(0);
const emptyClient = await connectNodeClient({
emptyClient = await connectNodeClient({
port,
commands: [],
instanceId: "node-empty",
@ -138,7 +263,7 @@ describe("gateway node command allowlist", () => {
new Promise<{ id?: string; nodeId?: string }>((resolve) => {
resolveInvoke = resolve;
});
const allowedClient = await connectNodeClient({
allowedClient = await connectNodeClient({
port,
commands: ["canvas.snapshot"],
instanceId: "node-allowed",
@ -187,11 +312,10 @@ describe("gateway node command allowlist", () => {
});
const invokeNullRes = await invokeNullResP;
expect(invokeNullRes.ok).toBe(true);
allowedClient.stop();
} finally {
ws.close();
await server.close();
systemClient?.stop();
emptyClient?.stop();
allowedClient?.stop();
}
});
});

View File

@ -1,56 +0,0 @@
import { describe, expect, test } from "vitest";
import { WebSocket } from "ws";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import {
connectOk,
installGatewayTestHooks,
rpcReq,
startServerWithClient,
} from "./test-helpers.js";
installGatewayTestHooks();
describe("gateway role enforcement", () => {
test("enforces operator and node permissions", async () => {
const { server, ws, port } = await startServerWithClient();
await connectOk(ws);
const eventRes = await rpcReq(ws, "node.event", { event: "test", payload: { ok: true } });
expect(eventRes.ok).toBe(false);
expect(eventRes.error?.message ?? "").toContain("unauthorized role");
const invokeRes = await rpcReq(ws, "node.invoke.result", {
id: "invoke-1",
nodeId: "node-1",
ok: true,
});
expect(invokeRes.ok).toBe(false);
expect(invokeRes.error?.message ?? "").toContain("unauthorized role");
const nodeWs = new WebSocket(`ws://127.0.0.1:${port}`);
await new Promise<void>((resolve) => nodeWs.once("open", resolve));
await connectOk(nodeWs, {
role: "node",
client: {
id: GATEWAY_CLIENT_NAMES.NODE_HOST,
version: "1.0.0",
platform: "ios",
mode: GATEWAY_CLIENT_MODES.NODE,
},
commands: [],
});
const binsRes = await rpcReq<{ bins?: unknown[] }>(nodeWs, "skills.bins", {});
expect(binsRes.ok).toBe(true);
expect(Array.isArray(binsRes.payload?.bins)).toBe(true);
const statusRes = await rpcReq(nodeWs, "status", {});
expect(statusRes.ok).toBe(false);
expect(statusRes.error?.message ?? "").toContain("unauthorized role");
nodeWs.close();
ws.close();
await server.close();
});
});

View File

@ -1,6 +1,6 @@
import fs from "node:fs/promises";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import { createClawdbotTools } from "../agents/clawdbot-tools.js";
import { resolveSessionTranscriptPath } from "../config/sessions.js";
import { emitAgentEvent } from "../infra/agent-events.js";
@ -11,30 +11,38 @@ import {
startGatewayServer,
} from "./test-helpers.js";
installGatewayTestHooks();
installGatewayTestHooks({ scope: "suite" });
const servers: Array<Awaited<ReturnType<typeof startGatewayServer>>> = [];
let server: Awaited<ReturnType<typeof startGatewayServer>>;
let gatewayPort: number;
let prevGatewayPort: string | undefined;
let prevGatewayToken: string | undefined;
afterEach(async () => {
for (const server of servers) {
try {
await server.close();
} catch {
/* ignore */
}
beforeAll(async () => {
prevGatewayPort = process.env.CLAWDBOT_GATEWAY_PORT;
prevGatewayToken = process.env.CLAWDBOT_GATEWAY_TOKEN;
gatewayPort = await getFreePort();
process.env.CLAWDBOT_GATEWAY_PORT = String(gatewayPort);
process.env.CLAWDBOT_GATEWAY_TOKEN = "test-token";
server = await startGatewayServer(gatewayPort);
});
afterAll(async () => {
await server.close();
if (prevGatewayPort === undefined) {
delete process.env.CLAWDBOT_GATEWAY_PORT;
} else {
process.env.CLAWDBOT_GATEWAY_PORT = prevGatewayPort;
}
if (prevGatewayToken === undefined) {
delete process.env.CLAWDBOT_GATEWAY_TOKEN;
} else {
process.env.CLAWDBOT_GATEWAY_TOKEN = prevGatewayToken;
}
servers.length = 0;
// Add small delay to ensure port is fully released by OS
await new Promise((resolve) => setTimeout(resolve, 50));
});
describe("sessions_send gateway loopback", () => {
it("returns reply when lifecycle ends before agent.wait", async () => {
const port = await getFreePort();
vi.stubEnv("CLAWDBOT_GATEWAY_PORT", String(port));
vi.stubEnv("CLAWDBOT_GATEWAY_TOKEN", "test-token");
const server = await startGatewayServer(port);
const spy = vi.mocked(agentCommand);
spy.mockImplementation(async (opts) => {
const params = opts as {
@ -78,8 +86,6 @@ describe("sessions_send gateway loopback", () => {
});
});
servers.push(server);
const tool = createClawdbotTools().find((candidate) => candidate.name === "sessions_send");
if (!tool) throw new Error("missing sessions_send tool");
@ -104,12 +110,6 @@ describe("sessions_send gateway loopback", () => {
describe("sessions_send label lookup", () => {
it("finds session by label and sends message", { timeout: 60_000 }, async () => {
const port = await getFreePort();
vi.stubEnv("CLAWDBOT_GATEWAY_PORT", String(port));
vi.stubEnv("CLAWDBOT_GATEWAY_TOKEN", "test-token");
const server = await startGatewayServer(port);
servers.push(server);
const spy = vi.mocked(agentCommand);
spy.mockImplementation(async (opts) => {
const params = opts as {
@ -171,13 +171,6 @@ describe("sessions_send label lookup", () => {
});
it("returns error when label not found", { timeout: 60_000 }, async () => {
const port = await getFreePort();
vi.stubEnv("CLAWDBOT_GATEWAY_PORT", String(port));
vi.stubEnv("CLAWDBOT_GATEWAY_TOKEN", "test-token");
const server = await startGatewayServer(port);
servers.push(server);
const tool = createClawdbotTools().find((candidate) => candidate.name === "sessions_send");
if (!tool) throw new Error("missing sessions_send tool");
@ -192,13 +185,6 @@ describe("sessions_send label lookup", () => {
});
it("returns error when neither sessionKey nor label provided", { timeout: 60_000 }, async () => {
const port = await getFreePort();
vi.stubEnv("CLAWDBOT_GATEWAY_PORT", String(port));
vi.stubEnv("CLAWDBOT_GATEWAY_TOKEN", "test-token");
const server = await startGatewayServer(port);
servers.push(server);
const tool = createClawdbotTools().find((candidate) => candidate.name === "sessions_send");
if (!tool) throw new Error("missing sessions_send tool");

View File

@ -1,122 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, test } from "vitest";
import {
connectOk,
installGatewayTestHooks,
rpcReq,
startServerWithClient,
testState,
writeSessionStore,
} from "./test-helpers.js";
installGatewayTestHooks();
describe("gateway server sessions", () => {
test("filters sessions by agentId", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-agents-"));
testState.sessionConfig = {
store: path.join(dir, "{agentId}", "sessions.json"),
};
testState.agentsConfig = {
list: [{ id: "home", default: true }, { id: "work" }],
};
const homeDir = path.join(dir, "home");
const workDir = path.join(dir, "work");
await fs.mkdir(homeDir, { recursive: true });
await fs.mkdir(workDir, { recursive: true });
await writeSessionStore({
storePath: path.join(homeDir, "sessions.json"),
agentId: "home",
entries: {
main: {
sessionId: "sess-home-main",
updatedAt: Date.now(),
},
"discord:group:dev": {
sessionId: "sess-home-group",
updatedAt: Date.now() - 1000,
},
},
});
await writeSessionStore({
storePath: path.join(workDir, "sessions.json"),
agentId: "work",
entries: {
main: {
sessionId: "sess-work-main",
updatedAt: Date.now(),
},
},
});
const { ws } = await startServerWithClient();
await connectOk(ws);
const homeSessions = await rpcReq<{
sessions: Array<{ key: string }>;
}>(ws, "sessions.list", {
includeGlobal: false,
includeUnknown: false,
agentId: "home",
});
expect(homeSessions.ok).toBe(true);
expect(homeSessions.payload?.sessions.map((s) => s.key).sort()).toEqual([
"agent:home:discord:group:dev",
"agent:home:main",
]);
const workSessions = await rpcReq<{
sessions: Array<{ key: string }>;
}>(ws, "sessions.list", {
includeGlobal: false,
includeUnknown: false,
agentId: "work",
});
expect(workSessions.ok).toBe(true);
expect(workSessions.payload?.sessions.map((s) => s.key)).toEqual(["agent:work:main"]);
});
test("resolves and patches main alias to default agent main key", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-sessions-"));
const storePath = path.join(dir, "sessions.json");
testState.sessionStorePath = storePath;
testState.agentsConfig = { list: [{ id: "ops", default: true }] };
testState.sessionConfig = { mainKey: "work" };
await writeSessionStore({
storePath,
agentId: "ops",
mainKey: "work",
entries: {
main: {
sessionId: "sess-ops-main",
updatedAt: Date.now(),
},
},
});
const { ws } = await startServerWithClient();
await connectOk(ws);
const resolved = await rpcReq<{ ok: true; key: string }>(ws, "sessions.resolve", {
key: "main",
});
expect(resolved.ok).toBe(true);
expect(resolved.payload?.key).toBe("agent:ops:work");
const patched = await rpcReq<{ ok: true; key: string }>(ws, "sessions.patch", {
key: "main",
thinkingLevel: "medium",
});
expect(patched.ok).toBe(true);
expect(patched.payload?.key).toBe("agent:ops:work");
const stored = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record<
string,
{ thinkingLevel?: string }
>;
expect(stored["agent:ops:work"]?.thinkingLevel).toBe("medium");
expect(stored.main).toBeUndefined();
});
});

View File

@ -1,75 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it, vi } from "vitest";
vi.mock("../infra/update-runner.js", () => ({
runGatewayUpdate: vi.fn(async () => ({
status: "ok",
mode: "git",
root: "/repo",
steps: [],
durationMs: 12,
})),
}));
import {
connectOk,
installGatewayTestHooks,
onceMessage,
startServerWithClient,
} from "./test-helpers.js";
installGatewayTestHooks();
async function waitForSignal(check: () => boolean, timeoutMs = 2000) {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
if (check()) return;
await new Promise((resolve) => setTimeout(resolve, 10));
}
throw new Error("timeout");
}
describe("gateway update.run", () => {
it("writes sentinel and schedules restart", async () => {
const sigusr1 = vi.fn();
process.on("SIGUSR1", sigusr1);
const { server, ws } = await startServerWithClient();
await connectOk(ws);
const id = "req-update";
ws.send(
JSON.stringify({
type: "req",
id,
method: "update.run",
params: {
sessionKey: "agent:main:whatsapp:dm:+15555550123",
restartDelayMs: 0,
},
}),
);
const res = await onceMessage<{ ok: boolean; payload?: unknown }>(
ws,
(o) => o.type === "res" && o.id === id,
);
expect(res.ok).toBe(true);
await waitForSignal(() => sigusr1.mock.calls.length > 0);
expect(sigusr1).toHaveBeenCalled();
const sentinelPath = path.join(os.homedir(), ".clawdbot", "restart-sentinel.json");
const raw = await fs.readFile(sentinelPath, "utf-8");
const parsed = JSON.parse(raw) as {
payload?: { kind?: string; stats?: { mode?: string } };
};
expect(parsed.payload?.kind).toBe("update");
expect(parsed.payload?.stats?.mode).toBe("git");
ws.close();
await server.close();
process.off("SIGUSR1", sigusr1);
});
});