diff --git a/src/agents/tools/message-tool.ts b/src/agents/tools/message-tool.ts index 6552564e9..eae4356db 100644 --- a/src/agents/tools/message-tool.ts +++ b/src/agents/tools/message-tool.ts @@ -333,7 +333,13 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool { name: "message", description, parameters: schema, - execute: async (_toolCallId, args) => { + execute: async (_toolCallId, args, signal) => { + // Check if already aborted before doing any work + if (signal?.aborted) { + const err = new Error("Message send aborted"); + err.name = "AbortError"; + throw err; + } const params = args as Record; const cfg = options?.config ?? loadConfig(); const action = readStringParam(params, "action", { @@ -366,6 +372,9 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool { currentThreadTs: options?.currentThreadTs, replyToMode: options?.replyToMode, hasRepliedRef: options?.hasRepliedRef, + // Direct tool invocations should not add cross-context decoration. + // The agent is composing a message, not forwarding from another chat. + skipCrossContextDecoration: true, } : undefined; @@ -379,6 +388,7 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool { agentId: options?.agentSessionKey ? resolveSessionAgentId({ sessionKey: options.agentSessionKey, config: cfg }) : undefined, + abortSignal: signal, }); const toolResult = getToolResult(result); diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 99f847fda..6a76743f2 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -240,6 +240,12 @@ export type ChannelThreadingToolContext = { currentThreadTs?: string; replyToMode?: "off" | "first" | "all"; hasRepliedRef?: { value: boolean }; + /** + * When true, skip cross-context decoration (e.g., "[from X]" prefix). + * Use this for direct tool invocations where the agent is composing a new message, + * not forwarding/relaying a message from another conversation. + */ + skipCrossContextDecoration?: boolean; }; export type ChannelMessagingAdapter = { diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 50ddce227..2fe4cfeb6 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -64,6 +64,7 @@ export type RunMessageActionParams = { sessionKey?: string; agentId?: string; dryRun?: boolean; + abortSignal?: AbortSignal; }; export type MessageActionRunResult = @@ -507,6 +508,7 @@ type ResolvedActionContext = { input: RunMessageActionParams; agentId?: string; resolvedTarget?: ResolvedMessagingTarget; + abortSignal?: AbortSignal; }; function resolveGateway(input: RunMessageActionParams): MessageActionRunnerGateway | undefined { if (!input.gateway) return undefined; @@ -592,8 +594,28 @@ async function handleBroadcastAction( }; } +function throwIfAborted(abortSignal?: AbortSignal): void { + if (abortSignal?.aborted) { + const err = new Error("Message send aborted"); + err.name = "AbortError"; + throw err; + } +} + async function handleSendAction(ctx: ResolvedActionContext): Promise { - const { cfg, params, channel, accountId, dryRun, gateway, input, agentId, resolvedTarget } = ctx; + const { + cfg, + params, + channel, + accountId, + dryRun, + gateway, + input, + agentId, + resolvedTarget, + abortSignal, + } = ctx; + throwIfAborted(abortSignal); const action: ChannelMessageActionName = "send"; const to = readStringParam(params, "to", { required: true }); // Support media, path, and filePath parameters for attachments @@ -676,6 +698,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise 0 ? mergedMediaUrls : mediaUrl ? [mediaUrl] : undefined; + throwIfAborted(abortSignal); const send = await executeSendAction({ ctx: { cfg, @@ -695,6 +718,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise { - const { cfg, params, channel, accountId, dryRun, gateway, input } = ctx; + const { cfg, params, channel, accountId, dryRun, gateway, input, abortSignal } = ctx; + throwIfAborted(abortSignal); const action: ChannelMessageActionName = "poll"; const to = readStringParam(params, "to", { required: true }); const question = readStringParam(params, "pollQuestion", { @@ -777,7 +802,8 @@ async function handlePollAction(ctx: ResolvedActionContext): Promise { - const { cfg, params, channel, accountId, dryRun, gateway, input } = ctx; + const { cfg, params, channel, accountId, dryRun, gateway, input, abortSignal } = ctx; + throwIfAborted(abortSignal); const action = input.action as Exclude; if (dryRun) { return { @@ -930,6 +956,7 @@ export async function runMessageAction( input, agentId: resolvedAgentId, resolvedTarget, + abortSignal: input.abortSignal, }); } @@ -942,6 +969,7 @@ export async function runMessageAction( dryRun, gateway, input, + abortSignal: input.abortSignal, }); } @@ -953,5 +981,6 @@ export async function runMessageAction( dryRun, gateway, input, + abortSignal: input.abortSignal, }); } diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index fcb90c295..6f5f88bd2 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -50,6 +50,7 @@ type MessageSendParams = { text?: string; mediaUrls?: string[]; }; + abortSignal?: AbortSignal; }; export type MessageSendResult = { @@ -167,6 +168,7 @@ export async function sendMessage(params: MessageSendParams): Promise { if (!params.toolContext?.currentChannelId) return null; + // Skip decoration for direct tool sends (agent composing, not forwarding) + if (params.toolContext.skipCrossContextDecoration) return null; if (!isCrossContextTarget(params)) return null; const markerConfig = params.cfg.tools?.message?.crossContext?.marker; diff --git a/src/infra/outbound/outbound-send-service.ts b/src/infra/outbound/outbound-send-service.ts index dd5dfd5e6..88a64d251 100644 --- a/src/infra/outbound/outbound-send-service.ts +++ b/src/infra/outbound/outbound-send-service.ts @@ -32,6 +32,7 @@ export type OutboundSendContext = { text?: string; mediaUrls?: string[]; }; + abortSignal?: AbortSignal; }; function extractToolPayload(result: AgentToolResult): unknown { @@ -56,6 +57,14 @@ function extractToolPayload(result: AgentToolResult): unknown { return result.content ?? result; } +function throwIfAborted(abortSignal?: AbortSignal): void { + if (abortSignal?.aborted) { + const err = new Error("Message send aborted"); + err.name = "AbortError"; + throw err; + } +} + export async function executeSendAction(params: { ctx: OutboundSendContext; to: string; @@ -70,6 +79,7 @@ export async function executeSendAction(params: { toolResult?: AgentToolResult; sendResult?: MessageSendResult; }> { + throwIfAborted(params.ctx.abortSignal); if (!params.ctx.dryRun) { const handled = await dispatchChannelMessageAction({ channel: params.ctx.channel, @@ -103,6 +113,7 @@ export async function executeSendAction(params: { } } + throwIfAborted(params.ctx.abortSignal); const result: MessageSendResult = await sendMessage({ cfg: params.ctx.cfg, to: params.to, @@ -117,6 +128,7 @@ export async function executeSendAction(params: { deps: params.ctx.deps, gateway: params.ctx.gateway, mirror: params.ctx.mirror, + abortSignal: params.ctx.abortSignal, }); return {