From d7815089527c10e1350ab3c674cc1063cab481fa Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 10 Jan 2026 17:02:20 +0100 Subject: [PATCH] fix: make chat.send non-blocking --- CHANGELOG.md | 1 + src/gateway/server-bridge.ts | 92 ++++++++++++++++--- src/gateway/server-methods/chat.ts | 90 +++++++++++++++--- src/gateway/server.chat.test.ts | 141 +++++++++++++++++++++++++++-- ui/src/ui/app-render.ts | 13 ++- ui/src/ui/app.ts | 28 ++++-- ui/src/ui/controllers/chat.ts | 16 ++++ ui/src/ui/views/chat.ts | 15 ++- 8 files changed, 347 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f737ad3f0..76f1e5e2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - Agents/OpenAI: fix Responses tool-only → follow-up turn handling (avoid standalone `reasoning` items that trigger 400 “required following item”). - Auth: update Claude Code keychain credentials in-place during refresh sync; share JSON file helpers; add CLI fallback coverage. - Onboarding/Gateway: persist non-interactive gateway token auth in config; add WS wizard + gateway tool-calling regression coverage. +- Gateway/Control UI: make `chat.send` non-blocking, wire Stop to `chat.abort`, and treat `/stop` as an out-of-band abort. (#653) - CLI: `clawdbot sessions` now includes `elev:*` + `usage:*` flags in the table output. - CLI/Pairing: accept positional provider for `pairing list|approve` (npm-run compatible); update docs/bot hints. - Branding: normalize user-facing “ClawdBot”/“CLAWDBOT” → “Clawdbot” (CLI, status, docs). diff --git a/src/gateway/server-bridge.ts b/src/gateway/server-bridge.ts index 56ac4bf55..d44649482 100644 --- a/src/gateway/server-bridge.ts +++ b/src/gateway/server-bridge.ts @@ -9,6 +9,7 @@ import { waitForEmbeddedPiRunEnd, } from "../agents/pi-embedded.js"; import { resolveAgentTimeoutMs } from "../agents/timeout.js"; +import { isAbortTrigger } from "../auto-reply/reply/abort.js"; import type { CliDeps } from "../cli/deps.js"; import { agentCommand } from "../commands/agent.js"; import type { HealthSummary } from "../commands/health.js"; @@ -764,6 +765,12 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { timeoutMs?: number; idempotencyKey: string; }; + const stopCommand = (() => { + const msg = p.message.trim(); + if (!msg) return false; + const normalized = msg.toLowerCase(); + return normalized === "/stop" || isAbortTrigger(msg); + })(); const normalizedAttachments = p.attachments?.map((a) => ({ type: typeof a?.type === "string" ? a.type : undefined, @@ -818,6 +825,35 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { const clientRunId = p.idempotencyKey; registerAgentRunContext(clientRunId, { sessionKey: p.sessionKey }); + if (stopCommand) { + const runIds: string[] = []; + for (const [runId, active] of ctx.chatAbortControllers) { + if (active.sessionKey !== p.sessionKey) continue; + active.controller.abort(); + ctx.chatAbortControllers.delete(runId); + ctx.chatRunBuffers.delete(runId); + ctx.chatDeltaSentAt.delete(runId); + ctx.removeChatRun(runId, runId, p.sessionKey); + const payload = { + runId, + sessionKey: p.sessionKey, + seq: (ctx.agentRunSeq.get(runId) ?? 0) + 1, + state: "aborted" as const, + }; + ctx.broadcast("chat", payload); + ctx.bridgeSendToSession(p.sessionKey, "chat", payload); + runIds.push(runId); + } + return { + ok: true, + payloadJSON: JSON.stringify({ + ok: true, + aborted: runIds.length > 0, + runIds, + }), + }; + } + const cached = ctx.dedupe.get(`chat:${clientRunId}`); if (cached) { if (cached.ok) { @@ -832,6 +868,17 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { }; } + const activeExisting = ctx.chatAbortControllers.get(clientRunId); + if (activeExisting) { + return { + ok: true, + payloadJSON: JSON.stringify({ + runId: clientRunId, + status: "in_flight", + }), + }; + } + try { const abortController = new AbortController(); ctx.chatAbortControllers.set(clientRunId, { @@ -851,7 +898,11 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { } } - await agentCommand( + const ackPayload = { + runId: clientRunId, + status: "started" as const, + }; + void agentCommand( { message: messageWithAttachments, sessionId, @@ -865,17 +916,32 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { }, defaultRuntime, ctx.deps, - ); - const payload = { - runId: clientRunId, - status: "ok" as const, - }; - ctx.dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: true, - payload, - }); - return { ok: true, payloadJSON: JSON.stringify(payload) }; + ) + .then(() => { + ctx.dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: true, + payload: { runId: clientRunId, status: "ok" as const }, + }); + }) + .catch((err) => { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); + ctx.dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: false, + payload: { + runId: clientRunId, + status: "error" as const, + summary: String(err), + }, + error, + }); + }) + .finally(() => { + ctx.chatAbortControllers.delete(clientRunId); + }); + + return { ok: true, payloadJSON: JSON.stringify(ackPayload) }; } catch (err) { const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const payload = { @@ -896,8 +962,6 @@ export function createBridgeHandlers(ctx: BridgeHandlersContext) { message: String(err), }, }; - } finally { - ctx.chatAbortControllers.delete(clientRunId); } } default: diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index f0f36cbaf..4305db9d2 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -2,6 +2,7 @@ import { randomUUID } from "node:crypto"; import { resolveThinkingDefault } from "../../agents/model-selection.js"; import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; +import { isAbortTrigger } from "../../auto-reply/reply/abort.js"; import { agentCommand } from "../../commands/agent.js"; import { mergeSessionEntry, saveSessionStore } from "../../config/sessions.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; @@ -157,6 +158,12 @@ export const chatHandlers: GatewayRequestHandlers = { timeoutMs?: number; idempotencyKey: string; }; + const stopCommand = (() => { + const msg = p.message.trim(); + if (!msg) return false; + const normalized = msg.toLowerCase(); + return normalized === "/stop" || isAbortTrigger(msg); + })(); const normalizedAttachments = p.attachments?.map((a) => ({ type: typeof a?.type === "string" ? a.type : undefined, @@ -223,6 +230,33 @@ export const chatHandlers: GatewayRequestHandlers = { return; } + if (stopCommand) { + const runIds: string[] = []; + for (const [runId, active] of context.chatAbortControllers) { + if (active.sessionKey !== p.sessionKey) continue; + active.controller.abort(); + context.chatAbortControllers.delete(runId); + context.chatRunBuffers.delete(runId); + context.chatDeltaSentAt.delete(runId); + context.removeChatRun(runId, runId, p.sessionKey); + const payload = { + runId, + sessionKey: p.sessionKey, + seq: (context.agentRunSeq.get(runId) ?? 0) + 1, + state: "aborted" as const, + }; + context.broadcast("chat", payload); + context.bridgeSendToSession(p.sessionKey, "chat", payload); + runIds.push(runId); + } + respond(true, { + ok: true, + aborted: runIds.length > 0, + runIds, + }); + return; + } + const cached = context.dedupe.get(`chat:${clientRunId}`); if (cached) { respond(cached.ok, cached.payload, cached.error, { @@ -231,6 +265,17 @@ export const chatHandlers: GatewayRequestHandlers = { return; } + const activeExisting = context.chatAbortControllers.get(clientRunId); + if (activeExisting) { + respond( + true, + { runId: clientRunId, status: "in_flight" as const }, + undefined, + { cached: true, runId: clientRunId }, + ); + return; + } + try { const abortController = new AbortController(); context.chatAbortControllers.set(clientRunId, { @@ -250,7 +295,13 @@ export const chatHandlers: GatewayRequestHandlers = { } } - await agentCommand( + const ackPayload = { + runId: clientRunId, + status: "started" as const, + }; + respond(true, ackPayload, undefined, { runId: clientRunId }); + + void agentCommand( { message: messageWithAttachments, sessionId, @@ -264,17 +315,30 @@ export const chatHandlers: GatewayRequestHandlers = { }, defaultRuntime, context.deps, - ); - const payload = { - runId: clientRunId, - status: "ok" as const, - }; - context.dedupe.set(`chat:${clientRunId}`, { - ts: Date.now(), - ok: true, - payload, - }); - respond(true, payload, undefined, { runId: clientRunId }); + ) + .then(() => { + context.dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: true, + payload: { runId: clientRunId, status: "ok" as const }, + }); + }) + .catch((err) => { + const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); + context.dedupe.set(`chat:${clientRunId}`, { + ts: Date.now(), + ok: false, + payload: { + runId: clientRunId, + status: "error" as const, + summary: String(err), + }, + error, + }); + }) + .finally(() => { + context.chatAbortControllers.delete(clientRunId); + }); } catch (err) { const error = errorShape(ErrorCodes.UNAVAILABLE, String(err)); const payload = { @@ -292,8 +356,6 @@ export const chatHandlers: GatewayRequestHandlers = { runId: clientRunId, error: formatForLog(err), }); - } finally { - context.chatAbortControllers.delete(clientRunId); } }, }; diff --git a/src/gateway/server.chat.test.ts b/src/gateway/server.chat.test.ts index b4650cc51..f6d37f18b 100644 --- a/src/gateway/server.chat.test.ts +++ b/src/gateway/server.chat.test.ts @@ -17,6 +17,15 @@ import { installGatewayTestHooks(); +async function waitFor(condition: () => boolean, timeoutMs = 1500) { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (condition()) return; + await new Promise((r) => setTimeout(r, 5)); + } + throw new Error("timeout waiting for condition"); +} + describe("gateway server chat", () => { test("webchat can chat.send without a mobile node", async () => { const { server, ws } = await startServerWithClient(); @@ -45,6 +54,8 @@ describe("gateway server chat", () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); + const spy = vi.mocked(agentCommand); + const callsBefore = spy.mock.calls.length; const res = await rpcReq(ws, "chat.send", { sessionKey: "main", message: "hello", @@ -52,9 +63,8 @@ describe("gateway server chat", () => { }); expect(res.ok).toBe(true); - const call = vi.mocked(agentCommand).mock.calls.at(-1)?.[0] as - | { timeout?: string } - | undefined; + await waitFor(() => spy.mock.calls.length > callsBefore); + const call = spy.mock.calls.at(-1)?.[0] as { timeout?: string } | undefined; expect(call?.timeout).toBe("123"); ws.close(); @@ -65,6 +75,8 @@ describe("gateway server chat", () => { const { server, ws } = await startServerWithClient(); await connectOk(ws); + const spy = vi.mocked(agentCommand); + const callsBefore = spy.mock.calls.length; const res = await rpcReq(ws, "chat.send", { sessionKey: "agent:main:subagent:abc", message: "hello", @@ -72,7 +84,8 @@ describe("gateway server chat", () => { }); expect(res.ok).toBe(true); - const call = vi.mocked(agentCommand).mock.calls.at(-1)?.[0] as + await waitFor(() => spy.mock.calls.length > callsBefore); + const call = spy.mock.calls.at(-1)?.[0] as | { sessionKey?: string } | undefined; expect(call?.sessionKey).toBe("agent:main:subagent:abc"); @@ -607,6 +620,9 @@ describe("gateway server chat", () => { }), ); + const sendRes = await sendResP; + expect(sendRes.ok).toBe(true); + await new Promise((resolve, reject) => { const deadline = Date.now() + 1000; const tick = () => { @@ -630,9 +646,6 @@ describe("gateway server chat", () => { const abortRes = await abortResP; expect(abortRes.ok).toBe(true); - const sendRes = await sendResP; - expect(sendRes.ok).toBe(true); - const evt = await abortedEventP; expect(evt.payload?.runId).toBe("idem-abort-1"); expect(evt.payload?.sessionKey).toBe("main"); @@ -731,6 +744,98 @@ describe("gateway server chat", () => { await server.close(); }); + test( + "chat.send treats /stop as an out-of-band abort", + { timeout: 15000 }, + async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); + testState.sessionStorePath = path.join(dir, "sessions.json"); + await fs.writeFile( + testState.sessionStorePath, + JSON.stringify( + { main: { sessionId: "sess-main", updatedAt: Date.now() } }, + null, + 2, + ), + "utf-8", + ); + + const { server, ws } = await startServerWithClient(); + await connectOk(ws); + + const spy = vi.mocked(agentCommand); + const callsBefore = spy.mock.calls.length; + spy.mockImplementationOnce(async (opts) => { + const signal = (opts as { abortSignal?: AbortSignal }).abortSignal; + await new Promise((resolve) => { + if (!signal) return resolve(); + if (signal.aborted) return resolve(); + signal.addEventListener("abort", () => resolve(), { once: true }); + }); + }); + + const sendResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-stop-1", + 8000, + ); + ws.send( + JSON.stringify({ + type: "req", + id: "send-stop-1", + method: "chat.send", + params: { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-stop-run", + }, + }), + ); + const sendRes = await sendResP; + expect(sendRes.ok).toBe(true); + + await waitFor(() => spy.mock.calls.length > callsBefore); + + const abortedEventP = onceMessage( + ws, + (o) => + o.type === "event" && + o.event === "chat" && + o.payload?.state === "aborted" && + o.payload?.runId === "idem-stop-run", + 8000, + ); + + const stopResP = onceMessage( + ws, + (o) => o.type === "res" && o.id === "send-stop-2", + 8000, + ); + ws.send( + JSON.stringify({ + type: "req", + id: "send-stop-2", + method: "chat.send", + params: { + sessionKey: "main", + message: "/stop", + idempotencyKey: "idem-stop-req", + }, + }), + ); + const stopRes = await stopResP; + expect(stopRes.ok).toBe(true); + + const evt = await abortedEventP; + expect(evt.payload?.sessionKey).toBe("main"); + + expect(spy.mock.calls.length).toBe(callsBefore + 1); + + ws.close(); + await server.close(); + }, + ); + test("chat.abort returns aborted=false for unknown runId", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-gw-")); testState.sessionStorePath = path.join(dir, "sessions.json"); @@ -876,6 +981,28 @@ describe("gateway server chat", () => { ); expect(sendRes.ok).toBe(true); + // chat.send returns before the run ends; wait until dedupe is populated + // (meaning the run completed and the abort controller was cleared). + let completed = false; + for (let i = 0; i < 50; i++) { + const again = await rpcReq<{ runId?: string; status?: string }>( + ws, + "chat.send", + { + sessionKey: "main", + message: "hello", + idempotencyKey: "idem-complete-1", + timeoutMs: 30_000, + }, + ); + if (again.ok && again.payload?.status === "ok") { + completed = true; + break; + } + await new Promise((r) => setTimeout(r, 10)); + } + expect(completed).toBe(true); + const abortRes = await rpcReq(ws, "chat.abort", { sessionKey: "main", runId: "idem-complete-1", diff --git a/ui/src/ui/app-render.ts b/ui/src/ui/app-render.ts index 888c6db7d..6013538a7 100644 --- a/ui/src/ui/app-render.ts +++ b/ui/src/ui/app-render.ts @@ -201,6 +201,7 @@ export type AppViewState = { handleWhatsAppLogout: () => Promise; handleTelegramSave: () => Promise; handleSendChat: (messageOverride?: string, opts?: { restoreDraft?: boolean }) => Promise; + handleAbortChat: () => Promise; removeQueuedMessage: (id: string) => void; resetToolStream: () => void; handleLogsScroll: (event: Event) => void; @@ -493,11 +494,13 @@ export function renderApp(state: AppViewState) { ...state.settings, useNewChatLayout: !state.settings.useNewChatLayout, }), - onDraftChange: (next) => (state.chatMessage = next), - onSend: () => state.handleSendChat(), - onQueueRemove: (id) => state.removeQueuedMessage(id), - onNewSession: () => - state.handleSendChat("/new", { restoreDraft: true }), + onDraftChange: (next) => (state.chatMessage = next), + onSend: () => state.handleSendChat(), + canAbort: Boolean(state.chatRunId), + onAbort: () => void state.handleAbortChat(), + onQueueRemove: (id) => state.removeQueuedMessage(id), + onNewSession: () => + state.handleSendChat("/new", { restoreDraft: true }), // Sidebar props for tool output viewing sidebarOpen: state.sidebarOpen, sidebarContent: state.sidebarContent, diff --git a/ui/src/ui/app.ts b/ui/src/ui/app.ts index c33da2cca..e16bc2739 100644 --- a/ui/src/ui/app.ts +++ b/ui/src/ui/app.ts @@ -52,6 +52,7 @@ import { import { loadChatHistory, sendChatMessage, + abortChatRun, handleChatEvent, type ChatEventPayload, } from "./controllers/chat"; @@ -1028,6 +1029,20 @@ export class ClawdbotApp extends LitElement { return this.chatSending || Boolean(this.chatRunId); } + private isChatStopCommand(text: string) { + const trimmed = text.trim(); + if (!trimmed) return false; + const normalized = trimmed.toLowerCase(); + return normalized === "/stop" || normalized === "stop" || normalized === "abort"; + } + + async handleAbortChat() { + if (!this.connected) return; + this.chatMessage = ""; + if (!this.chatRunId) return; + await abortChatRun(this); + } + private enqueueChatMessage(text: string) { const trimmed = text.trim(); if (!trimmed) return; @@ -1053,14 +1068,6 @@ export class ClawdbotApp extends LitElement { if (ok) { this.setLastActiveSessionKey(this.sessionKey); } - if (ok && this.chatRunId) { - // chat.send returned (run finished), but we missed the chat final event. - this.chatRunId = null; - this.chatStream = null; - this.chatStreamStartedAt = null; - this.resetToolStream(); - void loadChatHistory(this); - } if (ok && opts?.restoreDraft && opts.previousDraft?.trim()) { this.chatMessage = opts.previousDraft; } @@ -1095,6 +1102,11 @@ export class ClawdbotApp extends LitElement { const message = (messageOverride ?? this.chatMessage).trim(); if (!message) return; + if (this.isChatStopCommand(message)) { + await this.handleAbortChat(); + return; + } + if (messageOverride == null) { this.chatMessage = ""; } diff --git a/ui/src/ui/controllers/chat.ts b/ui/src/ui/controllers/chat.ts index 2e52d9d47..55c71b65f 100644 --- a/ui/src/ui/controllers/chat.ts +++ b/ui/src/ui/controllers/chat.ts @@ -92,6 +92,22 @@ export async function sendChatMessage(state: ChatState, message: string): Promis } } +export async function abortChatRun(state: ChatState): Promise { + if (!state.client || !state.connected) return false; + const runId = state.chatRunId; + if (!runId) return false; + try { + await state.client.request("chat.abort", { + sessionKey: state.sessionKey, + runId, + }); + return true; + } catch (err) { + state.lastError = String(err); + return false; + } +} + export function handleChatEvent( state: ChatState, payload?: ChatEventPayload, diff --git a/ui/src/ui/views/chat.ts b/ui/src/ui/views/chat.ts index 272e663a2..104281458 100644 --- a/ui/src/ui/views/chat.ts +++ b/ui/src/ui/views/chat.ts @@ -23,6 +23,7 @@ export type ChatProps = { thinkingLevel: string | null; loading: boolean; sending: boolean; + canAbort?: boolean; messages: unknown[]; toolMessages: unknown[]; stream: string | null; @@ -52,6 +53,7 @@ export type ChatProps = { onToggleLayout?: () => void; onDraftChange: (next: string) => void; onSend: () => void; + onAbort?: () => void; onQueueRemove: (id: string) => void; onNewSession: () => void; onOpenSidebar?: (content: string) => void; @@ -61,7 +63,7 @@ export type ChatProps = { export function renderChat(props: ChatProps) { const canCompose = props.connected; - const isBusy = props.sending || Boolean(props.stream); + const isBusy = props.sending || props.stream !== null; const activeSession = props.sessions?.sessions?.find( (row) => row.key === props.sessionKey, ); @@ -222,6 +224,17 @@ export function renderChat(props: ChatProps) { > New session + ${props.onAbort + ? html` + + ` + : nothing}