From 6fa9b70a49a2995257e43bcf6fbe63c43eda1ddd Mon Sep 17 00:00:00 2001 From: Glucksberg Date: Sun, 25 Jan 2026 19:42:06 +0000 Subject: [PATCH 1/4] fix(tts): generate audio when block streaming drops final reply When block streaming succeeds, final replies are dropped but TTS was only applied to final replies. Fix by accumulating block text during streaming and generating TTS-only audio after streaming completes. Also: - Change truncate vs skip behavior when summary OFF (now truncates) - Align TTS limits with Telegram max (4096 chars) - Improve /tts command help messages with examples - Add newline separator between accumulated blocks --- src/auto-reply/reply/commands-tts.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/auto-reply/reply/commands-tts.ts b/src/auto-reply/reply/commands-tts.ts index 04b60a4e9..bba7e2b02 100644 --- a/src/auto-reply/reply/commands-tts.ts +++ b/src/auto-reply/reply/commands-tts.ts @@ -25,11 +25,11 @@ type ParsedTtsCommand = { }; function parseTtsCommand(normalized: string): ParsedTtsCommand | null { - // Accept `/tts` and `/tts [args]` as a single control surface. - if (normalized === "/tts") return { action: "status", args: "" }; + // Accept `/tts [args]` - return null for `/tts` alone to trigger inline menu. + if (normalized === "/tts") return null; if (!normalized.startsWith("/tts ")) return null; const rest = normalized.slice(5).trim(); - if (!rest) return { action: "status", args: "" }; + if (!rest) return null; const [action, ...tail] = rest.split(/\s+/); return { action: action.toLowerCase(), args: tail.join(" ").trim() }; } From bc7ba73a34c43e704a98bf245e0abfa99a85659e Mon Sep 17 00:00:00 2001 From: Glucksberg Date: Mon, 26 Jan 2026 05:21:16 +0000 Subject: [PATCH 2/4] feat(gateway): add human-in-the-loop approval for outbound messaging (#2023) Add `approvals.message` config that requires human confirmation before executing outbound messaging tools. Follows the existing exec approval pattern with in-memory state management and Promise-based waiting. New components: - MessageApprovalManager: tracks pending approvals with timeout handling - Gateway RPC handlers: message.approval.request and message.approval.resolve - MessageApprovalForwarder: delivers approval requests to chat channels - Approval interception in runMessageAction with gatewayClient/skipApproval params Extended /approve command to handle message approvals (IDs starting with "msg-"). Config schema added under approvals.message with options for: - enabled, mode (session/targets/both), actions, channels - agentFilter, sessionFilter, targets, timeout Co-Authored-By: Claude Opus 4.5 --- src/auto-reply/reply/commands-approve.ts | 121 ++++++- src/commands/message-format.ts | 2 +- src/config/types.approvals.ts | 22 ++ src/config/zod-schema.approvals.ts | 19 +- src/gateway/message-approval-manager.ts | 84 +++++ src/gateway/protocol/index.ts | 12 + src/gateway/protocol/schema.ts | 1 + .../protocol/schema/message-approvals.ts | 26 ++ .../protocol/schema/protocol-schemas.ts | 6 + src/gateway/protocol/schema/types.ts | 6 + .../server-methods/message-approval.test.ts | 305 ++++++++++++++++++ .../server-methods/message-approval.ts | 137 ++++++++ src/gateway/server.impl.ts | 10 + src/infra/message-approval-forwarder.test.ts | 196 +++++++++++ src/infra/message-approval-forwarder.ts | 295 +++++++++++++++++ src/infra/outbound/message-action-runner.ts | 59 ++++ src/infra/outbound/message-approval-check.ts | 114 +++++++ 17 files changed, 1395 insertions(+), 20 deletions(-) create mode 100644 src/gateway/message-approval-manager.ts create mode 100644 src/gateway/protocol/schema/message-approvals.ts create mode 100644 src/gateway/server-methods/message-approval.test.ts create mode 100644 src/gateway/server-methods/message-approval.ts create mode 100644 src/infra/message-approval-forwarder.test.ts create mode 100644 src/infra/message-approval-forwarder.ts create mode 100644 src/infra/outbound/message-approval-check.ts diff --git a/src/auto-reply/reply/commands-approve.ts b/src/auto-reply/reply/commands-approve.ts index a34e4b31c..8ed3600a5 100644 --- a/src/auto-reply/reply/commands-approve.ts +++ b/src/auto-reply/reply/commands-approve.ts @@ -5,7 +5,8 @@ import type { CommandHandler } from "./commands-types.js"; const COMMAND = "/approve"; -const DECISION_ALIASES: Record = { +// Exec approval decisions +const EXEC_DECISION_ALIASES: Record = { allow: "allow-once", once: "allow-once", "allow-once": "allow-once", @@ -18,39 +19,86 @@ const DECISION_ALIASES: Record = block: "deny", }; -type ParsedApproveCommand = - | { ok: true; id: string; decision: "allow-once" | "allow-always" | "deny" } +// Message approval decisions (simpler - just allow/deny) +const MESSAGE_DECISION_ALIASES: Record = { + allow: "allow", + yes: "allow", + ok: "allow", + approve: "allow", + deny: "deny", + no: "deny", + reject: "deny", + block: "deny", +}; + +type ParsedExecApproveCommand = + | { ok: true; type: "exec"; id: string; decision: "allow-once" | "allow-always" | "deny" } | { ok: false; error: string }; +type ParsedMessageApproveCommand = + | { ok: true; type: "message"; id: string; decision: "allow" | "deny" } + | { ok: false; error: string }; + +type ParsedApproveCommand = ParsedExecApproveCommand | ParsedMessageApproveCommand; + +function isMessageApprovalId(id: string): boolean { + // Message approval IDs start with "msg-" + return id.toLowerCase().startsWith("msg-"); +} + function parseApproveCommand(raw: string): ParsedApproveCommand | null { const trimmed = raw.trim(); if (!trimmed.toLowerCase().startsWith(COMMAND)) return null; const rest = trimmed.slice(COMMAND.length).trim(); if (!rest) { - return { ok: false, error: "Usage: /approve allow-once|allow-always|deny" }; + return { + ok: false, + error: + "Usage: /approve allow|deny (for message) or allow-once|allow-always|deny (for exec)", + }; } const tokens = rest.split(/\s+/).filter(Boolean); if (tokens.length < 2) { - return { ok: false, error: "Usage: /approve allow-once|allow-always|deny" }; + return { + ok: false, + error: + "Usage: /approve allow|deny (for message) or allow-once|allow-always|deny (for exec)", + }; } const first = tokens[0].toLowerCase(); const second = tokens[1].toLowerCase(); - if (DECISION_ALIASES[first]) { - return { - ok: true, - decision: DECISION_ALIASES[first], - id: tokens.slice(1).join(" ").trim(), - }; + // Determine if this is a message approval based on ID prefix + const idFirst = !EXEC_DECISION_ALIASES[first] && !MESSAGE_DECISION_ALIASES[first]; + const id = idFirst ? tokens[0] : tokens.slice(1).join(" ").trim(); + const decisionToken = idFirst ? second : first; + + if (isMessageApprovalId(id)) { + // Message approval + const decision = MESSAGE_DECISION_ALIASES[decisionToken]; + if (!decision) { + return { ok: false, error: "Usage: /approve allow|deny" }; + } + return { ok: true, type: "message", id, decision }; } - if (DECISION_ALIASES[second]) { - return { - ok: true, - decision: DECISION_ALIASES[second], - id: tokens[0], - }; + + // Exec approval + const execDecision = EXEC_DECISION_ALIASES[decisionToken]; + if (execDecision) { + return { ok: true, type: "exec", id, decision: execDecision }; } + + // If the decision token matches message decisions but ID doesn't have msg- prefix, + // still try to resolve as exec with allow -> allow-once mapping + const msgDecision = MESSAGE_DECISION_ALIASES[decisionToken]; + if (msgDecision === "allow") { + return { ok: true, type: "exec", id, decision: "allow-once" }; + } + if (msgDecision === "deny") { + return { ok: true, type: "exec", id, decision: "deny" }; + } + return { ok: false, error: "Usage: /approve allow-once|allow-always|deny" }; } @@ -77,6 +125,45 @@ export const handleApproveCommand: CommandHandler = async (params, allowTextComm } const resolvedBy = buildResolvedByLabel(params); + + if (parsed.type === "message") { + // Handle message approval + try { + await callGateway({ + method: "message.approval.resolve", + params: { id: parsed.id, decision: parsed.decision }, + clientName: GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, + clientDisplayName: `Chat message approval (${resolvedBy})`, + mode: GATEWAY_CLIENT_MODES.BACKEND, + }); + } catch { + // If message approval fails, try exec approval as fallback + try { + const execDecision = parsed.decision === "allow" ? "allow-once" : "deny"; + await callGateway({ + method: "exec.approval.resolve", + params: { id: parsed.id, decision: execDecision }, + clientName: GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, + clientDisplayName: `Chat approval (${resolvedBy})`, + mode: GATEWAY_CLIENT_MODES.BACKEND, + }); + } catch (execErr) { + return { + shouldContinue: false, + reply: { + text: `❌ Failed to submit approval: ${String(execErr)}`, + }, + }; + } + } + + return { + shouldContinue: false, + reply: { text: `✅ Message approval ${parsed.decision} submitted for ${parsed.id}.` }, + }; + } + + // Handle exec approval try { await callGateway({ method: "exec.approval.resolve", diff --git a/src/commands/message-format.ts b/src/commands/message-format.ts index e22baa85f..4b30d1e76 100644 --- a/src/commands/message-format.ts +++ b/src/commands/message-format.ts @@ -32,7 +32,7 @@ export type MessageCliJsonEnvelope = { action: ChannelMessageActionName; channel: ChannelId; dryRun: boolean; - handledBy: "plugin" | "core" | "dry-run"; + handledBy: "plugin" | "core" | "dry-run" | "approval-denied"; payload: unknown; }; diff --git a/src/config/types.approvals.ts b/src/config/types.approvals.ts index d86d05b8e..d84af81d7 100644 --- a/src/config/types.approvals.ts +++ b/src/config/types.approvals.ts @@ -24,6 +24,28 @@ export type ExecApprovalForwardingConfig = { targets?: ExecApprovalForwardTarget[]; }; +export type MessageApprovalForwardingMode = "session" | "targets" | "both"; + +export type MessageApprovalForwardingConfig = { + /** Enable requiring human approval for outbound messages. Default: false. */ + enabled?: boolean; + /** Delivery mode (session=origin chat, targets=config targets, both=both). Default: session. */ + mode?: MessageApprovalForwardingMode; + /** Only require approval for these actions (e.g. ["send", "broadcast"]). Omit = all actions. */ + actions?: string[]; + /** Only require approval for these channels. Omit or ["*"] = all channels. */ + channels?: string[]; + /** Only require approval for these agent IDs. Omit = all agents. */ + agentFilter?: string[]; + /** Only require approval matching these session key patterns (substring or regex). */ + sessionFilter?: string[]; + /** Explicit delivery targets (used when mode includes targets). */ + targets?: ExecApprovalForwardTarget[]; + /** Approval timeout in seconds. Default: 120. */ + timeout?: number; +}; + export type ApprovalsConfig = { exec?: ExecApprovalForwardingConfig; + message?: MessageApprovalForwardingConfig; }; diff --git a/src/config/zod-schema.approvals.ts b/src/config/zod-schema.approvals.ts index e5276d19a..84368738a 100644 --- a/src/config/zod-schema.approvals.ts +++ b/src/config/zod-schema.approvals.ts @@ -20,9 +20,24 @@ const ExecApprovalForwardingSchema = z .strict() .optional(); -export const ApprovalsSchema = z +const MessageApprovalForwardingSchema = z .object({ - exec: ExecApprovalForwardingSchema, + enabled: z.boolean().optional(), + mode: z.union([z.literal("session"), z.literal("targets"), z.literal("both")]).optional(), + actions: z.array(z.string()).optional(), + channels: z.array(z.string()).optional(), + agentFilter: z.array(z.string()).optional(), + sessionFilter: z.array(z.string()).optional(), + targets: z.array(ExecApprovalForwardTargetSchema).optional(), + timeout: z.number().optional(), + }) + .strict() + .optional(); + +export const ApprovalsSchema = z + .object({ + exec: ExecApprovalForwardingSchema, + message: MessageApprovalForwardingSchema, }) .strict() .optional(); diff --git a/src/gateway/message-approval-manager.ts b/src/gateway/message-approval-manager.ts new file mode 100644 index 000000000..fb4573e59 --- /dev/null +++ b/src/gateway/message-approval-manager.ts @@ -0,0 +1,84 @@ +import { randomUUID } from "node:crypto"; + +export type MessageApprovalDecision = "allow" | "deny"; + +export type MessageApprovalRequestPayload = { + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; +}; + +export type MessageApprovalRecord = { + id: string; + request: MessageApprovalRequestPayload; + createdAtMs: number; + expiresAtMs: number; + resolvedAtMs?: number; + decision?: MessageApprovalDecision; + resolvedBy?: string | null; +}; + +type PendingEntry = { + record: MessageApprovalRecord; + resolve: (decision: MessageApprovalDecision | null) => void; + reject: (err: Error) => void; + timer: ReturnType; +}; + +export class MessageApprovalManager { + private pending = new Map(); + + create( + request: MessageApprovalRequestPayload, + timeoutMs: number, + id?: string | null, + ): MessageApprovalRecord { + const now = Date.now(); + const resolvedId = id && id.trim().length > 0 ? id.trim() : `msg-${randomUUID()}`; + const record: MessageApprovalRecord = { + id: resolvedId, + request, + createdAtMs: now, + expiresAtMs: now + timeoutMs, + }; + return record; + } + + async waitForDecision( + record: MessageApprovalRecord, + timeoutMs: number, + ): Promise { + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending.delete(record.id); + resolve(null); + }, timeoutMs); + this.pending.set(record.id, { record, resolve, reject, timer }); + }); + } + + resolve( + recordId: string, + decision: MessageApprovalDecision, + resolvedBy?: string | null, + ): boolean { + const pending = this.pending.get(recordId); + if (!pending) return false; + clearTimeout(pending.timer); + pending.record.resolvedAtMs = Date.now(); + pending.record.decision = decision; + pending.record.resolvedBy = resolvedBy ?? null; + this.pending.delete(recordId); + pending.resolve(decision); + return true; + } + + getSnapshot(recordId: string): MessageApprovalRecord | null { + const entry = this.pending.get(recordId); + return entry?.record ?? null; + } +} diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 6e5a862d1..332f9d8a6 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -83,6 +83,10 @@ import { ExecApprovalRequestParamsSchema, type ExecApprovalResolveParams, ExecApprovalResolveParamsSchema, + type MessageApprovalRequestParams, + MessageApprovalRequestParamsSchema, + type MessageApprovalResolveParams, + MessageApprovalResolveParamsSchema, ErrorCodes, type ErrorShape, ErrorShapeSchema, @@ -303,6 +307,12 @@ export const validateExecApprovalRequestParams = ajv.compile( ExecApprovalResolveParamsSchema, ); +export const validateMessageApprovalRequestParams = ajv.compile( + MessageApprovalRequestParamsSchema, +); +export const validateMessageApprovalResolveParams = ajv.compile( + MessageApprovalResolveParamsSchema, +); export const validateExecApprovalsNodeGetParams = ajv.compile( ExecApprovalsNodeGetParamsSchema, ); @@ -512,6 +522,8 @@ export type { ExecApprovalsGetParams, ExecApprovalsSetParams, ExecApprovalsSnapshot, + MessageApprovalRequestParams, + MessageApprovalResolveParams, LogsTailParams, LogsTailResult, PollParams, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index 614942008..a77cf577f 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -5,6 +5,7 @@ export * from "./schema/config.js"; export * from "./schema/cron.js"; export * from "./schema/error-codes.js"; export * from "./schema/exec-approvals.js"; +export * from "./schema/message-approvals.js"; export * from "./schema/devices.js"; export * from "./schema/frames.js"; export * from "./schema/logs-chat.js"; diff --git a/src/gateway/protocol/schema/message-approvals.ts b/src/gateway/protocol/schema/message-approvals.ts new file mode 100644 index 000000000..68353c737 --- /dev/null +++ b/src/gateway/protocol/schema/message-approvals.ts @@ -0,0 +1,26 @@ +import { Type } from "@sinclair/typebox"; + +import { NonEmptyString } from "./primitives.js"; + +export const MessageApprovalRequestParamsSchema = Type.Object( + { + id: Type.Optional(NonEmptyString), + action: NonEmptyString, + channel: NonEmptyString, + to: NonEmptyString, + message: Type.Optional(Type.Union([Type.String(), Type.Null()])), + mediaUrl: Type.Optional(Type.Union([Type.String(), Type.Null()])), + agentId: Type.Optional(Type.Union([Type.String(), Type.Null()])), + sessionKey: Type.Optional(Type.Union([Type.String(), Type.Null()])), + timeoutMs: Type.Optional(Type.Integer({ minimum: 1 })), + }, + { additionalProperties: false }, +); + +export const MessageApprovalResolveParamsSchema = Type.Object( + { + id: NonEmptyString, + decision: NonEmptyString, + }, + { additionalProperties: false }, +); diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index e92f114e2..d101f201c 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -60,6 +60,10 @@ import { ExecApprovalRequestParamsSchema, ExecApprovalResolveParamsSchema, } from "./exec-approvals.js"; +import { + MessageApprovalRequestParamsSchema, + MessageApprovalResolveParamsSchema, +} from "./message-approvals.js"; import { DevicePairApproveParamsSchema, DevicePairListParamsSchema, @@ -211,6 +215,8 @@ export const ProtocolSchemas: Record = { ExecApprovalsSnapshot: ExecApprovalsSnapshotSchema, ExecApprovalRequestParams: ExecApprovalRequestParamsSchema, ExecApprovalResolveParams: ExecApprovalResolveParamsSchema, + MessageApprovalRequestParams: MessageApprovalRequestParamsSchema, + MessageApprovalResolveParams: MessageApprovalResolveParamsSchema, DevicePairListParams: DevicePairListParamsSchema, DevicePairApproveParams: DevicePairApproveParamsSchema, DevicePairRejectParams: DevicePairRejectParamsSchema, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index 696503721..b6359132a 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -58,6 +58,10 @@ import type { ExecApprovalRequestParamsSchema, ExecApprovalResolveParamsSchema, } from "./exec-approvals.js"; +import type { + MessageApprovalRequestParamsSchema, + MessageApprovalResolveParamsSchema, +} from "./message-approvals.js"; import type { DevicePairApproveParamsSchema, DevicePairListParamsSchema, @@ -200,6 +204,8 @@ export type ExecApprovalsNodeSetParams = Static; export type ExecApprovalRequestParams = Static; export type ExecApprovalResolveParams = Static; +export type MessageApprovalRequestParams = Static; +export type MessageApprovalResolveParams = Static; export type DevicePairListParams = Static; export type DevicePairApproveParams = Static; export type DevicePairRejectParams = Static; diff --git a/src/gateway/server-methods/message-approval.test.ts b/src/gateway/server-methods/message-approval.test.ts new file mode 100644 index 000000000..8b2ef98ee --- /dev/null +++ b/src/gateway/server-methods/message-approval.test.ts @@ -0,0 +1,305 @@ +import { describe, expect, it, vi } from "vitest"; +import { MessageApprovalManager } from "../message-approval-manager.js"; +import { createMessageApprovalHandlers } from "./message-approval.js"; +import { validateMessageApprovalRequestParams } from "../protocol/index.js"; + +const noop = () => {}; + +describe("message approval handlers", () => { + describe("MessageApprovalRequestParams validation", () => { + it("accepts valid request params", () => { + const params = { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Hello world", + }; + expect(validateMessageApprovalRequestParams(params)).toBe(true); + }); + + it("accepts request with optional fields omitted", () => { + const params = { + action: "send", + channel: "telegram", + to: "+1234567890", + }; + expect(validateMessageApprovalRequestParams(params)).toBe(true); + }); + + it("accepts request with null optional fields", () => { + const params = { + action: "send", + channel: "telegram", + to: "+1234567890", + message: null, + mediaUrl: null, + agentId: null, + sessionKey: null, + }; + expect(validateMessageApprovalRequestParams(params)).toBe(true); + }); + + it("rejects request missing required fields", () => { + const params = { + action: "send", + channel: "telegram", + // missing 'to' + }; + expect(validateMessageApprovalRequestParams(params)).toBe(false); + }); + }); + + it("broadcasts request + resolve", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const broadcasts: Array<{ event: string; payload: unknown }> = []; + + const respond = vi.fn(); + const context = { + broadcast: (event: string, payload: unknown) => { + broadcasts.push({ event, payload }); + }, + }; + + const requestPromise = handlers["message.approval.request"]({ + params: { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Test message", + timeoutMs: 2000, + }, + respond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + const requested = broadcasts.find((entry) => entry.event === "message.approval.requested"); + expect(requested).toBeTruthy(); + const id = (requested?.payload as { id?: string })?.id ?? ""; + expect(id).not.toBe(""); + expect(id.startsWith("msg-")).toBe(true); + + const resolveRespond = vi.fn(); + await handlers["message.approval.resolve"]({ + params: { id, decision: "allow" }, + respond: resolveRespond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-2", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + await requestPromise; + + expect(resolveRespond).toHaveBeenCalledWith(true, { ok: true }, undefined); + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ id, decision: "allow" }), + undefined, + ); + expect(broadcasts.some((entry) => entry.event === "message.approval.resolved")).toBe(true); + }); + + it("accepts resolve during broadcast", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const respond = vi.fn(); + const resolveRespond = vi.fn(); + + const resolveContext = { + broadcast: () => {}, + }; + + const context = { + broadcast: (event: string, payload: unknown) => { + if (event !== "message.approval.requested") return; + const id = (payload as { id?: string })?.id ?? ""; + void handlers["message.approval.resolve"]({ + params: { id, decision: "allow" }, + respond: resolveRespond, + context: resolveContext as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-2", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + }, + }; + + await handlers["message.approval.request"]({ + params: { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Test message", + timeoutMs: 2000, + }, + respond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + expect(resolveRespond).toHaveBeenCalledWith(true, { ok: true }, undefined); + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ decision: "allow" }), + undefined, + ); + }); + + it("accepts explicit approval ids", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const broadcasts: Array<{ event: string; payload: unknown }> = []; + + const respond = vi.fn(); + const context = { + broadcast: (event: string, payload: unknown) => { + broadcasts.push({ event, payload }); + }, + }; + + const requestPromise = handlers["message.approval.request"]({ + params: { + id: "msg-approval-123", + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Test message", + timeoutMs: 2000, + }, + respond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + const requested = broadcasts.find((entry) => entry.event === "message.approval.requested"); + const id = (requested?.payload as { id?: string })?.id ?? ""; + expect(id).toBe("msg-approval-123"); + + const resolveRespond = vi.fn(); + await handlers["message.approval.resolve"]({ + params: { id, decision: "allow" }, + respond: resolveRespond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-2", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + await requestPromise; + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ id: "msg-approval-123", decision: "allow" }), + undefined, + ); + }); + + it("rejects duplicate approval ids", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const respondA = vi.fn(); + const respondB = vi.fn(); + const broadcasts: Array<{ event: string; payload: unknown }> = []; + const context = { + broadcast: (event: string, payload: unknown) => { + broadcasts.push({ event, payload }); + }, + }; + + const requestPromise = handlers["message.approval.request"]({ + params: { + id: "msg-dup-1", + action: "send", + channel: "telegram", + to: "+1234567890", + }, + respond: respondA, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + await handlers["message.approval.request"]({ + params: { + id: "msg-dup-1", + action: "send", + channel: "slack", + to: "U1234", + }, + respond: respondB, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-2", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + expect(respondB).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ message: "approval id already pending" }), + ); + + const requested = broadcasts.find((entry) => entry.event === "message.approval.requested"); + const id = (requested?.payload as { id?: string })?.id ?? ""; + const resolveRespond = vi.fn(); + await handlers["message.approval.resolve"]({ + params: { id, decision: "deny" }, + respond: resolveRespond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-3", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + await requestPromise; + }); + + it("rejects invalid decision", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const respond = vi.fn(); + + await handlers["message.approval.resolve"]({ + params: { id: "msg-123", decision: "allow-always" }, // invalid for message approvals + respond, + context: { broadcast: () => {} } as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + expect(respond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ message: "invalid decision" }), + ); + }); +}); diff --git a/src/gateway/server-methods/message-approval.ts b/src/gateway/server-methods/message-approval.ts new file mode 100644 index 000000000..9fc39068b --- /dev/null +++ b/src/gateway/server-methods/message-approval.ts @@ -0,0 +1,137 @@ +import type { MessageApprovalForwarder } from "../../infra/message-approval-forwarder.js"; +import type { + MessageApprovalDecision, + MessageApprovalManager, +} from "../message-approval-manager.js"; +import { + ErrorCodes, + errorShape, + formatValidationErrors, + validateMessageApprovalRequestParams, + validateMessageApprovalResolveParams, +} from "../protocol/index.js"; +import type { GatewayRequestHandlers } from "./types.js"; + +export function createMessageApprovalHandlers( + manager: MessageApprovalManager, + opts?: { forwarder?: MessageApprovalForwarder }, +): GatewayRequestHandlers { + return { + "message.approval.request": async ({ params, respond, context }) => { + if (!validateMessageApprovalRequestParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid message.approval.request params: ${formatValidationErrors( + validateMessageApprovalRequestParams.errors, + )}`, + ), + ); + return; + } + const p = params as { + id?: string; + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; + timeoutMs?: number; + }; + const timeoutMs = typeof p.timeoutMs === "number" ? p.timeoutMs : 120_000; + const explicitId = typeof p.id === "string" && p.id.trim().length > 0 ? p.id.trim() : null; + if (explicitId && manager.getSnapshot(explicitId)) { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, "approval id already pending"), + ); + return; + } + const request = { + action: p.action, + channel: p.channel, + to: p.to, + message: p.message ?? null, + mediaUrl: p.mediaUrl ?? null, + agentId: p.agentId ?? null, + sessionKey: p.sessionKey ?? null, + }; + const record = manager.create(request, timeoutMs, explicitId); + const decisionPromise = manager.waitForDecision(record, timeoutMs); + context.broadcast( + "message.approval.requested", + { + id: record.id, + request: record.request, + createdAtMs: record.createdAtMs, + expiresAtMs: record.expiresAtMs, + }, + { dropIfSlow: true }, + ); + void opts?.forwarder + ?.handleRequested({ + id: record.id, + request: record.request, + createdAtMs: record.createdAtMs, + expiresAtMs: record.expiresAtMs, + }) + .catch((err) => { + context.logGateway?.error?.(`message approvals: forward request failed: ${String(err)}`); + }); + const decision = await decisionPromise; + respond( + true, + { + id: record.id, + decision, + createdAtMs: record.createdAtMs, + expiresAtMs: record.expiresAtMs, + }, + undefined, + ); + }, + "message.approval.resolve": async ({ params, respond, client, context }) => { + if (!validateMessageApprovalResolveParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid message.approval.resolve params: ${formatValidationErrors( + validateMessageApprovalResolveParams.errors, + )}`, + ), + ); + return; + } + const p = params as { id: string; decision: string }; + const decision = p.decision as MessageApprovalDecision; + if (decision !== "allow" && decision !== "deny") { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "invalid decision")); + return; + } + const resolvedBy = client?.connect?.client?.displayName ?? client?.connect?.client?.id; + const ok = manager.resolve(p.id, decision, resolvedBy ?? null); + if (!ok) { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unknown approval id")); + return; + } + context.broadcast( + "message.approval.resolved", + { id: p.id, decision, resolvedBy, ts: Date.now() }, + { dropIfSlow: true }, + ); + void opts?.forwarder + ?.handleResolved({ id: p.id, decision, resolvedBy, ts: Date.now() }) + .catch((err) => { + context.logGateway?.error?.(`message approvals: forward resolve failed: ${String(err)}`); + }); + respond(true, { ok: true }, undefined); + }, + }; +} diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index efa91be76..3910a1a94 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -45,6 +45,9 @@ import { startGatewayDiscovery } from "./server-discovery-runtime.js"; import { ExecApprovalManager } from "./exec-approval-manager.js"; import { createExecApprovalHandlers } from "./server-methods/exec-approval.js"; import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js"; +import { MessageApprovalManager } from "./message-approval-manager.js"; +import { createMessageApprovalHandlers } from "./server-methods/message-approval.js"; +import { createMessageApprovalForwarder } from "../infra/message-approval-forwarder.js"; import type { startBrowserControlServerIfEnabled } from "./server-browser.js"; import { createChannelManager } from "./server-channels.js"; import { createAgentEventHandler } from "./server-chat.js"; @@ -417,6 +420,12 @@ export async function startGatewayServer( forwarder: execApprovalForwarder, }); + const messageApprovalManager = new MessageApprovalManager(); + const messageApprovalForwarder = createMessageApprovalForwarder(); + const messageApprovalHandlers = createMessageApprovalHandlers(messageApprovalManager, { + forwarder: messageApprovalForwarder, + }); + const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port; attachGatewayWsHandlers({ @@ -435,6 +444,7 @@ export async function startGatewayServer( extraHandlers: { ...pluginRegistry.gatewayHandlers, ...execApprovalHandlers, + ...messageApprovalHandlers, }, broadcast, context: { diff --git a/src/infra/message-approval-forwarder.test.ts b/src/infra/message-approval-forwarder.test.ts new file mode 100644 index 000000000..784a1909b --- /dev/null +++ b/src/infra/message-approval-forwarder.test.ts @@ -0,0 +1,196 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +import type { ClawdbotConfig } from "../config/config.js"; +import { createMessageApprovalForwarder } from "./message-approval-forwarder.js"; + +const baseRequest = { + id: "msg-req-1", + request: { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Hello world", + agentId: "main", + sessionKey: "agent:main:main", + }, + createdAtMs: 1000, + expiresAtMs: 6000, +}; + +afterEach(() => { + vi.useRealTimers(); +}); + +describe("message approval forwarder", () => { + it("forwards to session target and resolves", async () => { + vi.useFakeTimers(); + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { message: { enabled: true, mode: "session" } }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); + expect(deliver).toHaveBeenCalledTimes(1); + + await forwarder.handleResolved({ + id: baseRequest.id, + decision: "allow", + resolvedBy: "slack:U1", + ts: 2000, + }); + expect(deliver).toHaveBeenCalledTimes(2); + + await vi.runAllTimersAsync(); + expect(deliver).toHaveBeenCalledTimes(2); + }); + + it("forwards to explicit targets and expires", async () => { + vi.useFakeTimers(); + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "targets", + targets: [{ channel: "telegram", to: "123" }], + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => null, + }); + + await forwarder.handleRequested(baseRequest); + expect(deliver).toHaveBeenCalledTimes(1); + + await vi.runAllTimersAsync(); + expect(deliver).toHaveBeenCalledTimes(2); + }); + + it("filters by action", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "session", + actions: ["broadcast"], // only broadcast, not send + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); // action is "send" + expect(deliver).not.toHaveBeenCalled(); + }); + + it("filters by channel", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "session", + channels: ["slack"], // only slack, not telegram + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); // channel is "telegram" + expect(deliver).not.toHaveBeenCalled(); + }); + + it("filters by agentId", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "session", + agentFilter: ["other-agent"], // not main + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); // agentId is "main" + expect(deliver).not.toHaveBeenCalled(); + }); + + it("includes message content in forwarded notification", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { message: { enabled: true, mode: "session" } }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); + expect(deliver).toHaveBeenCalledTimes(1); + + const call = deliver.mock.calls[0][0]; + expect(call.payloads[0].text).toContain("Message approval required"); + expect(call.payloads[0].text).toContain("Hello world"); + expect(call.payloads[0].text).toContain("send"); + expect(call.payloads[0].text).toContain("telegram"); + }); + + it("truncates long messages", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { message: { enabled: true, mode: "session" } }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + const longMessage = "x".repeat(500); + await forwarder.handleRequested({ + ...baseRequest, + request: { ...baseRequest.request, message: longMessage }, + }); + + const call = deliver.mock.calls[0][0]; + expect(call.payloads[0].text).toContain("..."); + expect(call.payloads[0].text.length).toBeLessThan(longMessage.length + 200); + }); +}); diff --git a/src/infra/message-approval-forwarder.ts b/src/infra/message-approval-forwarder.ts new file mode 100644 index 000000000..43327be10 --- /dev/null +++ b/src/infra/message-approval-forwarder.ts @@ -0,0 +1,295 @@ +import type { ClawdbotConfig } from "../config/config.js"; +import { loadConfig } from "../config/config.js"; +import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; +import type { + ExecApprovalForwardTarget, + MessageApprovalForwardingConfig, +} from "../config/types.approvals.js"; +import type { MessageApprovalDecision } from "../gateway/message-approval-manager.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { parseAgentSessionKey } from "../routing/session-key.js"; +import { isDeliverableMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js"; +import { deliverOutboundPayloads } from "./outbound/deliver.js"; +import { resolveSessionDeliveryTarget } from "./outbound/targets.js"; + +const log = createSubsystemLogger("gateway/message-approvals"); + +export type MessageApprovalRequest = { + id: string; + request: { + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; + }; + createdAtMs: number; + expiresAtMs: number; +}; + +export type MessageApprovalResolved = { + id: string; + decision: MessageApprovalDecision; + resolvedBy?: string | null; + ts: number; +}; + +type ForwardTarget = ExecApprovalForwardTarget & { source: "session" | "target" }; + +type PendingApproval = { + request: MessageApprovalRequest; + targets: ForwardTarget[]; + timeoutId: NodeJS.Timeout | null; +}; + +export type MessageApprovalForwarder = { + handleRequested: (request: MessageApprovalRequest) => Promise; + handleResolved: (resolved: MessageApprovalResolved) => Promise; + stop: () => void; +}; + +export type MessageApprovalForwarderDeps = { + getConfig?: () => ClawdbotConfig; + deliver?: typeof deliverOutboundPayloads; + nowMs?: () => number; + resolveSessionTarget?: (params: { + cfg: ClawdbotConfig; + request: MessageApprovalRequest; + }) => ExecApprovalForwardTarget | null; +}; + +const DEFAULT_MODE = "session" as const; + +function normalizeMode(mode?: MessageApprovalForwardingConfig["mode"]) { + return mode ?? DEFAULT_MODE; +} + +function matchSessionFilter(sessionKey: string, patterns: string[]): boolean { + return patterns.some((pattern) => { + try { + return sessionKey.includes(pattern) || new RegExp(pattern).test(sessionKey); + } catch { + return sessionKey.includes(pattern); + } + }); +} + +function shouldForward(params: { + config?: MessageApprovalForwardingConfig; + request: MessageApprovalRequest; +}): boolean { + const config = params.config; + if (!config?.enabled) return false; + if (config.actions?.length) { + if (!config.actions.includes(params.request.request.action)) return false; + } + if (config.channels?.length && !config.channels.includes("*")) { + const channel = normalizeMessageChannel(params.request.request.channel); + if (!channel || !config.channels.includes(channel)) return false; + } + if (config.agentFilter?.length) { + const agentId = + params.request.request.agentId ?? + parseAgentSessionKey(params.request.request.sessionKey)?.agentId; + if (!agentId) return false; + if (!config.agentFilter.includes(agentId)) return false; + } + if (config.sessionFilter?.length) { + const sessionKey = params.request.request.sessionKey; + if (!sessionKey) return false; + if (!matchSessionFilter(sessionKey, config.sessionFilter)) return false; + } + return true; +} + +function buildTargetKey(target: ExecApprovalForwardTarget): string { + const channel = normalizeMessageChannel(target.channel) ?? target.channel; + const accountId = target.accountId ?? ""; + const threadId = target.threadId ?? ""; + return [channel, target.to, accountId, threadId].join(":"); +} + +function truncateMessage(message: string | null | undefined, maxLen = 200): string { + if (!message) return "(empty)"; + if (message.length <= maxLen) return message; + return `${message.slice(0, maxLen)}...`; +} + +function buildRequestMessage(request: MessageApprovalRequest, nowMs: number) { + const lines: string[] = ["📬 Message approval required", `ID: ${request.id}`]; + lines.push(`Action: ${request.request.action}`); + lines.push(`Channel: ${request.request.channel}`); + lines.push(`To: ${request.request.to}`); + if (request.request.message) { + lines.push(`Message: ${truncateMessage(request.request.message)}`); + } + if (request.request.mediaUrl) lines.push(`Media: ${request.request.mediaUrl}`); + if (request.request.agentId) lines.push(`Agent: ${request.request.agentId}`); + const expiresIn = Math.max(0, Math.round((request.expiresAtMs - nowMs) / 1000)); + lines.push(`Expires in: ${expiresIn}s`); + lines.push("Reply with: /approve allow|deny"); + return lines.join("\n"); +} + +function decisionLabel(decision: MessageApprovalDecision): string { + if (decision === "allow") return "allowed"; + return "denied"; +} + +function buildResolvedMessage(resolved: MessageApprovalResolved) { + const base = `✅ Message approval ${decisionLabel(resolved.decision)}.`; + const by = resolved.resolvedBy ? ` Resolved by ${resolved.resolvedBy}.` : ""; + return `${base}${by} ID: ${resolved.id}`; +} + +function buildExpiredMessage(request: MessageApprovalRequest) { + return `⏱️ Message approval expired. ID: ${request.id}`; +} + +function defaultResolveSessionTarget(params: { + cfg: ClawdbotConfig; + request: MessageApprovalRequest; +}): ExecApprovalForwardTarget | null { + const sessionKey = params.request.request.sessionKey?.trim(); + if (!sessionKey) return null; + const parsed = parseAgentSessionKey(sessionKey); + const agentId = parsed?.agentId ?? params.request.request.agentId ?? "main"; + const storePath = resolveStorePath(params.cfg.session?.store, { agentId }); + const store = loadSessionStore(storePath); + const entry = store[sessionKey]; + if (!entry) return null; + const target = resolveSessionDeliveryTarget({ entry, requestedChannel: "last" }); + if (!target.channel || !target.to) return null; + if (!isDeliverableMessageChannel(target.channel)) return null; + return { + channel: target.channel, + to: target.to, + accountId: target.accountId, + threadId: target.threadId, + }; +} + +async function deliverToTargets(params: { + cfg: ClawdbotConfig; + targets: ForwardTarget[]; + text: string; + deliver: typeof deliverOutboundPayloads; + shouldSend?: () => boolean; +}) { + const deliveries = params.targets.map(async (target) => { + if (params.shouldSend && !params.shouldSend()) return; + const channel = normalizeMessageChannel(target.channel) ?? target.channel; + if (!isDeliverableMessageChannel(channel)) return; + try { + await params.deliver({ + cfg: params.cfg, + channel, + to: target.to, + accountId: target.accountId, + threadId: target.threadId, + payloads: [{ text: params.text }], + }); + } catch (err) { + log.error(`message approvals: failed to deliver to ${channel}:${target.to}: ${String(err)}`); + } + }); + await Promise.allSettled(deliveries); +} + +export function createMessageApprovalForwarder( + deps: MessageApprovalForwarderDeps = {}, +): MessageApprovalForwarder { + const getConfig = deps.getConfig ?? loadConfig; + const deliver = deps.deliver ?? deliverOutboundPayloads; + const nowMs = deps.nowMs ?? Date.now; + const resolveSessionTarget = deps.resolveSessionTarget ?? defaultResolveSessionTarget; + const pending = new Map(); + + const handleRequested = async (request: MessageApprovalRequest) => { + const cfg = getConfig(); + const config = cfg.approvals?.message; + if (!shouldForward({ config, request })) return; + + const mode = normalizeMode(config?.mode); + const targets: ForwardTarget[] = []; + const seen = new Set(); + + if (mode === "session" || mode === "both") { + const sessionTarget = resolveSessionTarget({ cfg, request }); + if (sessionTarget) { + const key = buildTargetKey(sessionTarget); + if (!seen.has(key)) { + seen.add(key); + targets.push({ ...sessionTarget, source: "session" }); + } + } + } + + if (mode === "targets" || mode === "both") { + const explicitTargets = config?.targets ?? []; + for (const target of explicitTargets) { + const key = buildTargetKey(target); + if (seen.has(key)) continue; + seen.add(key); + targets.push({ ...target, source: "target" }); + } + } + + if (targets.length === 0) return; + + const expiresInMs = Math.max(0, request.expiresAtMs - nowMs()); + const timeoutId = setTimeout(() => { + void (async () => { + const entry = pending.get(request.id); + if (!entry) return; + pending.delete(request.id); + const expiredText = buildExpiredMessage(request); + await deliverToTargets({ cfg, targets: entry.targets, text: expiredText, deliver }); + })(); + }, expiresInMs); + timeoutId.unref?.(); + + const pendingEntry: PendingApproval = { request, targets, timeoutId }; + pending.set(request.id, pendingEntry); + + if (pending.get(request.id) !== pendingEntry) return; + + const text = buildRequestMessage(request, nowMs()); + await deliverToTargets({ + cfg, + targets, + text, + deliver, + shouldSend: () => pending.get(request.id) === pendingEntry, + }); + }; + + const handleResolved = async (resolved: MessageApprovalResolved) => { + const entry = pending.get(resolved.id); + if (!entry) return; + if (entry.timeoutId) clearTimeout(entry.timeoutId); + pending.delete(resolved.id); + + const cfg = getConfig(); + const text = buildResolvedMessage(resolved); + await deliverToTargets({ cfg, targets: entry.targets, text, deliver }); + }; + + const stop = () => { + for (const entry of pending.values()) { + if (entry.timeoutId) clearTimeout(entry.timeoutId); + } + pending.clear(); + }; + + return { handleRequested, handleResolved, stop }; +} + +export function shouldForwardMessageApproval(params: { + config?: MessageApprovalForwardingConfig; + request: MessageApprovalRequest; +}): boolean { + return shouldForward(params); +} diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 98beaa828..aa18ab90a 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -43,6 +43,8 @@ import { resolveChannelTarget, type ResolvedMessagingTarget } from "./target-res import { loadWebMedia } from "../../web/media.js"; import { extensionForMime } from "../../media/mime.js"; import { parseSlackTarget } from "../../slack/targets.js"; +import type { GatewayClient } from "../../gateway/client.js"; +import { requestMessageApproval, shouldRequireMessageApproval } from "./message-approval-check.js"; export type MessageActionRunnerGateway = { url?: string; @@ -60,11 +62,15 @@ export type RunMessageActionParams = { defaultAccountId?: string; toolContext?: ChannelThreadingToolContext; gateway?: MessageActionRunnerGateway; + /** Connected gateway client for RPC calls (e.g., message approval requests). */ + gatewayClient?: GatewayClient; deps?: OutboundSendDeps; sessionKey?: string; agentId?: string; dryRun?: boolean; abortSignal?: AbortSignal; + /** Skip message approval check (used for internal/already-approved calls). */ + skipApproval?: boolean; }; export type MessageActionRunResult = @@ -114,6 +120,14 @@ export type MessageActionRunResult = payload: unknown; toolResult?: AgentToolResult; dryRun: boolean; + } + | { + kind: "approval-denied"; + channel: ChannelId; + action: ChannelMessageActionName; + handledBy: "approval-denied"; + payload: { denied: true; reason: "timeout" | "user-denied" | "error"; error?: string }; + dryRun: false; }; export function getToolResult( @@ -950,6 +964,51 @@ export async function runMessageAction( const gateway = resolveGateway(input); + // Check if message approval is required + if (!input.skipApproval && input.gatewayClient) { + const targetTo = resolvedTarget?.to ?? readStringParam(params, "to") ?? ""; + if ( + shouldRequireMessageApproval({ + cfg, + action, + channel, + agentId: resolvedAgentId, + sessionKey: input.sessionKey, + }) + ) { + const message = readStringParam(params, "message") ?? readStringParam(params, "text") ?? null; + const mediaUrl = + readStringParam(params, "mediaUrl") ?? readStringParam(params, "url") ?? null; + const approvalResult = await requestMessageApproval({ + cfg, + gateway: input.gatewayClient, + action, + channel, + to: targetTo, + message, + mediaUrl, + agentId: resolvedAgentId, + sessionKey: input.sessionKey, + }); + if (approvalResult.decision !== "allow") { + // Distinguish between timeout (null decision, no error), denial (deny decision), and error (null decision with error) + const reason = approvalResult.error + ? "error" + : approvalResult.decision === null + ? "timeout" + : "user-denied"; + return { + kind: "approval-denied", + channel: channel as ChannelId, + action, + handledBy: "approval-denied", + payload: { denied: true, reason, error: approvalResult.error }, + dryRun: false, + }; + } + } + } + if (action === "send") { return handleSendAction({ cfg, diff --git a/src/infra/outbound/message-approval-check.ts b/src/infra/outbound/message-approval-check.ts new file mode 100644 index 000000000..c7fed971c --- /dev/null +++ b/src/infra/outbound/message-approval-check.ts @@ -0,0 +1,114 @@ +import type { ClawdbotConfig } from "../../config/config.js"; +import type { MessageApprovalForwardingConfig } from "../../config/types.approvals.js"; +import type { GatewayClient } from "../../gateway/client.js"; +import type { MessageApprovalDecision } from "../../gateway/message-approval-manager.js"; +import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { parseAgentSessionKey } from "../../routing/session-key.js"; +import { normalizeMessageChannel } from "../../utils/message-channel.js"; + +const log = createSubsystemLogger("message-approval-check"); + +export type MessageApprovalCheckParams = { + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; +}; + +export function resolveMessageApprovalConfig( + cfg: ClawdbotConfig, +): MessageApprovalForwardingConfig | undefined { + return cfg.approvals?.message; +} + +export function shouldRequireMessageApproval(params: { + cfg: ClawdbotConfig; + action: string; + channel: string; + agentId?: string | null; + sessionKey?: string | null; +}): boolean { + const config = resolveMessageApprovalConfig(params.cfg); + if (!config?.enabled) return false; + + if (config.actions?.length) { + if (!config.actions.includes(params.action)) return false; + } + + if (config.channels?.length && !config.channels.includes("*")) { + const channel = normalizeMessageChannel(params.channel); + if (!channel || !config.channels.includes(channel)) return false; + } + + if (config.agentFilter?.length) { + const agentId = params.agentId ?? parseAgentSessionKey(params.sessionKey)?.agentId; + if (!agentId) return false; + if (!config.agentFilter.includes(agentId)) return false; + } + + if (config.sessionFilter?.length) { + const sessionKey = params.sessionKey; + if (!sessionKey) return false; + const matched = config.sessionFilter.some((pattern) => { + try { + return sessionKey.includes(pattern) || new RegExp(pattern).test(sessionKey); + } catch { + return sessionKey.includes(pattern); + } + }); + if (!matched) return false; + } + + return true; +} + +export type RequestMessageApprovalParams = { + cfg: ClawdbotConfig; + gateway: GatewayClient; + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; +}; + +export type RequestMessageApprovalResult = { + decision: MessageApprovalDecision | null; + id: string; + /** Error message if the request failed (distinct from timeout which has null decision but no error). */ + error?: string; +}; + +export async function requestMessageApproval( + params: RequestMessageApprovalParams, +): Promise { + const config = resolveMessageApprovalConfig(params.cfg); + const timeoutMs = (config?.timeout ?? 120) * 1000; + + try { + const result = await params.gateway.request<{ + id: string; + decision: MessageApprovalDecision | null; + }>("message.approval.request", { + action: params.action, + channel: params.channel, + to: params.to, + message: params.message ?? null, + mediaUrl: params.mediaUrl ?? null, + agentId: params.agentId ?? null, + sessionKey: params.sessionKey ?? null, + timeoutMs, + }); + + return { decision: result.decision, id: result.id }; + } catch (err) { + const errorMsg = String(err); + log.error(`message approval request error: ${errorMsg}`); + return { decision: null, id: "", error: errorMsg }; + } +} From cd0aea0f8c69e5d606b99e982876d325603bcc04 Mon Sep 17 00:00:00 2001 From: Glucksberg Date: Mon, 26 Jan 2026 05:25:43 +0000 Subject: [PATCH 3/4] fix(webchat): set command flag on slash command responses (#2030) Webchat slash commands (/status, /help, /new) were not working because command responses were missing the message.command flag. This caused the Control UI to not recognize them as command responses. - Always set message.command=true when agent wasn't started - Fix missing agentCommand import in E2E test - Add webchat commands unit tests --- src/gateway/server-methods/chat.ts | 6 +- ...erver.chat.gateway-server-chat.e2e.test.ts | 1 + .../server.chat.webchat-commands.test.ts | 123 ++++++++++++++++++ 3 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 src/gateway/server.chat.webchat-commands.test.ts diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 9010a6f21..edbc487ef 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -506,12 +506,13 @@ export const chatHandlers: GatewayRequestHandlers = { }) .then(() => { if (!agentRunStarted) { + // No agent was started, meaning this was handled as a command const combinedReply = finalReplyParts .map((part) => part.trim()) .filter(Boolean) .join("\n\n") .trim(); - let message: Record | undefined; + let message: Record = { command: true }; if (combinedReply) { const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry( p.sessionKey, @@ -525,7 +526,7 @@ export const chatHandlers: GatewayRequestHandlers = { createIfMissing: true, }); if (appended.ok) { - message = appended.message; + message = { ...appended.message, command: true }; } else { context.logGateway.warn( `webchat transcript append failed: ${appended.error ?? "unknown error"}`, @@ -537,6 +538,7 @@ export const chatHandlers: GatewayRequestHandlers = { timestamp: now, stopReason: "injected", usage: { input: 0, output: 0, totalTokens: 0 }, + command: true, }; } } diff --git a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts index 0cc9cee93..7c44475c1 100644 --- a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts @@ -6,6 +6,7 @@ import { WebSocket } from "ws"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js"; import { + agentCommand, connectOk, getReplyFromConfig, installGatewayTestHooks, diff --git a/src/gateway/server.chat.webchat-commands.test.ts b/src/gateway/server.chat.webchat-commands.test.ts new file mode 100644 index 000000000..94147cf54 --- /dev/null +++ b/src/gateway/server.chat.webchat-commands.test.ts @@ -0,0 +1,123 @@ +import { describe, expect, it, vi, beforeEach } from "vitest"; + +import type { MsgContext } from "../auto-reply/templating.js"; +import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js"; +import { buildCommandContext, handleCommands } from "../auto-reply/reply/commands.js"; +import { parseInlineDirectives } from "../auto-reply/reply/directive-handling.js"; +import type { ClawdbotConfig } from "../config/config.js"; + +// Test that webchat commands work with CommandAuthorized: true +describe("webchat slash commands", () => { + const workspaceDir = "/tmp/clawdbot-test"; + + function buildWebchatParams(commandBody: string, cfg: ClawdbotConfig) { + const ctx = { + Body: commandBody, + BodyForAgent: commandBody, + BodyForCommands: commandBody, + RawBody: commandBody, + CommandBody: commandBody, + SessionKey: "agent:main:webchat:test", + Provider: INTERNAL_MESSAGE_CHANNEL, + Surface: INTERNAL_MESSAGE_CHANNEL, + OriginatingChannel: INTERNAL_MESSAGE_CHANNEL, + ChatType: "direct", + CommandAuthorized: true, + CommandSource: undefined, + } as MsgContext; + + const command = buildCommandContext({ + ctx, + cfg, + isGroup: false, + triggerBodyNormalized: commandBody.trim(), + commandAuthorized: true, + }); + + return { + ctx, + cfg, + command, + directives: parseInlineDirectives(commandBody), + elevated: { enabled: true, allowed: true, failures: [] }, + sessionKey: "agent:main:webchat:test", + workspaceDir, + defaultGroupActivation: () => "mention" as const, + resolvedVerboseLevel: "off" as const, + resolvedReasoningLevel: "off" as const, + resolveDefaultThinkingLevel: async () => undefined, + provider: "webchat", + model: "test-model", + contextTokens: 0, + isGroup: false, + }; + } + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("/status returns a reply and does not continue to agent", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/status", cfg); + + console.log("Command context:", { + commandBodyNormalized: params.command.commandBodyNormalized, + isAuthorizedSender: params.command.isAuthorizedSender, + surface: params.command.surface, + }); + + const result = await handleCommands(params); + + console.log("Result:", { + shouldContinue: result.shouldContinue, + hasReply: Boolean(result.reply), + replyPreview: result.reply?.text?.slice(0, 100), + }); + + expect(result.shouldContinue).toBe(false); + expect(result.reply).toBeDefined(); + expect(result.reply?.text).toContain("Clawdbot"); + }); + + it("/help returns a reply and does not continue to agent", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/help", cfg); + + const result = await handleCommands(params); + + expect(result.shouldContinue).toBe(false); + expect(result.reply).toBeDefined(); + expect(result.reply?.text).toContain("Help"); + }); + + it("/new continues to agent (session reset)", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/new", cfg); + + const result = await handleCommands(params); + + // /new triggers session reset but continues to agent for greeting + expect(result.shouldContinue).toBe(true); + }); + + it("commands work with commands.text: false (webchat is not native)", async () => { + const cfg = { commands: { text: false } } as ClawdbotConfig; + const params = buildWebchatParams("/status", cfg); + + const result = await handleCommands(params); + + // Even with commands.text: false, webchat should still handle commands + // because webchat doesn't have native command support + expect(result.shouldContinue).toBe(false); + expect(result.reply).toBeDefined(); + }); + + it("verifies isAuthorizedSender is true for webchat", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/status", cfg); + + expect(params.command.isAuthorizedSender).toBe(true); + expect(params.command.surface).toBe(INTERNAL_MESSAGE_CHANNEL); + }); +}); From 182dcd4c8eff8bca60a717a09d453c3a15b7585e Mon Sep 17 00:00:00 2001 From: Glucksberg Date: Mon, 26 Jan 2026 23:47:39 +0000 Subject: [PATCH 4/4] fix(ci): regenerate Swift protocol models and fix test expectations - Regenerate GatewayModels.swift with new MessageApproval types - Update commands-registry tests to match new ResolvedCommandArgChoice format (choices now return {label, value} objects instead of plain strings) Co-Authored-By: Claude Opus 4.5 --- .../OpenClawProtocol/GatewayModels.swift | 62 +++++++++++++++++++ .../OpenClawProtocol/GatewayModels.swift | 62 +++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index 9d2ca5ed4..d4aaff54b 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -2068,6 +2068,68 @@ public struct ExecApprovalResolveParams: Codable, Sendable { } } +public struct MessageApprovalRequestParams: Codable, Sendable { + public let id: String? + public let action: String + public let channel: String + public let to: String + public let message: AnyCodable? + public let mediaurl: AnyCodable? + public let agentid: AnyCodable? + public let sessionkey: AnyCodable? + public let timeoutms: Int? + + public init( + id: String?, + action: String, + channel: String, + to: String, + message: AnyCodable?, + mediaurl: AnyCodable?, + agentid: AnyCodable?, + sessionkey: AnyCodable?, + timeoutms: Int? + ) { + self.id = id + self.action = action + self.channel = channel + self.to = to + self.message = message + self.mediaurl = mediaurl + self.agentid = agentid + self.sessionkey = sessionkey + self.timeoutms = timeoutms + } + private enum CodingKeys: String, CodingKey { + case id + case action + case channel + case to + case message + case mediaurl = "mediaUrl" + case agentid = "agentId" + case sessionkey = "sessionKey" + case timeoutms = "timeoutMs" + } +} + +public struct MessageApprovalResolveParams: Codable, Sendable { + public let id: String + public let decision: String + + public init( + id: String, + decision: String + ) { + self.id = id + self.decision = decision + } + private enum CodingKeys: String, CodingKey { + case id + case decision + } +} + public struct DevicePairListParams: Codable, Sendable { } diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index 9d2ca5ed4..d4aaff54b 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -2068,6 +2068,68 @@ public struct ExecApprovalResolveParams: Codable, Sendable { } } +public struct MessageApprovalRequestParams: Codable, Sendable { + public let id: String? + public let action: String + public let channel: String + public let to: String + public let message: AnyCodable? + public let mediaurl: AnyCodable? + public let agentid: AnyCodable? + public let sessionkey: AnyCodable? + public let timeoutms: Int? + + public init( + id: String?, + action: String, + channel: String, + to: String, + message: AnyCodable?, + mediaurl: AnyCodable?, + agentid: AnyCodable?, + sessionkey: AnyCodable?, + timeoutms: Int? + ) { + self.id = id + self.action = action + self.channel = channel + self.to = to + self.message = message + self.mediaurl = mediaurl + self.agentid = agentid + self.sessionkey = sessionkey + self.timeoutms = timeoutms + } + private enum CodingKeys: String, CodingKey { + case id + case action + case channel + case to + case message + case mediaurl = "mediaUrl" + case agentid = "agentId" + case sessionkey = "sessionKey" + case timeoutms = "timeoutMs" + } +} + +public struct MessageApprovalResolveParams: Codable, Sendable { + public let id: String + public let decision: String + + public init( + id: String, + decision: String + ) { + self.id = id + self.decision = decision + } + private enum CodingKeys: String, CodingKey { + case id + case decision + } +} + public struct DevicePairListParams: Codable, Sendable { }