diff --git a/extensions/slack/src/channel.ts b/extensions/slack/src/channel.ts index 8323a9ce0..b738c9c79 100644 --- a/extensions/slack/src/channel.ts +++ b/extensions/slack/src/channel.ts @@ -263,7 +263,11 @@ export const slackPlugin: ChannelPlugin = { const to = typeof args.to === "string" ? args.to : undefined; if (!to) return null; const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined; - return { to, accountId }; + const threadIdRaw = typeof args.threadId === "string" ? args.threadId.trim() : ""; + const replyToRaw = typeof args.replyTo === "string" ? args.replyTo.trim() : ""; + const threadTsRaw = typeof args.threadTs === "string" ? args.threadTs.trim() : ""; + const threadId = threadIdRaw || replyToRaw || threadTsRaw || undefined; + return { to, accountId, threadId }; }, handleAction: async ({ action, params, cfg, accountId, toolContext }) => { const resolveChannelId = () => diff --git a/src/agents/pi-embedded-messaging.ts b/src/agents/pi-embedded-messaging.ts index 5aae66fd4..8d91f7486 100644 --- a/src/agents/pi-embedded-messaging.ts +++ b/src/agents/pi-embedded-messaging.ts @@ -5,6 +5,7 @@ export type MessagingToolSend = { provider: string; accountId?: string; to?: string; + threadId?: string | number; }; const CORE_MESSAGING_TOOLS = new Set(["sessions_send", "message"]); diff --git a/src/agents/pi-embedded-subscribe.tools.ts b/src/agents/pi-embedded-subscribe.tools.ts index 195a70a64..232872e40 100644 --- a/src/agents/pi-embedded-subscribe.tools.ts +++ b/src/agents/pi-embedded-subscribe.tools.ts @@ -118,6 +118,14 @@ export function extractMessagingToolSend( toolName: string, args: Record, ): MessagingToolSend | undefined { + const readThreadId = (value: unknown): string | number | undefined => { + if (typeof value === "string") { + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; + } + if (typeof value === "number" && Number.isFinite(value)) return value; + return undefined; + }; // Provider docking: new provider tools must implement plugin.actions.extractToolSend. const action = typeof args.action === "string" ? args.action.trim() : ""; const accountIdRaw = typeof args.accountId === "string" ? args.accountId.trim() : undefined; @@ -132,7 +140,11 @@ export function extractMessagingToolSend( const providerId = providerHint ? normalizeChannelId(providerHint) : null; const provider = providerId ?? (providerHint ? providerHint.toLowerCase() : "message"); const to = normalizeTargetForProvider(provider, toRaw); - return to ? { tool: toolName, provider, accountId, to } : undefined; + let threadId = readThreadId(args.threadId); + if (!threadId && provider === "slack") { + threadId = readThreadId(args.replyTo ?? args.threadTs); + } + return to ? { tool: toolName, provider, accountId, to, threadId } : undefined; } const providerId = normalizeChannelId(toolName); if (!providerId) return undefined; @@ -146,6 +158,7 @@ export function extractMessagingToolSend( provider: providerId, accountId: extracted.accountId ?? accountId, to, + threadId: extracted.threadId, } : undefined; } diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index 60d6fa763..6e03e37b8 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -31,6 +31,7 @@ export function buildReplyPayloads(params: { typeof shouldSuppressMessagingToolReplies >[0]["messagingToolSentTargets"]; originatingTo?: string; + originatingThreadId?: string | number; accountId?: string; }): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } { let didLogHeartbeatStrip = params.didLogHeartbeatStrip; @@ -94,6 +95,7 @@ export function buildReplyPayloads(params: { messageProvider: params.messageProvider, messagingToolSentTargets, originatingTo: params.originatingTo, + originatingThreadId: params.originatingThreadId, accountId: params.accountId, }); const dedupedPayloads = filterMessagingToolDuplicates({ diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 227e6f17e..95897b4bb 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -408,6 +408,7 @@ export async function runReplyAgent(params: { messagingToolSentTexts: runResult.messagingToolSentTexts, messagingToolSentTargets: runResult.messagingToolSentTargets, originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To, + originatingThreadId: sessionCtx.MessageThreadId, accountId: sessionCtx.AccountId, }); const { replyPayloads } = payloadResult; diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index ecb28cf00..6afe2cbb4 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -82,12 +82,22 @@ export function shouldSuppressMessagingToolReplies(params: { messageProvider?: string; messagingToolSentTargets?: MessagingToolSend[]; originatingTo?: string; + originatingThreadId?: string | number; accountId?: string; }): boolean { const provider = params.messageProvider?.trim().toLowerCase(); if (!provider) return false; const originTarget = normalizeTargetForProvider(provider, params.originatingTo); if (!originTarget) return false; + const normalizeThreadId = (value?: string | number) => { + if (value === undefined || value === null) return undefined; + if (typeof value === "string") { + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; + } + return Number.isFinite(value) ? String(value) : undefined; + }; + const originThreadId = normalizeThreadId(params.originatingThreadId); const originAccount = normalizeAccountId(params.accountId); const sentTargets = params.messagingToolSentTargets ?? []; if (sentTargets.length === 0) return false; @@ -95,11 +105,15 @@ export function shouldSuppressMessagingToolReplies(params: { if (!target?.provider) return false; if (target.provider.trim().toLowerCase() !== provider) return false; const targetKey = normalizeTargetForProvider(provider, target.to); - if (!targetKey) return false; + if (!targetKey || targetKey !== originTarget) return false; const targetAccount = normalizeAccountId(target.accountId); if (originAccount && targetAccount && originAccount !== targetAccount) { return false; } - return targetKey === originTarget; + const targetThreadId = normalizeThreadId(target.threadId); + if (originThreadId || targetThreadId) { + return originThreadId === targetThreadId; + } + return true; }); } diff --git a/src/channels/plugins/actions/telegram.ts b/src/channels/plugins/actions/telegram.ts index 364707e0a..cf7b7b335 100644 --- a/src/channels/plugins/actions/telegram.ts +++ b/src/channels/plugins/actions/telegram.ts @@ -62,7 +62,9 @@ export const telegramMessageActions: ChannelMessageActionAdapter = { const to = typeof args.to === "string" ? args.to : undefined; if (!to) return null; const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined; - return { to, accountId }; + const threadIdRaw = typeof args.threadId === "string" ? args.threadId.trim() : ""; + const threadId = threadIdRaw || undefined; + return { to, accountId, threadId }; }, handleAction: async ({ action, params, cfg, accountId }) => { if (action === "send") { diff --git a/src/channels/plugins/slack.actions.ts b/src/channels/plugins/slack.actions.ts index ca8aa6fb8..fbb8ff617 100644 --- a/src/channels/plugins/slack.actions.ts +++ b/src/channels/plugins/slack.actions.ts @@ -54,7 +54,11 @@ export function createSlackActions(providerId: string): ChannelMessageActionAdap const to = typeof args.to === "string" ? args.to : undefined; if (!to) return null; const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined; - return { to, accountId }; + const threadIdRaw = typeof args.threadId === "string" ? args.threadId.trim() : ""; + const replyToRaw = typeof args.replyTo === "string" ? args.replyTo.trim() : ""; + const threadTsRaw = typeof args.threadTs === "string" ? args.threadTs.trim() : ""; + const threadId = threadIdRaw || replyToRaw || threadTsRaw || undefined; + return { to, accountId, threadId }; }, handleAction: async (ctx: ChannelMessageActionContext) => { const { action, params, cfg } = ctx; diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 6a76743f2..790e01ab0 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -300,6 +300,7 @@ export type ChannelMessageActionContext = { export type ChannelToolSend = { to: string; accountId?: string | null; + threadId?: string | number; }; export type ChannelMessageActionAdapter = { diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 38b69f049..1df3deaee 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -44,6 +44,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const messageTs = message.ts ?? message.event_ts; const incomingThreadTs = message.thread_ts; + const toolThreadTs = incomingThreadTs ?? (ctx.replyToMode === "all" ? messageTs : undefined); let didSetStatus = false; // Shared mutable ref for "replyToMode=first". Both tool + auto-reply flows @@ -101,8 +102,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag responsePrefix: prefixContext.responsePrefix, responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), - deliver: async (payload) => { - const replyThreadTs = replyPlan.nextThreadTs(); + deliver: async (payload, info) => { + const replyThreadTs = info.kind === "tool" ? toolThreadTs : replyPlan.nextThreadTs(); + if (shouldLogVerbose()) { + logVerbose( + `slack threading: deliver kind=${info.kind} target=${prepared.replyTarget} thread_ts=${replyThreadTs ?? "none"} payload.replyToId=${payload.replyToId ?? "none"}`, + ); + } await deliverReplies({ replies: [payload], target: prepared.replyTarget, @@ -112,7 +118,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag textLimit: ctx.textLimit, replyThreadTs, }); - replyPlan.markSent(); + if (info.kind !== "tool") { + replyPlan.markSent(); + } }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index 8a2a9e111..5073d262e 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -197,14 +197,17 @@ export async function prepareSlackMessage(params: { const threadContext = resolveSlackThreadContext({ message, replyToMode: ctx.replyToMode }); const threadTs = threadContext.incomingThreadTs; const isThreadReply = threadContext.isThreadReply; + const autoThreadId = + !isThreadReply && ctx.replyToMode === "all" ? threadContext.messageThreadId : undefined; + const threadSessionId = isThreadReply ? threadTs : autoThreadId; const threadKeys = resolveThreadSessionKeys({ baseSessionKey, - threadId: isThreadReply ? threadTs : undefined, - parentSessionKey: isThreadReply && ctx.threadInheritParent ? baseSessionKey : undefined, + threadId: threadSessionId, + parentSessionKey: threadSessionId && ctx.threadInheritParent ? baseSessionKey : undefined, }); const sessionKey = threadKeys.sessionKey; const historyKey = - isThreadReply && ctx.threadHistoryScope === "thread" ? sessionKey : message.channel; + threadSessionId && ctx.threadHistoryScope === "thread" ? sessionKey : message.channel; const mentionRegexes = buildMentionRegexes(cfg, route.agentId); const hasAnyMention = /<@[^>]+>/.test(message.text ?? ""); @@ -402,7 +405,7 @@ export async function prepareSlackMessage(params: { const envelopeOptions = resolveEnvelopeFormatOptions(ctx.cfg); const previousTimestamp = readSessionUpdatedAt({ storePath, - sessionKey: route.sessionKey, + sessionKey, }); const body = formatInboundEnvelope({ channel: "Slack",