diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b12624c4..430c7febf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ ### Fixes - Messages: make `/stop` clear queued followups and pending session lane work for a hard abort. +- Messages: make `/stop` abort active sub-agent runs spawned from the requester session and report how many were stopped. - WhatsApp: default response prefix only for self-chat, using identity name when set. - Signal/iMessage: bound transport readiness waits to 30s with periodic logging. (#1014) — thanks @Szpadel. - Auth: merge main auth profiles into per-agent stores for sub-agents and document inheritance. (#1013) — thanks @marcmarg. diff --git a/docs/concepts/session.md b/docs/concepts/session.md index b90493267..0fe0b2414 100644 --- a/docs/concepts/session.md +++ b/docs/concepts/session.md @@ -106,7 +106,7 @@ Send these as standalone messages so they register. - `clawdbot gateway call sessions.list --params '{}'` — fetch sessions from the running gateway (use `--url`/`--token` for remote gateway access). - Send `/status` as a standalone message in chat to see whether the agent is reachable, how much of the session context is used, current thinking/verbose toggles, and when your WhatsApp web creds were last refreshed (helps spot relink needs). - Send `/context list` or `/context detail` to see what’s in the system prompt and injected workspace files (and the biggest context contributors). -- Send `/stop` as a standalone message to abort the current run and clear queued followups for that session. +- Send `/stop` as a standalone message to abort the current run, clear queued followups for that session, and stop any sub-agent runs spawned from it (the reply includes the stopped count). - Send `/compact` (optional instructions) as a standalone message to summarize older context and free up window space. See [/concepts/compaction](/concepts/compaction). - JSONL transcripts can be opened directly to review full turns. diff --git a/docs/tools/subagents.md b/docs/tools/subagents.md index 2ac3b57d4..07c33cb44 100644 --- a/docs/tools/subagents.md +++ b/docs/tools/subagents.md @@ -108,6 +108,11 @@ Sub-agents use a dedicated in-process queue lane: - Lane name: `subagent` - Concurrency: `agents.defaults.subagents.maxConcurrent` (default `1`) +## Stopping + +- Sending `/stop` in the requester chat aborts the requester session and stops any active sub-agent runs spawned from it. +- The `/stop` reply includes how many sub-agent runs were stopped. + ## Limitations - Sub-agent announce is **best-effort**. If the gateway restarts, pending “announce back” work is lost. diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 73c4417d5..3dd821bef 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -354,6 +354,12 @@ export function releaseSubagentRun(runId: string) { if (subagentRuns.size === 0) stopSweeper(); } +export function listSubagentRunsForRequester(requesterSessionKey: string): SubagentRunRecord[] { + const key = requesterSessionKey.trim(); + if (!key) return []; + return [...subagentRuns.values()].filter((entry) => entry.requesterSessionKey === key); +} + export function initSubagentRegistry() { restoreSubagentRunsOnce(); } diff --git a/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts b/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts index fd43f3a0c..6403627d5 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.test.ts @@ -222,7 +222,7 @@ describe("trigger handling", () => { makeCfg(home), ); const text = Array.isArray(res) ? res[0]?.text : res?.text; - expect(text).toBe("⚙️ Agent was aborted."); + expect(text).toBe("⚙️ Agent was aborted. Stopped 0 sub-agents."); expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); }); }); @@ -238,7 +238,7 @@ describe("trigger handling", () => { makeCfg(home), ); const text = Array.isArray(res) ? res[0]?.text : res?.text; - expect(text).toBe("⚙️ Agent was aborted."); + expect(text).toBe("⚙️ Agent was aborted. Stopped 0 sub-agents."); expect(runEmbeddedPiAgent).not.toHaveBeenCalled(); }); }); diff --git a/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts b/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts index be6393c25..5768fad42 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.test.ts @@ -159,7 +159,7 @@ describe("trigger handling", () => { ); const text = Array.isArray(res) ? res[0]?.text : res?.text; - expect(text).toBe("⚙️ Agent was aborted."); + expect(text).toBe("⚙️ Agent was aborted. Stopped 0 sub-agents."); expect(vi.mocked(abortEmbeddedPiRun)).toHaveBeenCalledWith(targetSessionId); const store = loadSessionStore(cfg.session.store); expect(store[targetSessionKey]?.abortedLastRun).toBe(true); diff --git a/src/auto-reply/reply/abort.test.ts b/src/auto-reply/reply/abort.test.ts index 3bd21633b..31d8efae5 100644 --- a/src/auto-reply/reply/abort.test.ts +++ b/src/auto-reply/reply/abort.test.ts @@ -18,6 +18,14 @@ const commandQueueMocks = vi.hoisted(() => ({ vi.mock("../../process/command-queue.js", () => commandQueueMocks); +const subagentRegistryMocks = vi.hoisted(() => ({ + listSubagentRunsForRequester: vi.fn(() => []), +})); + +vi.mock("../../agents/subagent-registry.js", () => ({ + listSubagentRunsForRequester: subagentRegistryMocks.listSubagentRunsForRequester, +})); + describe("abort detection", () => { it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-")); @@ -137,4 +145,59 @@ describe("abort detection", () => { expect(getFollowupQueueDepth(sessionKey)).toBe(0); expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`); }); + + it("fast-abort stops active subagent runs for requester session", async () => { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-abort-")); + const storePath = path.join(root, "sessions.json"); + const cfg = { session: { store: storePath } } as ClawdbotConfig; + const sessionKey = "telegram:parent"; + const childKey = "agent:main:subagent:child-1"; + const sessionId = "session-parent"; + const childSessionId = "session-child"; + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId, + updatedAt: Date.now(), + }, + [childKey]: { + sessionId: childSessionId, + updatedAt: Date.now(), + }, + }, + null, + 2, + ), + ); + + subagentRegistryMocks.listSubagentRunsForRequester.mockReturnValueOnce([ + { + runId: "run-1", + childSessionKey: childKey, + requesterSessionKey: sessionKey, + requesterDisplayKey: "telegram:parent", + task: "do work", + cleanup: "keep", + createdAt: Date.now(), + }, + ]); + + const result = await tryFastAbortFromMessage({ + ctx: { + CommandBody: "/stop", + RawBody: "/stop", + SessionKey: sessionKey, + Provider: "telegram", + Surface: "telegram", + From: "telegram:parent", + To: "telegram:parent", + }, + cfg, + }); + + expect(result.stoppedSubagents).toBe(1); + expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${childKey}`); + }); }); diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index da60cdc9f..65f221ba9 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -1,5 +1,6 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js"; +import { listSubagentRunsForRequester } from "../../agents/subagent-registry.js"; import type { ClawdbotConfig } from "../../config/config.js"; import { loadSessionStore, @@ -14,6 +15,7 @@ import type { MsgContext } from "../templating.js"; import { logVerbose } from "../../globals.js"; import { stripMentions, stripStructuralPrefixes } from "./mentions.js"; import { clearSessionQueues } from "./queue.js"; +import { resolveInternalSessionKey, resolveMainSessionAlias } from "../../agents/tools/sessions-helpers.js"; const ABORT_TRIGGERS = new Set(["stop", "esc", "abort", "wait", "exit", "interrupt"]); const ABORT_MEMORY = new Map(); @@ -32,6 +34,14 @@ export function setAbortMemory(key: string, value: boolean): void { ABORT_MEMORY.set(key, value); } +export function formatAbortReplyText(stoppedSubagents?: number): string { + if (typeof stoppedSubagents !== "number") { + return "⚙️ Agent was aborted."; + } + const label = stoppedSubagents === 1 ? "sub-agent" : "sub-agents"; + return `⚙️ Agent was aborted. Stopped ${stoppedSubagents} ${label}.`; +} + function resolveSessionEntryForKey( store: Record | undefined, sessionKey: string | undefined, @@ -54,10 +64,62 @@ function resolveAbortTargetKey(ctx: MsgContext): string | undefined { return sessionKey || undefined; } +function normalizeRequesterSessionKey( + cfg: ClawdbotConfig, + key: string | undefined, +): string | undefined { + const cleaned = key?.trim(); + if (!cleaned) return undefined; + const { mainKey, alias } = resolveMainSessionAlias(cfg); + return resolveInternalSessionKey({ key: cleaned, alias, mainKey }); +} + +export function stopSubagentsForRequester(params: { + cfg: ClawdbotConfig; + requesterSessionKey?: string; +}): { stopped: number } { + const requesterKey = normalizeRequesterSessionKey(params.cfg, params.requesterSessionKey); + if (!requesterKey) return { stopped: 0 }; + const runs = listSubagentRunsForRequester(requesterKey); + if (runs.length === 0) return { stopped: 0 }; + + const storeCache = new Map>(); + const seenChildKeys = new Set(); + let stopped = 0; + + for (const run of runs) { + if (run.endedAt) continue; + const childKey = run.childSessionKey?.trim(); + if (!childKey || seenChildKeys.has(childKey)) continue; + seenChildKeys.add(childKey); + + const cleared = clearSessionQueues([childKey]); + const parsed = parseAgentSessionKey(childKey); + const storePath = resolveStorePath(params.cfg.session?.store, { agentId: parsed?.agentId }); + let store = storeCache.get(storePath); + if (!store) { + store = loadSessionStore(storePath); + storeCache.set(storePath, store); + } + const entry = store[childKey]; + const sessionId = entry?.sessionId; + const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false; + + if (aborted || cleared.followupCleared > 0 || cleared.laneCleared > 0) { + stopped += 1; + } + } + + if (stopped > 0) { + logVerbose(`abort: stopped ${stopped} subagent run(s) for ${requesterKey}`); + } + return { stopped }; +} + export async function tryFastAbortFromMessage(params: { ctx: MsgContext; cfg: ClawdbotConfig; -}): Promise<{ handled: boolean; aborted: boolean }> { +}): Promise<{ handled: boolean; aborted: boolean; stoppedSubagents?: number }> { const { ctx, cfg } = params; const commandAuthorized = ctx.CommandAuthorized ?? true; const auth = resolveCommandAuthorization({ @@ -81,6 +143,7 @@ export async function tryFastAbortFromMessage(params: { if (!abortRequested) return { handled: false, aborted: false }; const abortKey = targetKey ?? auth.from ?? auth.to; + const requesterSessionKey = targetKey ?? ctx.SessionKey ?? abortKey; if (targetKey) { const storePath = resolveStorePath(cfg.session?.store, { agentId }); @@ -108,11 +171,13 @@ export async function tryFastAbortFromMessage(params: { } else if (abortKey) { setAbortMemory(abortKey, true); } - return { handled: true, aborted }; + const { stopped } = stopSubagentsForRequester({ cfg, requesterSessionKey }); + return { handled: true, aborted, stoppedSubagents: stopped }; } if (abortKey) { setAbortMemory(abortKey, true); } - return { handled: true, aborted: false }; + const { stopped } = stopSubagentsForRequester({ cfg, requesterSessionKey }); + return { handled: true, aborted: false, stoppedSubagents: stopped }; } diff --git a/src/auto-reply/reply/commands-session.ts b/src/auto-reply/reply/commands-session.ts index 2391e512d..60b201214 100644 --- a/src/auto-reply/reply/commands-session.ts +++ b/src/auto-reply/reply/commands-session.ts @@ -6,7 +6,7 @@ import { scheduleGatewaySigusr1Restart, triggerClawdbotRestart } from "../../inf import { parseAgentSessionKey } from "../../routing/session-key.js"; import { parseActivationCommand } from "../group-activation.js"; import { parseSendPolicyCommand } from "../send-policy.js"; -import { isAbortTrigger, setAbortMemory } from "./abort.js"; +import { formatAbortReplyText, isAbortTrigger, setAbortMemory, stopSubagentsForRequester } from "./abort.js"; import { clearSessionQueues } from "./queue.js"; import type { CommandHandler } from "./commands-types.js"; @@ -208,7 +208,14 @@ export const handleStopCommand: CommandHandler = async (params, allowTextCommand } else if (params.command.abortKey) { setAbortMemory(params.command.abortKey, true); } - return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } }; + const { stopped } = stopSubagentsForRequester({ + cfg: params.cfg, + requesterSessionKey: abortTarget.key ?? params.sessionKey, + }); + return { + shouldContinue: false, + reply: { text: formatAbortReplyText(stopped) }, + }; }; export const handleAbortTrigger: CommandHandler = async (params, allowTextCommands) => { @@ -241,5 +248,12 @@ export const handleAbortTrigger: CommandHandler = async (params, allowTextComman } else if (params.command.abortKey) { setAbortMemory(params.command.abortKey, true); } - return { shouldContinue: false, reply: { text: "⚙️ Agent was aborted." } }; + const { stopped } = stopSubagentsForRequester({ + cfg: params.cfg, + requesterSessionKey: abortTarget.key ?? params.sessionKey, + }); + return { + shouldContinue: false, + reply: { text: formatAbortReplyText(stopped) }, + }; }; diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index 96cd2942d..bc8d048e5 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -24,6 +24,11 @@ vi.mock("./route-reply.js", () => ({ vi.mock("./abort.js", () => ({ tryFastAbortFromMessage: mocks.tryFastAbortFromMessage, + formatAbortReplyText: (stoppedSubagents?: number) => { + if (typeof stoppedSubagents !== "number") return "⚙️ Agent was aborted."; + const label = stoppedSubagents === 1 ? "sub-agent" : "sub-agents"; + return `⚙️ Agent was aborted. Stopped ${stoppedSubagents} ${label}.`; + }, })); const { dispatchReplyFromConfig } = await import("./dispatch-from-config.js"); @@ -123,6 +128,31 @@ describe("dispatchReplyFromConfig", () => { }); }); + it("fast-abort reply includes stopped subagent count when provided", async () => { + mocks.tryFastAbortFromMessage.mockResolvedValue({ + handled: true, + aborted: true, + stoppedSubagents: 2, + }); + const cfg = {} as ClawdbotConfig; + const dispatcher = createDispatcher(); + const ctx: MsgContext = { + Provider: "telegram", + Body: "/stop", + }; + + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher, + replyResolver: vi.fn(async () => ({ text: "hi" }) as ReplyPayload), + }); + + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ + text: "⚙️ Agent was aborted. Stopped 2 sub-agents.", + }); + }); + it("deduplicates inbound messages by MessageSid and origin", async () => { mocks.tryFastAbortFromMessage.mockResolvedValue({ handled: false, diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 7b81d9c94..c7a766fed 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -3,7 +3,7 @@ import { logVerbose } from "../../globals.js"; import { getReplyFromConfig } from "../reply.js"; import type { MsgContext } from "../templating.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; -import { tryFastAbortFromMessage } from "./abort.js"; +import { formatAbortReplyText, tryFastAbortFromMessage } from "./abort.js"; import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js"; import type { ReplyDispatcher, ReplyDispatchKind } from "./reply-dispatcher.js"; import { isRoutableChannel, routeReply } from "./route-reply.js"; @@ -70,7 +70,9 @@ export async function dispatchReplyFromConfig(params: { const fastAbort = await tryFastAbortFromMessage({ ctx, cfg }); if (fastAbort.handled) { - const payload = { text: "⚙️ Agent was aborted." } satisfies ReplyPayload; + const payload = { + text: formatAbortReplyText(fastAbort.stoppedSubagents), + } satisfies ReplyPayload; let queuedFinal = false; let routedFinalCount = 0; if (shouldRouteToOriginating && originatingChannel && originatingTo) {