fix: prevent cross-context decoration on direct message tool sends
Two fixes: 1. Cross-context decoration (e.g., '[from +19257864429]' prefix) was being added to ALL messages sent to a different target, even when the agent was just composing a new message via the message tool. This decoration should only be applied when forwarding/relaying messages between chats. Fix: Added skipCrossContextDecoration flag to ChannelThreadingToolContext. The message tool now sets this flag to true, so direct sends don't get decorated. The buildCrossContextDecoration function checks this flag and returns null when set. 2. Aborted requests were still completing because the abort signal wasn't being passed through the message tool execution chain. Fix: Added abortSignal propagation from message tool → runMessageAction → executeSendAction → sendMessage → deliverOutboundPayloads. Added abort checks at key points in the chain to fail fast when aborted. Files changed: - src/channels/plugins/types.core.ts: Added skipCrossContextDecoration field - src/infra/outbound/outbound-policy.ts: Check skip flag before decorating - src/agents/tools/message-tool.ts: Set skip flag, accept and pass abort signal - src/infra/outbound/message-action-runner.ts: Pass abort signal through - src/infra/outbound/outbound-send-service.ts: Check and pass abort signal - src/infra/outbound/message.ts: Pass abort signal to delivery
This commit is contained in:
parent
00e00460c1
commit
bea6b3eb5b
@ -333,7 +333,13 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool {
|
||||
name: "message",
|
||||
description,
|
||||
parameters: schema,
|
||||
execute: async (_toolCallId, args) => {
|
||||
execute: async (_toolCallId, args, signal) => {
|
||||
// Check if already aborted before doing any work
|
||||
if (signal?.aborted) {
|
||||
const err = new Error("Message send aborted");
|
||||
err.name = "AbortError";
|
||||
throw err;
|
||||
}
|
||||
const params = args as Record<string, unknown>;
|
||||
const cfg = options?.config ?? loadConfig();
|
||||
const action = readStringParam(params, "action", {
|
||||
@ -366,6 +372,9 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool {
|
||||
currentThreadTs: options?.currentThreadTs,
|
||||
replyToMode: options?.replyToMode,
|
||||
hasRepliedRef: options?.hasRepliedRef,
|
||||
// Direct tool invocations should not add cross-context decoration.
|
||||
// The agent is composing a message, not forwarding from another chat.
|
||||
skipCrossContextDecoration: true,
|
||||
}
|
||||
: undefined;
|
||||
|
||||
@ -379,6 +388,7 @@ export function createMessageTool(options?: MessageToolOptions): AnyAgentTool {
|
||||
agentId: options?.agentSessionKey
|
||||
? resolveSessionAgentId({ sessionKey: options.agentSessionKey, config: cfg })
|
||||
: undefined,
|
||||
abortSignal: signal,
|
||||
});
|
||||
|
||||
const toolResult = getToolResult(result);
|
||||
|
||||
@ -240,6 +240,12 @@ export type ChannelThreadingToolContext = {
|
||||
currentThreadTs?: string;
|
||||
replyToMode?: "off" | "first" | "all";
|
||||
hasRepliedRef?: { value: boolean };
|
||||
/**
|
||||
* When true, skip cross-context decoration (e.g., "[from X]" prefix).
|
||||
* Use this for direct tool invocations where the agent is composing a new message,
|
||||
* not forwarding/relaying a message from another conversation.
|
||||
*/
|
||||
skipCrossContextDecoration?: boolean;
|
||||
};
|
||||
|
||||
export type ChannelMessagingAdapter = {
|
||||
|
||||
@ -64,6 +64,7 @@ export type RunMessageActionParams = {
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
dryRun?: boolean;
|
||||
abortSignal?: AbortSignal;
|
||||
};
|
||||
|
||||
export type MessageActionRunResult =
|
||||
@ -507,6 +508,7 @@ type ResolvedActionContext = {
|
||||
input: RunMessageActionParams;
|
||||
agentId?: string;
|
||||
resolvedTarget?: ResolvedMessagingTarget;
|
||||
abortSignal?: AbortSignal;
|
||||
};
|
||||
function resolveGateway(input: RunMessageActionParams): MessageActionRunnerGateway | undefined {
|
||||
if (!input.gateway) return undefined;
|
||||
@ -592,8 +594,28 @@ async function handleBroadcastAction(
|
||||
};
|
||||
}
|
||||
|
||||
function throwIfAborted(abortSignal?: AbortSignal): void {
|
||||
if (abortSignal?.aborted) {
|
||||
const err = new Error("Message send aborted");
|
||||
err.name = "AbortError";
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActionRunResult> {
|
||||
const { cfg, params, channel, accountId, dryRun, gateway, input, agentId, resolvedTarget } = ctx;
|
||||
const {
|
||||
cfg,
|
||||
params,
|
||||
channel,
|
||||
accountId,
|
||||
dryRun,
|
||||
gateway,
|
||||
input,
|
||||
agentId,
|
||||
resolvedTarget,
|
||||
abortSignal,
|
||||
} = ctx;
|
||||
throwIfAborted(abortSignal);
|
||||
const action: ChannelMessageActionName = "send";
|
||||
const to = readStringParam(params, "to", { required: true });
|
||||
// Support media, path, and filePath parameters for attachments
|
||||
@ -676,6 +698,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
|
||||
}
|
||||
const mirrorMediaUrls =
|
||||
mergedMediaUrls.length > 0 ? mergedMediaUrls : mediaUrl ? [mediaUrl] : undefined;
|
||||
throwIfAborted(abortSignal);
|
||||
const send = await executeSendAction({
|
||||
ctx: {
|
||||
cfg,
|
||||
@ -695,6 +718,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
|
||||
mediaUrls: mirrorMediaUrls,
|
||||
}
|
||||
: undefined,
|
||||
abortSignal,
|
||||
},
|
||||
to,
|
||||
message,
|
||||
@ -718,7 +742,8 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
|
||||
}
|
||||
|
||||
async function handlePollAction(ctx: ResolvedActionContext): Promise<MessageActionRunResult> {
|
||||
const { cfg, params, channel, accountId, dryRun, gateway, input } = ctx;
|
||||
const { cfg, params, channel, accountId, dryRun, gateway, input, abortSignal } = ctx;
|
||||
throwIfAborted(abortSignal);
|
||||
const action: ChannelMessageActionName = "poll";
|
||||
const to = readStringParam(params, "to", { required: true });
|
||||
const question = readStringParam(params, "pollQuestion", {
|
||||
@ -777,7 +802,8 @@ async function handlePollAction(ctx: ResolvedActionContext): Promise<MessageActi
|
||||
}
|
||||
|
||||
async function handlePluginAction(ctx: ResolvedActionContext): Promise<MessageActionRunResult> {
|
||||
const { cfg, params, channel, accountId, dryRun, gateway, input } = ctx;
|
||||
const { cfg, params, channel, accountId, dryRun, gateway, input, abortSignal } = ctx;
|
||||
throwIfAborted(abortSignal);
|
||||
const action = input.action as Exclude<ChannelMessageActionName, "send" | "poll" | "broadcast">;
|
||||
if (dryRun) {
|
||||
return {
|
||||
@ -930,6 +956,7 @@ export async function runMessageAction(
|
||||
input,
|
||||
agentId: resolvedAgentId,
|
||||
resolvedTarget,
|
||||
abortSignal: input.abortSignal,
|
||||
});
|
||||
}
|
||||
|
||||
@ -942,6 +969,7 @@ export async function runMessageAction(
|
||||
dryRun,
|
||||
gateway,
|
||||
input,
|
||||
abortSignal: input.abortSignal,
|
||||
});
|
||||
}
|
||||
|
||||
@ -953,5 +981,6 @@ export async function runMessageAction(
|
||||
dryRun,
|
||||
gateway,
|
||||
input,
|
||||
abortSignal: input.abortSignal,
|
||||
});
|
||||
}
|
||||
|
||||
@ -50,6 +50,7 @@ type MessageSendParams = {
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
};
|
||||
abortSignal?: AbortSignal;
|
||||
};
|
||||
|
||||
export type MessageSendResult = {
|
||||
@ -167,6 +168,7 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
|
||||
gifPlayback: params.gifPlayback,
|
||||
deps: params.deps,
|
||||
bestEffort: params.bestEffort,
|
||||
abortSignal: params.abortSignal,
|
||||
mirror: params.mirror
|
||||
? {
|
||||
...params.mirror,
|
||||
|
||||
@ -119,6 +119,8 @@ export async function buildCrossContextDecoration(params: {
|
||||
accountId?: string | null;
|
||||
}): Promise<CrossContextDecoration | null> {
|
||||
if (!params.toolContext?.currentChannelId) return null;
|
||||
// Skip decoration for direct tool sends (agent composing, not forwarding)
|
||||
if (params.toolContext.skipCrossContextDecoration) return null;
|
||||
if (!isCrossContextTarget(params)) return null;
|
||||
|
||||
const markerConfig = params.cfg.tools?.message?.crossContext?.marker;
|
||||
|
||||
@ -32,6 +32,7 @@ export type OutboundSendContext = {
|
||||
text?: string;
|
||||
mediaUrls?: string[];
|
||||
};
|
||||
abortSignal?: AbortSignal;
|
||||
};
|
||||
|
||||
function extractToolPayload(result: AgentToolResult<unknown>): unknown {
|
||||
@ -56,6 +57,14 @@ function extractToolPayload(result: AgentToolResult<unknown>): unknown {
|
||||
return result.content ?? result;
|
||||
}
|
||||
|
||||
function throwIfAborted(abortSignal?: AbortSignal): void {
|
||||
if (abortSignal?.aborted) {
|
||||
const err = new Error("Message send aborted");
|
||||
err.name = "AbortError";
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
export async function executeSendAction(params: {
|
||||
ctx: OutboundSendContext;
|
||||
to: string;
|
||||
@ -70,6 +79,7 @@ export async function executeSendAction(params: {
|
||||
toolResult?: AgentToolResult<unknown>;
|
||||
sendResult?: MessageSendResult;
|
||||
}> {
|
||||
throwIfAborted(params.ctx.abortSignal);
|
||||
if (!params.ctx.dryRun) {
|
||||
const handled = await dispatchChannelMessageAction({
|
||||
channel: params.ctx.channel,
|
||||
@ -103,6 +113,7 @@ export async function executeSendAction(params: {
|
||||
}
|
||||
}
|
||||
|
||||
throwIfAborted(params.ctx.abortSignal);
|
||||
const result: MessageSendResult = await sendMessage({
|
||||
cfg: params.ctx.cfg,
|
||||
to: params.to,
|
||||
@ -117,6 +128,7 @@ export async function executeSendAction(params: {
|
||||
deps: params.ctx.deps,
|
||||
gateway: params.ctx.gateway,
|
||||
mirror: params.ctx.mirror,
|
||||
abortSignal: params.ctx.abortSignal,
|
||||
});
|
||||
|
||||
return {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user