diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index 9d2ca5ed4..d4aaff54b 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -2068,6 +2068,68 @@ public struct ExecApprovalResolveParams: Codable, Sendable { } } +public struct MessageApprovalRequestParams: Codable, Sendable { + public let id: String? + public let action: String + public let channel: String + public let to: String + public let message: AnyCodable? + public let mediaurl: AnyCodable? + public let agentid: AnyCodable? + public let sessionkey: AnyCodable? + public let timeoutms: Int? + + public init( + id: String?, + action: String, + channel: String, + to: String, + message: AnyCodable?, + mediaurl: AnyCodable?, + agentid: AnyCodable?, + sessionkey: AnyCodable?, + timeoutms: Int? + ) { + self.id = id + self.action = action + self.channel = channel + self.to = to + self.message = message + self.mediaurl = mediaurl + self.agentid = agentid + self.sessionkey = sessionkey + self.timeoutms = timeoutms + } + private enum CodingKeys: String, CodingKey { + case id + case action + case channel + case to + case message + case mediaurl = "mediaUrl" + case agentid = "agentId" + case sessionkey = "sessionKey" + case timeoutms = "timeoutMs" + } +} + +public struct MessageApprovalResolveParams: Codable, Sendable { + public let id: String + public let decision: String + + public init( + id: String, + decision: String + ) { + self.id = id + self.decision = decision + } + private enum CodingKeys: String, CodingKey { + case id + case decision + } +} + public struct DevicePairListParams: Codable, Sendable { } diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index 9d2ca5ed4..d4aaff54b 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -2068,6 +2068,68 @@ public struct ExecApprovalResolveParams: Codable, Sendable { } } +public struct MessageApprovalRequestParams: Codable, Sendable { + public let id: String? + public let action: String + public let channel: String + public let to: String + public let message: AnyCodable? + public let mediaurl: AnyCodable? + public let agentid: AnyCodable? + public let sessionkey: AnyCodable? + public let timeoutms: Int? + + public init( + id: String?, + action: String, + channel: String, + to: String, + message: AnyCodable?, + mediaurl: AnyCodable?, + agentid: AnyCodable?, + sessionkey: AnyCodable?, + timeoutms: Int? + ) { + self.id = id + self.action = action + self.channel = channel + self.to = to + self.message = message + self.mediaurl = mediaurl + self.agentid = agentid + self.sessionkey = sessionkey + self.timeoutms = timeoutms + } + private enum CodingKeys: String, CodingKey { + case id + case action + case channel + case to + case message + case mediaurl = "mediaUrl" + case agentid = "agentId" + case sessionkey = "sessionKey" + case timeoutms = "timeoutMs" + } +} + +public struct MessageApprovalResolveParams: Codable, Sendable { + public let id: String + public let decision: String + + public init( + id: String, + decision: String + ) { + self.id = id + self.decision = decision + } + private enum CodingKeys: String, CodingKey { + case id + case decision + } +} + public struct DevicePairListParams: Codable, Sendable { } diff --git a/src/auto-reply/reply/commands-approve.ts b/src/auto-reply/reply/commands-approve.ts index a34e4b31c..8ed3600a5 100644 --- a/src/auto-reply/reply/commands-approve.ts +++ b/src/auto-reply/reply/commands-approve.ts @@ -5,7 +5,8 @@ import type { CommandHandler } from "./commands-types.js"; const COMMAND = "/approve"; -const DECISION_ALIASES: Record = { +// Exec approval decisions +const EXEC_DECISION_ALIASES: Record = { allow: "allow-once", once: "allow-once", "allow-once": "allow-once", @@ -18,39 +19,86 @@ const DECISION_ALIASES: Record = block: "deny", }; -type ParsedApproveCommand = - | { ok: true; id: string; decision: "allow-once" | "allow-always" | "deny" } +// Message approval decisions (simpler - just allow/deny) +const MESSAGE_DECISION_ALIASES: Record = { + allow: "allow", + yes: "allow", + ok: "allow", + approve: "allow", + deny: "deny", + no: "deny", + reject: "deny", + block: "deny", +}; + +type ParsedExecApproveCommand = + | { ok: true; type: "exec"; id: string; decision: "allow-once" | "allow-always" | "deny" } | { ok: false; error: string }; +type ParsedMessageApproveCommand = + | { ok: true; type: "message"; id: string; decision: "allow" | "deny" } + | { ok: false; error: string }; + +type ParsedApproveCommand = ParsedExecApproveCommand | ParsedMessageApproveCommand; + +function isMessageApprovalId(id: string): boolean { + // Message approval IDs start with "msg-" + return id.toLowerCase().startsWith("msg-"); +} + function parseApproveCommand(raw: string): ParsedApproveCommand | null { const trimmed = raw.trim(); if (!trimmed.toLowerCase().startsWith(COMMAND)) return null; const rest = trimmed.slice(COMMAND.length).trim(); if (!rest) { - return { ok: false, error: "Usage: /approve allow-once|allow-always|deny" }; + return { + ok: false, + error: + "Usage: /approve allow|deny (for message) or allow-once|allow-always|deny (for exec)", + }; } const tokens = rest.split(/\s+/).filter(Boolean); if (tokens.length < 2) { - return { ok: false, error: "Usage: /approve allow-once|allow-always|deny" }; + return { + ok: false, + error: + "Usage: /approve allow|deny (for message) or allow-once|allow-always|deny (for exec)", + }; } const first = tokens[0].toLowerCase(); const second = tokens[1].toLowerCase(); - if (DECISION_ALIASES[first]) { - return { - ok: true, - decision: DECISION_ALIASES[first], - id: tokens.slice(1).join(" ").trim(), - }; + // Determine if this is a message approval based on ID prefix + const idFirst = !EXEC_DECISION_ALIASES[first] && !MESSAGE_DECISION_ALIASES[first]; + const id = idFirst ? tokens[0] : tokens.slice(1).join(" ").trim(); + const decisionToken = idFirst ? second : first; + + if (isMessageApprovalId(id)) { + // Message approval + const decision = MESSAGE_DECISION_ALIASES[decisionToken]; + if (!decision) { + return { ok: false, error: "Usage: /approve allow|deny" }; + } + return { ok: true, type: "message", id, decision }; } - if (DECISION_ALIASES[second]) { - return { - ok: true, - decision: DECISION_ALIASES[second], - id: tokens[0], - }; + + // Exec approval + const execDecision = EXEC_DECISION_ALIASES[decisionToken]; + if (execDecision) { + return { ok: true, type: "exec", id, decision: execDecision }; } + + // If the decision token matches message decisions but ID doesn't have msg- prefix, + // still try to resolve as exec with allow -> allow-once mapping + const msgDecision = MESSAGE_DECISION_ALIASES[decisionToken]; + if (msgDecision === "allow") { + return { ok: true, type: "exec", id, decision: "allow-once" }; + } + if (msgDecision === "deny") { + return { ok: true, type: "exec", id, decision: "deny" }; + } + return { ok: false, error: "Usage: /approve allow-once|allow-always|deny" }; } @@ -77,6 +125,45 @@ export const handleApproveCommand: CommandHandler = async (params, allowTextComm } const resolvedBy = buildResolvedByLabel(params); + + if (parsed.type === "message") { + // Handle message approval + try { + await callGateway({ + method: "message.approval.resolve", + params: { id: parsed.id, decision: parsed.decision }, + clientName: GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, + clientDisplayName: `Chat message approval (${resolvedBy})`, + mode: GATEWAY_CLIENT_MODES.BACKEND, + }); + } catch { + // If message approval fails, try exec approval as fallback + try { + const execDecision = parsed.decision === "allow" ? "allow-once" : "deny"; + await callGateway({ + method: "exec.approval.resolve", + params: { id: parsed.id, decision: execDecision }, + clientName: GATEWAY_CLIENT_NAMES.GATEWAY_CLIENT, + clientDisplayName: `Chat approval (${resolvedBy})`, + mode: GATEWAY_CLIENT_MODES.BACKEND, + }); + } catch (execErr) { + return { + shouldContinue: false, + reply: { + text: `❌ Failed to submit approval: ${String(execErr)}`, + }, + }; + } + } + + return { + shouldContinue: false, + reply: { text: `✅ Message approval ${parsed.decision} submitted for ${parsed.id}.` }, + }; + } + + // Handle exec approval try { await callGateway({ method: "exec.approval.resolve", diff --git a/src/auto-reply/reply/commands-tts.ts b/src/auto-reply/reply/commands-tts.ts index 04b60a4e9..bba7e2b02 100644 --- a/src/auto-reply/reply/commands-tts.ts +++ b/src/auto-reply/reply/commands-tts.ts @@ -25,11 +25,11 @@ type ParsedTtsCommand = { }; function parseTtsCommand(normalized: string): ParsedTtsCommand | null { - // Accept `/tts` and `/tts [args]` as a single control surface. - if (normalized === "/tts") return { action: "status", args: "" }; + // Accept `/tts [args]` - return null for `/tts` alone to trigger inline menu. + if (normalized === "/tts") return null; if (!normalized.startsWith("/tts ")) return null; const rest = normalized.slice(5).trim(); - if (!rest) return { action: "status", args: "" }; + if (!rest) return null; const [action, ...tail] = rest.split(/\s+/); return { action: action.toLowerCase(), args: tail.join(" ").trim() }; } diff --git a/src/commands/message-format.ts b/src/commands/message-format.ts index e22baa85f..4b30d1e76 100644 --- a/src/commands/message-format.ts +++ b/src/commands/message-format.ts @@ -32,7 +32,7 @@ export type MessageCliJsonEnvelope = { action: ChannelMessageActionName; channel: ChannelId; dryRun: boolean; - handledBy: "plugin" | "core" | "dry-run"; + handledBy: "plugin" | "core" | "dry-run" | "approval-denied"; payload: unknown; }; diff --git a/src/config/types.approvals.ts b/src/config/types.approvals.ts index d86d05b8e..d84af81d7 100644 --- a/src/config/types.approvals.ts +++ b/src/config/types.approvals.ts @@ -24,6 +24,28 @@ export type ExecApprovalForwardingConfig = { targets?: ExecApprovalForwardTarget[]; }; +export type MessageApprovalForwardingMode = "session" | "targets" | "both"; + +export type MessageApprovalForwardingConfig = { + /** Enable requiring human approval for outbound messages. Default: false. */ + enabled?: boolean; + /** Delivery mode (session=origin chat, targets=config targets, both=both). Default: session. */ + mode?: MessageApprovalForwardingMode; + /** Only require approval for these actions (e.g. ["send", "broadcast"]). Omit = all actions. */ + actions?: string[]; + /** Only require approval for these channels. Omit or ["*"] = all channels. */ + channels?: string[]; + /** Only require approval for these agent IDs. Omit = all agents. */ + agentFilter?: string[]; + /** Only require approval matching these session key patterns (substring or regex). */ + sessionFilter?: string[]; + /** Explicit delivery targets (used when mode includes targets). */ + targets?: ExecApprovalForwardTarget[]; + /** Approval timeout in seconds. Default: 120. */ + timeout?: number; +}; + export type ApprovalsConfig = { exec?: ExecApprovalForwardingConfig; + message?: MessageApprovalForwardingConfig; }; diff --git a/src/config/zod-schema.approvals.ts b/src/config/zod-schema.approvals.ts index e5276d19a..84368738a 100644 --- a/src/config/zod-schema.approvals.ts +++ b/src/config/zod-schema.approvals.ts @@ -20,9 +20,24 @@ const ExecApprovalForwardingSchema = z .strict() .optional(); -export const ApprovalsSchema = z +const MessageApprovalForwardingSchema = z .object({ - exec: ExecApprovalForwardingSchema, + enabled: z.boolean().optional(), + mode: z.union([z.literal("session"), z.literal("targets"), z.literal("both")]).optional(), + actions: z.array(z.string()).optional(), + channels: z.array(z.string()).optional(), + agentFilter: z.array(z.string()).optional(), + sessionFilter: z.array(z.string()).optional(), + targets: z.array(ExecApprovalForwardTargetSchema).optional(), + timeout: z.number().optional(), + }) + .strict() + .optional(); + +export const ApprovalsSchema = z + .object({ + exec: ExecApprovalForwardingSchema, + message: MessageApprovalForwardingSchema, }) .strict() .optional(); diff --git a/src/gateway/message-approval-manager.ts b/src/gateway/message-approval-manager.ts new file mode 100644 index 000000000..fb4573e59 --- /dev/null +++ b/src/gateway/message-approval-manager.ts @@ -0,0 +1,84 @@ +import { randomUUID } from "node:crypto"; + +export type MessageApprovalDecision = "allow" | "deny"; + +export type MessageApprovalRequestPayload = { + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; +}; + +export type MessageApprovalRecord = { + id: string; + request: MessageApprovalRequestPayload; + createdAtMs: number; + expiresAtMs: number; + resolvedAtMs?: number; + decision?: MessageApprovalDecision; + resolvedBy?: string | null; +}; + +type PendingEntry = { + record: MessageApprovalRecord; + resolve: (decision: MessageApprovalDecision | null) => void; + reject: (err: Error) => void; + timer: ReturnType; +}; + +export class MessageApprovalManager { + private pending = new Map(); + + create( + request: MessageApprovalRequestPayload, + timeoutMs: number, + id?: string | null, + ): MessageApprovalRecord { + const now = Date.now(); + const resolvedId = id && id.trim().length > 0 ? id.trim() : `msg-${randomUUID()}`; + const record: MessageApprovalRecord = { + id: resolvedId, + request, + createdAtMs: now, + expiresAtMs: now + timeoutMs, + }; + return record; + } + + async waitForDecision( + record: MessageApprovalRecord, + timeoutMs: number, + ): Promise { + return await new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending.delete(record.id); + resolve(null); + }, timeoutMs); + this.pending.set(record.id, { record, resolve, reject, timer }); + }); + } + + resolve( + recordId: string, + decision: MessageApprovalDecision, + resolvedBy?: string | null, + ): boolean { + const pending = this.pending.get(recordId); + if (!pending) return false; + clearTimeout(pending.timer); + pending.record.resolvedAtMs = Date.now(); + pending.record.decision = decision; + pending.record.resolvedBy = resolvedBy ?? null; + this.pending.delete(recordId); + pending.resolve(decision); + return true; + } + + getSnapshot(recordId: string): MessageApprovalRecord | null { + const entry = this.pending.get(recordId); + return entry?.record ?? null; + } +} diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 6e5a862d1..332f9d8a6 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -83,6 +83,10 @@ import { ExecApprovalRequestParamsSchema, type ExecApprovalResolveParams, ExecApprovalResolveParamsSchema, + type MessageApprovalRequestParams, + MessageApprovalRequestParamsSchema, + type MessageApprovalResolveParams, + MessageApprovalResolveParamsSchema, ErrorCodes, type ErrorShape, ErrorShapeSchema, @@ -303,6 +307,12 @@ export const validateExecApprovalRequestParams = ajv.compile( ExecApprovalResolveParamsSchema, ); +export const validateMessageApprovalRequestParams = ajv.compile( + MessageApprovalRequestParamsSchema, +); +export const validateMessageApprovalResolveParams = ajv.compile( + MessageApprovalResolveParamsSchema, +); export const validateExecApprovalsNodeGetParams = ajv.compile( ExecApprovalsNodeGetParamsSchema, ); @@ -512,6 +522,8 @@ export type { ExecApprovalsGetParams, ExecApprovalsSetParams, ExecApprovalsSnapshot, + MessageApprovalRequestParams, + MessageApprovalResolveParams, LogsTailParams, LogsTailResult, PollParams, diff --git a/src/gateway/protocol/schema.ts b/src/gateway/protocol/schema.ts index 614942008..a77cf577f 100644 --- a/src/gateway/protocol/schema.ts +++ b/src/gateway/protocol/schema.ts @@ -5,6 +5,7 @@ export * from "./schema/config.js"; export * from "./schema/cron.js"; export * from "./schema/error-codes.js"; export * from "./schema/exec-approvals.js"; +export * from "./schema/message-approvals.js"; export * from "./schema/devices.js"; export * from "./schema/frames.js"; export * from "./schema/logs-chat.js"; diff --git a/src/gateway/protocol/schema/message-approvals.ts b/src/gateway/protocol/schema/message-approvals.ts new file mode 100644 index 000000000..68353c737 --- /dev/null +++ b/src/gateway/protocol/schema/message-approvals.ts @@ -0,0 +1,26 @@ +import { Type } from "@sinclair/typebox"; + +import { NonEmptyString } from "./primitives.js"; + +export const MessageApprovalRequestParamsSchema = Type.Object( + { + id: Type.Optional(NonEmptyString), + action: NonEmptyString, + channel: NonEmptyString, + to: NonEmptyString, + message: Type.Optional(Type.Union([Type.String(), Type.Null()])), + mediaUrl: Type.Optional(Type.Union([Type.String(), Type.Null()])), + agentId: Type.Optional(Type.Union([Type.String(), Type.Null()])), + sessionKey: Type.Optional(Type.Union([Type.String(), Type.Null()])), + timeoutMs: Type.Optional(Type.Integer({ minimum: 1 })), + }, + { additionalProperties: false }, +); + +export const MessageApprovalResolveParamsSchema = Type.Object( + { + id: NonEmptyString, + decision: NonEmptyString, + }, + { additionalProperties: false }, +); diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index e92f114e2..d101f201c 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -60,6 +60,10 @@ import { ExecApprovalRequestParamsSchema, ExecApprovalResolveParamsSchema, } from "./exec-approvals.js"; +import { + MessageApprovalRequestParamsSchema, + MessageApprovalResolveParamsSchema, +} from "./message-approvals.js"; import { DevicePairApproveParamsSchema, DevicePairListParamsSchema, @@ -211,6 +215,8 @@ export const ProtocolSchemas: Record = { ExecApprovalsSnapshot: ExecApprovalsSnapshotSchema, ExecApprovalRequestParams: ExecApprovalRequestParamsSchema, ExecApprovalResolveParams: ExecApprovalResolveParamsSchema, + MessageApprovalRequestParams: MessageApprovalRequestParamsSchema, + MessageApprovalResolveParams: MessageApprovalResolveParamsSchema, DevicePairListParams: DevicePairListParamsSchema, DevicePairApproveParams: DevicePairApproveParamsSchema, DevicePairRejectParams: DevicePairRejectParamsSchema, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index 696503721..b6359132a 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -58,6 +58,10 @@ import type { ExecApprovalRequestParamsSchema, ExecApprovalResolveParamsSchema, } from "./exec-approvals.js"; +import type { + MessageApprovalRequestParamsSchema, + MessageApprovalResolveParamsSchema, +} from "./message-approvals.js"; import type { DevicePairApproveParamsSchema, DevicePairListParamsSchema, @@ -200,6 +204,8 @@ export type ExecApprovalsNodeSetParams = Static; export type ExecApprovalRequestParams = Static; export type ExecApprovalResolveParams = Static; +export type MessageApprovalRequestParams = Static; +export type MessageApprovalResolveParams = Static; export type DevicePairListParams = Static; export type DevicePairApproveParams = Static; export type DevicePairRejectParams = Static; diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 9010a6f21..edbc487ef 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -506,12 +506,13 @@ export const chatHandlers: GatewayRequestHandlers = { }) .then(() => { if (!agentRunStarted) { + // No agent was started, meaning this was handled as a command const combinedReply = finalReplyParts .map((part) => part.trim()) .filter(Boolean) .join("\n\n") .trim(); - let message: Record | undefined; + let message: Record = { command: true }; if (combinedReply) { const { storePath: latestStorePath, entry: latestEntry } = loadSessionEntry( p.sessionKey, @@ -525,7 +526,7 @@ export const chatHandlers: GatewayRequestHandlers = { createIfMissing: true, }); if (appended.ok) { - message = appended.message; + message = { ...appended.message, command: true }; } else { context.logGateway.warn( `webchat transcript append failed: ${appended.error ?? "unknown error"}`, @@ -537,6 +538,7 @@ export const chatHandlers: GatewayRequestHandlers = { timestamp: now, stopReason: "injected", usage: { input: 0, output: 0, totalTokens: 0 }, + command: true, }; } } diff --git a/src/gateway/server-methods/message-approval.test.ts b/src/gateway/server-methods/message-approval.test.ts new file mode 100644 index 000000000..8b2ef98ee --- /dev/null +++ b/src/gateway/server-methods/message-approval.test.ts @@ -0,0 +1,305 @@ +import { describe, expect, it, vi } from "vitest"; +import { MessageApprovalManager } from "../message-approval-manager.js"; +import { createMessageApprovalHandlers } from "./message-approval.js"; +import { validateMessageApprovalRequestParams } from "../protocol/index.js"; + +const noop = () => {}; + +describe("message approval handlers", () => { + describe("MessageApprovalRequestParams validation", () => { + it("accepts valid request params", () => { + const params = { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Hello world", + }; + expect(validateMessageApprovalRequestParams(params)).toBe(true); + }); + + it("accepts request with optional fields omitted", () => { + const params = { + action: "send", + channel: "telegram", + to: "+1234567890", + }; + expect(validateMessageApprovalRequestParams(params)).toBe(true); + }); + + it("accepts request with null optional fields", () => { + const params = { + action: "send", + channel: "telegram", + to: "+1234567890", + message: null, + mediaUrl: null, + agentId: null, + sessionKey: null, + }; + expect(validateMessageApprovalRequestParams(params)).toBe(true); + }); + + it("rejects request missing required fields", () => { + const params = { + action: "send", + channel: "telegram", + // missing 'to' + }; + expect(validateMessageApprovalRequestParams(params)).toBe(false); + }); + }); + + it("broadcasts request + resolve", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const broadcasts: Array<{ event: string; payload: unknown }> = []; + + const respond = vi.fn(); + const context = { + broadcast: (event: string, payload: unknown) => { + broadcasts.push({ event, payload }); + }, + }; + + const requestPromise = handlers["message.approval.request"]({ + params: { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Test message", + timeoutMs: 2000, + }, + respond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + const requested = broadcasts.find((entry) => entry.event === "message.approval.requested"); + expect(requested).toBeTruthy(); + const id = (requested?.payload as { id?: string })?.id ?? ""; + expect(id).not.toBe(""); + expect(id.startsWith("msg-")).toBe(true); + + const resolveRespond = vi.fn(); + await handlers["message.approval.resolve"]({ + params: { id, decision: "allow" }, + respond: resolveRespond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-2", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + await requestPromise; + + expect(resolveRespond).toHaveBeenCalledWith(true, { ok: true }, undefined); + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ id, decision: "allow" }), + undefined, + ); + expect(broadcasts.some((entry) => entry.event === "message.approval.resolved")).toBe(true); + }); + + it("accepts resolve during broadcast", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const respond = vi.fn(); + const resolveRespond = vi.fn(); + + const resolveContext = { + broadcast: () => {}, + }; + + const context = { + broadcast: (event: string, payload: unknown) => { + if (event !== "message.approval.requested") return; + const id = (payload as { id?: string })?.id ?? ""; + void handlers["message.approval.resolve"]({ + params: { id, decision: "allow" }, + respond: resolveRespond, + context: resolveContext as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-2", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + }, + }; + + await handlers["message.approval.request"]({ + params: { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Test message", + timeoutMs: 2000, + }, + respond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + expect(resolveRespond).toHaveBeenCalledWith(true, { ok: true }, undefined); + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ decision: "allow" }), + undefined, + ); + }); + + it("accepts explicit approval ids", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const broadcasts: Array<{ event: string; payload: unknown }> = []; + + const respond = vi.fn(); + const context = { + broadcast: (event: string, payload: unknown) => { + broadcasts.push({ event, payload }); + }, + }; + + const requestPromise = handlers["message.approval.request"]({ + params: { + id: "msg-approval-123", + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Test message", + timeoutMs: 2000, + }, + respond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + const requested = broadcasts.find((entry) => entry.event === "message.approval.requested"); + const id = (requested?.payload as { id?: string })?.id ?? ""; + expect(id).toBe("msg-approval-123"); + + const resolveRespond = vi.fn(); + await handlers["message.approval.resolve"]({ + params: { id, decision: "allow" }, + respond: resolveRespond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-2", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + await requestPromise; + expect(respond).toHaveBeenCalledWith( + true, + expect.objectContaining({ id: "msg-approval-123", decision: "allow" }), + undefined, + ); + }); + + it("rejects duplicate approval ids", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const respondA = vi.fn(); + const respondB = vi.fn(); + const broadcasts: Array<{ event: string; payload: unknown }> = []; + const context = { + broadcast: (event: string, payload: unknown) => { + broadcasts.push({ event, payload }); + }, + }; + + const requestPromise = handlers["message.approval.request"]({ + params: { + id: "msg-dup-1", + action: "send", + channel: "telegram", + to: "+1234567890", + }, + respond: respondA, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + await handlers["message.approval.request"]({ + params: { + id: "msg-dup-1", + action: "send", + channel: "slack", + to: "U1234", + }, + respond: respondB, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.request"] + >[0]["context"], + client: null, + req: { id: "req-2", type: "req", method: "message.approval.request" }, + isWebchatConnect: noop, + }); + + expect(respondB).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ message: "approval id already pending" }), + ); + + const requested = broadcasts.find((entry) => entry.event === "message.approval.requested"); + const id = (requested?.payload as { id?: string })?.id ?? ""; + const resolveRespond = vi.fn(); + await handlers["message.approval.resolve"]({ + params: { id, decision: "deny" }, + respond: resolveRespond, + context: context as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: { connect: { client: { id: "cli", displayName: "CLI" } } }, + req: { id: "req-3", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + await requestPromise; + }); + + it("rejects invalid decision", async () => { + const manager = new MessageApprovalManager(); + const handlers = createMessageApprovalHandlers(manager); + const respond = vi.fn(); + + await handlers["message.approval.resolve"]({ + params: { id: "msg-123", decision: "allow-always" }, // invalid for message approvals + respond, + context: { broadcast: () => {} } as unknown as Parameters< + (typeof handlers)["message.approval.resolve"] + >[0]["context"], + client: null, + req: { id: "req-1", type: "req", method: "message.approval.resolve" }, + isWebchatConnect: noop, + }); + + expect(respond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ message: "invalid decision" }), + ); + }); +}); diff --git a/src/gateway/server-methods/message-approval.ts b/src/gateway/server-methods/message-approval.ts new file mode 100644 index 000000000..9fc39068b --- /dev/null +++ b/src/gateway/server-methods/message-approval.ts @@ -0,0 +1,137 @@ +import type { MessageApprovalForwarder } from "../../infra/message-approval-forwarder.js"; +import type { + MessageApprovalDecision, + MessageApprovalManager, +} from "../message-approval-manager.js"; +import { + ErrorCodes, + errorShape, + formatValidationErrors, + validateMessageApprovalRequestParams, + validateMessageApprovalResolveParams, +} from "../protocol/index.js"; +import type { GatewayRequestHandlers } from "./types.js"; + +export function createMessageApprovalHandlers( + manager: MessageApprovalManager, + opts?: { forwarder?: MessageApprovalForwarder }, +): GatewayRequestHandlers { + return { + "message.approval.request": async ({ params, respond, context }) => { + if (!validateMessageApprovalRequestParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid message.approval.request params: ${formatValidationErrors( + validateMessageApprovalRequestParams.errors, + )}`, + ), + ); + return; + } + const p = params as { + id?: string; + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; + timeoutMs?: number; + }; + const timeoutMs = typeof p.timeoutMs === "number" ? p.timeoutMs : 120_000; + const explicitId = typeof p.id === "string" && p.id.trim().length > 0 ? p.id.trim() : null; + if (explicitId && manager.getSnapshot(explicitId)) { + respond( + false, + undefined, + errorShape(ErrorCodes.INVALID_REQUEST, "approval id already pending"), + ); + return; + } + const request = { + action: p.action, + channel: p.channel, + to: p.to, + message: p.message ?? null, + mediaUrl: p.mediaUrl ?? null, + agentId: p.agentId ?? null, + sessionKey: p.sessionKey ?? null, + }; + const record = manager.create(request, timeoutMs, explicitId); + const decisionPromise = manager.waitForDecision(record, timeoutMs); + context.broadcast( + "message.approval.requested", + { + id: record.id, + request: record.request, + createdAtMs: record.createdAtMs, + expiresAtMs: record.expiresAtMs, + }, + { dropIfSlow: true }, + ); + void opts?.forwarder + ?.handleRequested({ + id: record.id, + request: record.request, + createdAtMs: record.createdAtMs, + expiresAtMs: record.expiresAtMs, + }) + .catch((err) => { + context.logGateway?.error?.(`message approvals: forward request failed: ${String(err)}`); + }); + const decision = await decisionPromise; + respond( + true, + { + id: record.id, + decision, + createdAtMs: record.createdAtMs, + expiresAtMs: record.expiresAtMs, + }, + undefined, + ); + }, + "message.approval.resolve": async ({ params, respond, client, context }) => { + if (!validateMessageApprovalResolveParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid message.approval.resolve params: ${formatValidationErrors( + validateMessageApprovalResolveParams.errors, + )}`, + ), + ); + return; + } + const p = params as { id: string; decision: string }; + const decision = p.decision as MessageApprovalDecision; + if (decision !== "allow" && decision !== "deny") { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "invalid decision")); + return; + } + const resolvedBy = client?.connect?.client?.displayName ?? client?.connect?.client?.id; + const ok = manager.resolve(p.id, decision, resolvedBy ?? null); + if (!ok) { + respond(false, undefined, errorShape(ErrorCodes.INVALID_REQUEST, "unknown approval id")); + return; + } + context.broadcast( + "message.approval.resolved", + { id: p.id, decision, resolvedBy, ts: Date.now() }, + { dropIfSlow: true }, + ); + void opts?.forwarder + ?.handleResolved({ id: p.id, decision, resolvedBy, ts: Date.now() }) + .catch((err) => { + context.logGateway?.error?.(`message approvals: forward resolve failed: ${String(err)}`); + }); + respond(true, { ok: true }, undefined); + }, + }; +} diff --git a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts index 0cc9cee93..7c44475c1 100644 --- a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts @@ -6,6 +6,7 @@ import { WebSocket } from "ws"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js"; import { + agentCommand, connectOk, getReplyFromConfig, installGatewayTestHooks, diff --git a/src/gateway/server.chat.webchat-commands.test.ts b/src/gateway/server.chat.webchat-commands.test.ts new file mode 100644 index 000000000..94147cf54 --- /dev/null +++ b/src/gateway/server.chat.webchat-commands.test.ts @@ -0,0 +1,123 @@ +import { describe, expect, it, vi, beforeEach } from "vitest"; + +import type { MsgContext } from "../auto-reply/templating.js"; +import { INTERNAL_MESSAGE_CHANNEL } from "../utils/message-channel.js"; +import { buildCommandContext, handleCommands } from "../auto-reply/reply/commands.js"; +import { parseInlineDirectives } from "../auto-reply/reply/directive-handling.js"; +import type { ClawdbotConfig } from "../config/config.js"; + +// Test that webchat commands work with CommandAuthorized: true +describe("webchat slash commands", () => { + const workspaceDir = "/tmp/clawdbot-test"; + + function buildWebchatParams(commandBody: string, cfg: ClawdbotConfig) { + const ctx = { + Body: commandBody, + BodyForAgent: commandBody, + BodyForCommands: commandBody, + RawBody: commandBody, + CommandBody: commandBody, + SessionKey: "agent:main:webchat:test", + Provider: INTERNAL_MESSAGE_CHANNEL, + Surface: INTERNAL_MESSAGE_CHANNEL, + OriginatingChannel: INTERNAL_MESSAGE_CHANNEL, + ChatType: "direct", + CommandAuthorized: true, + CommandSource: undefined, + } as MsgContext; + + const command = buildCommandContext({ + ctx, + cfg, + isGroup: false, + triggerBodyNormalized: commandBody.trim(), + commandAuthorized: true, + }); + + return { + ctx, + cfg, + command, + directives: parseInlineDirectives(commandBody), + elevated: { enabled: true, allowed: true, failures: [] }, + sessionKey: "agent:main:webchat:test", + workspaceDir, + defaultGroupActivation: () => "mention" as const, + resolvedVerboseLevel: "off" as const, + resolvedReasoningLevel: "off" as const, + resolveDefaultThinkingLevel: async () => undefined, + provider: "webchat", + model: "test-model", + contextTokens: 0, + isGroup: false, + }; + } + + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("/status returns a reply and does not continue to agent", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/status", cfg); + + console.log("Command context:", { + commandBodyNormalized: params.command.commandBodyNormalized, + isAuthorizedSender: params.command.isAuthorizedSender, + surface: params.command.surface, + }); + + const result = await handleCommands(params); + + console.log("Result:", { + shouldContinue: result.shouldContinue, + hasReply: Boolean(result.reply), + replyPreview: result.reply?.text?.slice(0, 100), + }); + + expect(result.shouldContinue).toBe(false); + expect(result.reply).toBeDefined(); + expect(result.reply?.text).toContain("Clawdbot"); + }); + + it("/help returns a reply and does not continue to agent", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/help", cfg); + + const result = await handleCommands(params); + + expect(result.shouldContinue).toBe(false); + expect(result.reply).toBeDefined(); + expect(result.reply?.text).toContain("Help"); + }); + + it("/new continues to agent (session reset)", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/new", cfg); + + const result = await handleCommands(params); + + // /new triggers session reset but continues to agent for greeting + expect(result.shouldContinue).toBe(true); + }); + + it("commands work with commands.text: false (webchat is not native)", async () => { + const cfg = { commands: { text: false } } as ClawdbotConfig; + const params = buildWebchatParams("/status", cfg); + + const result = await handleCommands(params); + + // Even with commands.text: false, webchat should still handle commands + // because webchat doesn't have native command support + expect(result.shouldContinue).toBe(false); + expect(result.reply).toBeDefined(); + }); + + it("verifies isAuthorizedSender is true for webchat", async () => { + const cfg = {} as ClawdbotConfig; + const params = buildWebchatParams("/status", cfg); + + expect(params.command.isAuthorizedSender).toBe(true); + expect(params.command.surface).toBe(INTERNAL_MESSAGE_CHANNEL); + }); +}); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index efa91be76..3910a1a94 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -45,6 +45,9 @@ import { startGatewayDiscovery } from "./server-discovery-runtime.js"; import { ExecApprovalManager } from "./exec-approval-manager.js"; import { createExecApprovalHandlers } from "./server-methods/exec-approval.js"; import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js"; +import { MessageApprovalManager } from "./message-approval-manager.js"; +import { createMessageApprovalHandlers } from "./server-methods/message-approval.js"; +import { createMessageApprovalForwarder } from "../infra/message-approval-forwarder.js"; import type { startBrowserControlServerIfEnabled } from "./server-browser.js"; import { createChannelManager } from "./server-channels.js"; import { createAgentEventHandler } from "./server-chat.js"; @@ -417,6 +420,12 @@ export async function startGatewayServer( forwarder: execApprovalForwarder, }); + const messageApprovalManager = new MessageApprovalManager(); + const messageApprovalForwarder = createMessageApprovalForwarder(); + const messageApprovalHandlers = createMessageApprovalHandlers(messageApprovalManager, { + forwarder: messageApprovalForwarder, + }); + const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port; attachGatewayWsHandlers({ @@ -435,6 +444,7 @@ export async function startGatewayServer( extraHandlers: { ...pluginRegistry.gatewayHandlers, ...execApprovalHandlers, + ...messageApprovalHandlers, }, broadcast, context: { diff --git a/src/infra/message-approval-forwarder.test.ts b/src/infra/message-approval-forwarder.test.ts new file mode 100644 index 000000000..784a1909b --- /dev/null +++ b/src/infra/message-approval-forwarder.test.ts @@ -0,0 +1,196 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +import type { ClawdbotConfig } from "../config/config.js"; +import { createMessageApprovalForwarder } from "./message-approval-forwarder.js"; + +const baseRequest = { + id: "msg-req-1", + request: { + action: "send", + channel: "telegram", + to: "+1234567890", + message: "Hello world", + agentId: "main", + sessionKey: "agent:main:main", + }, + createdAtMs: 1000, + expiresAtMs: 6000, +}; + +afterEach(() => { + vi.useRealTimers(); +}); + +describe("message approval forwarder", () => { + it("forwards to session target and resolves", async () => { + vi.useFakeTimers(); + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { message: { enabled: true, mode: "session" } }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); + expect(deliver).toHaveBeenCalledTimes(1); + + await forwarder.handleResolved({ + id: baseRequest.id, + decision: "allow", + resolvedBy: "slack:U1", + ts: 2000, + }); + expect(deliver).toHaveBeenCalledTimes(2); + + await vi.runAllTimersAsync(); + expect(deliver).toHaveBeenCalledTimes(2); + }); + + it("forwards to explicit targets and expires", async () => { + vi.useFakeTimers(); + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "targets", + targets: [{ channel: "telegram", to: "123" }], + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => null, + }); + + await forwarder.handleRequested(baseRequest); + expect(deliver).toHaveBeenCalledTimes(1); + + await vi.runAllTimersAsync(); + expect(deliver).toHaveBeenCalledTimes(2); + }); + + it("filters by action", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "session", + actions: ["broadcast"], // only broadcast, not send + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); // action is "send" + expect(deliver).not.toHaveBeenCalled(); + }); + + it("filters by channel", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "session", + channels: ["slack"], // only slack, not telegram + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); // channel is "telegram" + expect(deliver).not.toHaveBeenCalled(); + }); + + it("filters by agentId", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { + message: { + enabled: true, + mode: "session", + agentFilter: ["other-agent"], // not main + }, + }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); // agentId is "main" + expect(deliver).not.toHaveBeenCalled(); + }); + + it("includes message content in forwarded notification", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { message: { enabled: true, mode: "session" } }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + await forwarder.handleRequested(baseRequest); + expect(deliver).toHaveBeenCalledTimes(1); + + const call = deliver.mock.calls[0][0]; + expect(call.payloads[0].text).toContain("Message approval required"); + expect(call.payloads[0].text).toContain("Hello world"); + expect(call.payloads[0].text).toContain("send"); + expect(call.payloads[0].text).toContain("telegram"); + }); + + it("truncates long messages", async () => { + const deliver = vi.fn().mockResolvedValue([]); + const cfg = { + approvals: { message: { enabled: true, mode: "session" } }, + } as ClawdbotConfig; + + const forwarder = createMessageApprovalForwarder({ + getConfig: () => cfg, + deliver, + nowMs: () => 1000, + resolveSessionTarget: () => ({ channel: "slack", to: "U1" }), + }); + + const longMessage = "x".repeat(500); + await forwarder.handleRequested({ + ...baseRequest, + request: { ...baseRequest.request, message: longMessage }, + }); + + const call = deliver.mock.calls[0][0]; + expect(call.payloads[0].text).toContain("..."); + expect(call.payloads[0].text.length).toBeLessThan(longMessage.length + 200); + }); +}); diff --git a/src/infra/message-approval-forwarder.ts b/src/infra/message-approval-forwarder.ts new file mode 100644 index 000000000..43327be10 --- /dev/null +++ b/src/infra/message-approval-forwarder.ts @@ -0,0 +1,295 @@ +import type { ClawdbotConfig } from "../config/config.js"; +import { loadConfig } from "../config/config.js"; +import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; +import type { + ExecApprovalForwardTarget, + MessageApprovalForwardingConfig, +} from "../config/types.approvals.js"; +import type { MessageApprovalDecision } from "../gateway/message-approval-manager.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { parseAgentSessionKey } from "../routing/session-key.js"; +import { isDeliverableMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js"; +import { deliverOutboundPayloads } from "./outbound/deliver.js"; +import { resolveSessionDeliveryTarget } from "./outbound/targets.js"; + +const log = createSubsystemLogger("gateway/message-approvals"); + +export type MessageApprovalRequest = { + id: string; + request: { + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; + }; + createdAtMs: number; + expiresAtMs: number; +}; + +export type MessageApprovalResolved = { + id: string; + decision: MessageApprovalDecision; + resolvedBy?: string | null; + ts: number; +}; + +type ForwardTarget = ExecApprovalForwardTarget & { source: "session" | "target" }; + +type PendingApproval = { + request: MessageApprovalRequest; + targets: ForwardTarget[]; + timeoutId: NodeJS.Timeout | null; +}; + +export type MessageApprovalForwarder = { + handleRequested: (request: MessageApprovalRequest) => Promise; + handleResolved: (resolved: MessageApprovalResolved) => Promise; + stop: () => void; +}; + +export type MessageApprovalForwarderDeps = { + getConfig?: () => ClawdbotConfig; + deliver?: typeof deliverOutboundPayloads; + nowMs?: () => number; + resolveSessionTarget?: (params: { + cfg: ClawdbotConfig; + request: MessageApprovalRequest; + }) => ExecApprovalForwardTarget | null; +}; + +const DEFAULT_MODE = "session" as const; + +function normalizeMode(mode?: MessageApprovalForwardingConfig["mode"]) { + return mode ?? DEFAULT_MODE; +} + +function matchSessionFilter(sessionKey: string, patterns: string[]): boolean { + return patterns.some((pattern) => { + try { + return sessionKey.includes(pattern) || new RegExp(pattern).test(sessionKey); + } catch { + return sessionKey.includes(pattern); + } + }); +} + +function shouldForward(params: { + config?: MessageApprovalForwardingConfig; + request: MessageApprovalRequest; +}): boolean { + const config = params.config; + if (!config?.enabled) return false; + if (config.actions?.length) { + if (!config.actions.includes(params.request.request.action)) return false; + } + if (config.channels?.length && !config.channels.includes("*")) { + const channel = normalizeMessageChannel(params.request.request.channel); + if (!channel || !config.channels.includes(channel)) return false; + } + if (config.agentFilter?.length) { + const agentId = + params.request.request.agentId ?? + parseAgentSessionKey(params.request.request.sessionKey)?.agentId; + if (!agentId) return false; + if (!config.agentFilter.includes(agentId)) return false; + } + if (config.sessionFilter?.length) { + const sessionKey = params.request.request.sessionKey; + if (!sessionKey) return false; + if (!matchSessionFilter(sessionKey, config.sessionFilter)) return false; + } + return true; +} + +function buildTargetKey(target: ExecApprovalForwardTarget): string { + const channel = normalizeMessageChannel(target.channel) ?? target.channel; + const accountId = target.accountId ?? ""; + const threadId = target.threadId ?? ""; + return [channel, target.to, accountId, threadId].join(":"); +} + +function truncateMessage(message: string | null | undefined, maxLen = 200): string { + if (!message) return "(empty)"; + if (message.length <= maxLen) return message; + return `${message.slice(0, maxLen)}...`; +} + +function buildRequestMessage(request: MessageApprovalRequest, nowMs: number) { + const lines: string[] = ["📬 Message approval required", `ID: ${request.id}`]; + lines.push(`Action: ${request.request.action}`); + lines.push(`Channel: ${request.request.channel}`); + lines.push(`To: ${request.request.to}`); + if (request.request.message) { + lines.push(`Message: ${truncateMessage(request.request.message)}`); + } + if (request.request.mediaUrl) lines.push(`Media: ${request.request.mediaUrl}`); + if (request.request.agentId) lines.push(`Agent: ${request.request.agentId}`); + const expiresIn = Math.max(0, Math.round((request.expiresAtMs - nowMs) / 1000)); + lines.push(`Expires in: ${expiresIn}s`); + lines.push("Reply with: /approve allow|deny"); + return lines.join("\n"); +} + +function decisionLabel(decision: MessageApprovalDecision): string { + if (decision === "allow") return "allowed"; + return "denied"; +} + +function buildResolvedMessage(resolved: MessageApprovalResolved) { + const base = `✅ Message approval ${decisionLabel(resolved.decision)}.`; + const by = resolved.resolvedBy ? ` Resolved by ${resolved.resolvedBy}.` : ""; + return `${base}${by} ID: ${resolved.id}`; +} + +function buildExpiredMessage(request: MessageApprovalRequest) { + return `⏱️ Message approval expired. ID: ${request.id}`; +} + +function defaultResolveSessionTarget(params: { + cfg: ClawdbotConfig; + request: MessageApprovalRequest; +}): ExecApprovalForwardTarget | null { + const sessionKey = params.request.request.sessionKey?.trim(); + if (!sessionKey) return null; + const parsed = parseAgentSessionKey(sessionKey); + const agentId = parsed?.agentId ?? params.request.request.agentId ?? "main"; + const storePath = resolveStorePath(params.cfg.session?.store, { agentId }); + const store = loadSessionStore(storePath); + const entry = store[sessionKey]; + if (!entry) return null; + const target = resolveSessionDeliveryTarget({ entry, requestedChannel: "last" }); + if (!target.channel || !target.to) return null; + if (!isDeliverableMessageChannel(target.channel)) return null; + return { + channel: target.channel, + to: target.to, + accountId: target.accountId, + threadId: target.threadId, + }; +} + +async function deliverToTargets(params: { + cfg: ClawdbotConfig; + targets: ForwardTarget[]; + text: string; + deliver: typeof deliverOutboundPayloads; + shouldSend?: () => boolean; +}) { + const deliveries = params.targets.map(async (target) => { + if (params.shouldSend && !params.shouldSend()) return; + const channel = normalizeMessageChannel(target.channel) ?? target.channel; + if (!isDeliverableMessageChannel(channel)) return; + try { + await params.deliver({ + cfg: params.cfg, + channel, + to: target.to, + accountId: target.accountId, + threadId: target.threadId, + payloads: [{ text: params.text }], + }); + } catch (err) { + log.error(`message approvals: failed to deliver to ${channel}:${target.to}: ${String(err)}`); + } + }); + await Promise.allSettled(deliveries); +} + +export function createMessageApprovalForwarder( + deps: MessageApprovalForwarderDeps = {}, +): MessageApprovalForwarder { + const getConfig = deps.getConfig ?? loadConfig; + const deliver = deps.deliver ?? deliverOutboundPayloads; + const nowMs = deps.nowMs ?? Date.now; + const resolveSessionTarget = deps.resolveSessionTarget ?? defaultResolveSessionTarget; + const pending = new Map(); + + const handleRequested = async (request: MessageApprovalRequest) => { + const cfg = getConfig(); + const config = cfg.approvals?.message; + if (!shouldForward({ config, request })) return; + + const mode = normalizeMode(config?.mode); + const targets: ForwardTarget[] = []; + const seen = new Set(); + + if (mode === "session" || mode === "both") { + const sessionTarget = resolveSessionTarget({ cfg, request }); + if (sessionTarget) { + const key = buildTargetKey(sessionTarget); + if (!seen.has(key)) { + seen.add(key); + targets.push({ ...sessionTarget, source: "session" }); + } + } + } + + if (mode === "targets" || mode === "both") { + const explicitTargets = config?.targets ?? []; + for (const target of explicitTargets) { + const key = buildTargetKey(target); + if (seen.has(key)) continue; + seen.add(key); + targets.push({ ...target, source: "target" }); + } + } + + if (targets.length === 0) return; + + const expiresInMs = Math.max(0, request.expiresAtMs - nowMs()); + const timeoutId = setTimeout(() => { + void (async () => { + const entry = pending.get(request.id); + if (!entry) return; + pending.delete(request.id); + const expiredText = buildExpiredMessage(request); + await deliverToTargets({ cfg, targets: entry.targets, text: expiredText, deliver }); + })(); + }, expiresInMs); + timeoutId.unref?.(); + + const pendingEntry: PendingApproval = { request, targets, timeoutId }; + pending.set(request.id, pendingEntry); + + if (pending.get(request.id) !== pendingEntry) return; + + const text = buildRequestMessage(request, nowMs()); + await deliverToTargets({ + cfg, + targets, + text, + deliver, + shouldSend: () => pending.get(request.id) === pendingEntry, + }); + }; + + const handleResolved = async (resolved: MessageApprovalResolved) => { + const entry = pending.get(resolved.id); + if (!entry) return; + if (entry.timeoutId) clearTimeout(entry.timeoutId); + pending.delete(resolved.id); + + const cfg = getConfig(); + const text = buildResolvedMessage(resolved); + await deliverToTargets({ cfg, targets: entry.targets, text, deliver }); + }; + + const stop = () => { + for (const entry of pending.values()) { + if (entry.timeoutId) clearTimeout(entry.timeoutId); + } + pending.clear(); + }; + + return { handleRequested, handleResolved, stop }; +} + +export function shouldForwardMessageApproval(params: { + config?: MessageApprovalForwardingConfig; + request: MessageApprovalRequest; +}): boolean { + return shouldForward(params); +} diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 98beaa828..aa18ab90a 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -43,6 +43,8 @@ import { resolveChannelTarget, type ResolvedMessagingTarget } from "./target-res import { loadWebMedia } from "../../web/media.js"; import { extensionForMime } from "../../media/mime.js"; import { parseSlackTarget } from "../../slack/targets.js"; +import type { GatewayClient } from "../../gateway/client.js"; +import { requestMessageApproval, shouldRequireMessageApproval } from "./message-approval-check.js"; export type MessageActionRunnerGateway = { url?: string; @@ -60,11 +62,15 @@ export type RunMessageActionParams = { defaultAccountId?: string; toolContext?: ChannelThreadingToolContext; gateway?: MessageActionRunnerGateway; + /** Connected gateway client for RPC calls (e.g., message approval requests). */ + gatewayClient?: GatewayClient; deps?: OutboundSendDeps; sessionKey?: string; agentId?: string; dryRun?: boolean; abortSignal?: AbortSignal; + /** Skip message approval check (used for internal/already-approved calls). */ + skipApproval?: boolean; }; export type MessageActionRunResult = @@ -114,6 +120,14 @@ export type MessageActionRunResult = payload: unknown; toolResult?: AgentToolResult; dryRun: boolean; + } + | { + kind: "approval-denied"; + channel: ChannelId; + action: ChannelMessageActionName; + handledBy: "approval-denied"; + payload: { denied: true; reason: "timeout" | "user-denied" | "error"; error?: string }; + dryRun: false; }; export function getToolResult( @@ -950,6 +964,51 @@ export async function runMessageAction( const gateway = resolveGateway(input); + // Check if message approval is required + if (!input.skipApproval && input.gatewayClient) { + const targetTo = resolvedTarget?.to ?? readStringParam(params, "to") ?? ""; + if ( + shouldRequireMessageApproval({ + cfg, + action, + channel, + agentId: resolvedAgentId, + sessionKey: input.sessionKey, + }) + ) { + const message = readStringParam(params, "message") ?? readStringParam(params, "text") ?? null; + const mediaUrl = + readStringParam(params, "mediaUrl") ?? readStringParam(params, "url") ?? null; + const approvalResult = await requestMessageApproval({ + cfg, + gateway: input.gatewayClient, + action, + channel, + to: targetTo, + message, + mediaUrl, + agentId: resolvedAgentId, + sessionKey: input.sessionKey, + }); + if (approvalResult.decision !== "allow") { + // Distinguish between timeout (null decision, no error), denial (deny decision), and error (null decision with error) + const reason = approvalResult.error + ? "error" + : approvalResult.decision === null + ? "timeout" + : "user-denied"; + return { + kind: "approval-denied", + channel: channel as ChannelId, + action, + handledBy: "approval-denied", + payload: { denied: true, reason, error: approvalResult.error }, + dryRun: false, + }; + } + } + } + if (action === "send") { return handleSendAction({ cfg, diff --git a/src/infra/outbound/message-approval-check.ts b/src/infra/outbound/message-approval-check.ts new file mode 100644 index 000000000..c7fed971c --- /dev/null +++ b/src/infra/outbound/message-approval-check.ts @@ -0,0 +1,114 @@ +import type { ClawdbotConfig } from "../../config/config.js"; +import type { MessageApprovalForwardingConfig } from "../../config/types.approvals.js"; +import type { GatewayClient } from "../../gateway/client.js"; +import type { MessageApprovalDecision } from "../../gateway/message-approval-manager.js"; +import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { parseAgentSessionKey } from "../../routing/session-key.js"; +import { normalizeMessageChannel } from "../../utils/message-channel.js"; + +const log = createSubsystemLogger("message-approval-check"); + +export type MessageApprovalCheckParams = { + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; +}; + +export function resolveMessageApprovalConfig( + cfg: ClawdbotConfig, +): MessageApprovalForwardingConfig | undefined { + return cfg.approvals?.message; +} + +export function shouldRequireMessageApproval(params: { + cfg: ClawdbotConfig; + action: string; + channel: string; + agentId?: string | null; + sessionKey?: string | null; +}): boolean { + const config = resolveMessageApprovalConfig(params.cfg); + if (!config?.enabled) return false; + + if (config.actions?.length) { + if (!config.actions.includes(params.action)) return false; + } + + if (config.channels?.length && !config.channels.includes("*")) { + const channel = normalizeMessageChannel(params.channel); + if (!channel || !config.channels.includes(channel)) return false; + } + + if (config.agentFilter?.length) { + const agentId = params.agentId ?? parseAgentSessionKey(params.sessionKey)?.agentId; + if (!agentId) return false; + if (!config.agentFilter.includes(agentId)) return false; + } + + if (config.sessionFilter?.length) { + const sessionKey = params.sessionKey; + if (!sessionKey) return false; + const matched = config.sessionFilter.some((pattern) => { + try { + return sessionKey.includes(pattern) || new RegExp(pattern).test(sessionKey); + } catch { + return sessionKey.includes(pattern); + } + }); + if (!matched) return false; + } + + return true; +} + +export type RequestMessageApprovalParams = { + cfg: ClawdbotConfig; + gateway: GatewayClient; + action: string; + channel: string; + to: string; + message?: string | null; + mediaUrl?: string | null; + agentId?: string | null; + sessionKey?: string | null; +}; + +export type RequestMessageApprovalResult = { + decision: MessageApprovalDecision | null; + id: string; + /** Error message if the request failed (distinct from timeout which has null decision but no error). */ + error?: string; +}; + +export async function requestMessageApproval( + params: RequestMessageApprovalParams, +): Promise { + const config = resolveMessageApprovalConfig(params.cfg); + const timeoutMs = (config?.timeout ?? 120) * 1000; + + try { + const result = await params.gateway.request<{ + id: string; + decision: MessageApprovalDecision | null; + }>("message.approval.request", { + action: params.action, + channel: params.channel, + to: params.to, + message: params.message ?? null, + mediaUrl: params.mediaUrl ?? null, + agentId: params.agentId ?? null, + sessionKey: params.sessionKey ?? null, + timeoutMs, + }); + + return { decision: result.decision, id: result.id }; + } catch (err) { + const errorMsg = String(err); + log.error(`message approval request error: ${errorMsg}`); + return { decision: null, id: "", error: errorMsg }; + } +}