diff --git a/extensions/slack/src/channel.ts b/extensions/slack/src/channel.ts index 8323a9ce0..b738c9c79 100644 --- a/extensions/slack/src/channel.ts +++ b/extensions/slack/src/channel.ts @@ -263,7 +263,11 @@ export const slackPlugin: ChannelPlugin = { const to = typeof args.to === "string" ? args.to : undefined; if (!to) return null; const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined; - return { to, accountId }; + const threadIdRaw = typeof args.threadId === "string" ? args.threadId.trim() : ""; + const replyToRaw = typeof args.replyTo === "string" ? args.replyTo.trim() : ""; + const threadTsRaw = typeof args.threadTs === "string" ? args.threadTs.trim() : ""; + const threadId = threadIdRaw || replyToRaw || threadTsRaw || undefined; + return { to, accountId, threadId }; }, handleAction: async ({ action, params, cfg, accountId, toolContext }) => { const resolveChannelId = () => diff --git a/skills/slack/SKILL.md b/skills/slack/SKILL.md index bd80c4f81..7521fcf41 100644 --- a/skills/slack/SKILL.md +++ b/skills/slack/SKILL.md @@ -16,7 +16,7 @@ Use `slack` to react, manage pins, send/edit/delete messages, and fetch member i - For reactions, an `emoji` (Unicode or `:name:`). - For message sends, a `to` target (`channel:` or `user:`) and `content`. -Message context lines include `slack message id` and `channel` fields you can reuse directly. +Message text no longer includes `slack message id`. Use metadata fields like `MessageSid` or `ReplyToId`. ## Actions diff --git a/src/agents/pi-embedded-messaging.ts b/src/agents/pi-embedded-messaging.ts index 5aae66fd4..8d91f7486 100644 --- a/src/agents/pi-embedded-messaging.ts +++ b/src/agents/pi-embedded-messaging.ts @@ -5,6 +5,7 @@ export type MessagingToolSend = { provider: string; accountId?: string; to?: string; + threadId?: string | number; }; const CORE_MESSAGING_TOOLS = new Set(["sessions_send", "message"]); diff --git a/src/agents/pi-embedded-subscribe.tools.ts b/src/agents/pi-embedded-subscribe.tools.ts index 195a70a64..232872e40 100644 --- a/src/agents/pi-embedded-subscribe.tools.ts +++ b/src/agents/pi-embedded-subscribe.tools.ts @@ -118,6 +118,14 @@ export function extractMessagingToolSend( toolName: string, args: Record, ): MessagingToolSend | undefined { + const readThreadId = (value: unknown): string | number | undefined => { + if (typeof value === "string") { + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; + } + if (typeof value === "number" && Number.isFinite(value)) return value; + return undefined; + }; // Provider docking: new provider tools must implement plugin.actions.extractToolSend. const action = typeof args.action === "string" ? args.action.trim() : ""; const accountIdRaw = typeof args.accountId === "string" ? args.accountId.trim() : undefined; @@ -132,7 +140,11 @@ export function extractMessagingToolSend( const providerId = providerHint ? normalizeChannelId(providerHint) : null; const provider = providerId ?? (providerHint ? providerHint.toLowerCase() : "message"); const to = normalizeTargetForProvider(provider, toRaw); - return to ? { tool: toolName, provider, accountId, to } : undefined; + let threadId = readThreadId(args.threadId); + if (!threadId && provider === "slack") { + threadId = readThreadId(args.replyTo ?? args.threadTs); + } + return to ? { tool: toolName, provider, accountId, to, threadId } : undefined; } const providerId = normalizeChannelId(toolName); if (!providerId) return undefined; @@ -146,6 +158,7 @@ export function extractMessagingToolSend( provider: providerId, accountId: extracted.accountId ?? accountId, to, + threadId: extracted.threadId, } : undefined; } diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index 60d6fa763..5694c761c 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -31,6 +31,7 @@ export function buildReplyPayloads(params: { typeof shouldSuppressMessagingToolReplies >[0]["messagingToolSentTargets"]; originatingTo?: string; + originatingThreadId?: string | number; accountId?: string; }): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } { let didLogHeartbeatStrip = params.didLogHeartbeatStrip; @@ -94,7 +95,9 @@ export function buildReplyPayloads(params: { messageProvider: params.messageProvider, messagingToolSentTargets, originatingTo: params.originatingTo, + originatingThreadId: params.originatingThreadId, accountId: params.accountId, + replyToMode: params.replyToMode, }); const dedupedPayloads = filterMessagingToolDuplicates({ payloads: replyTaggedPayloads, diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 227e6f17e..95897b4bb 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -408,6 +408,7 @@ export async function runReplyAgent(params: { messagingToolSentTexts: runResult.messagingToolSentTexts, messagingToolSentTargets: runResult.messagingToolSentTargets, originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To, + originatingThreadId: sessionCtx.MessageThreadId, accountId: sessionCtx.AccountId, }); const { replyPayloads } = payloadResult; diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 77edf66e5..dce3e4441 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -245,7 +245,9 @@ export function createFollowupRunner(params: { messageProvider: queued.run.messageProvider, messagingToolSentTargets: runResult.messagingToolSentTargets, originatingTo: queued.originatingTo, + originatingThreadId: queued.originatingThreadId, accountId: queued.run.agentAccountId, + replyToMode, }); const finalPayloads = suppressMessagingToolReplies ? [] : dedupedPayloads; diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index ecb28cf00..ab2d52fad 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -82,24 +82,46 @@ export function shouldSuppressMessagingToolReplies(params: { messageProvider?: string; messagingToolSentTargets?: MessagingToolSend[]; originatingTo?: string; + originatingThreadId?: string | number; accountId?: string; + replyToMode?: ReplyToMode; }): boolean { const provider = params.messageProvider?.trim().toLowerCase(); if (!provider) return false; const originTarget = normalizeTargetForProvider(provider, params.originatingTo); if (!originTarget) return false; + const normalizeThreadId = (value?: string | number) => { + if (value === undefined || value === null) return undefined; + if (typeof value === "string") { + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; + } + return Number.isFinite(value) ? String(value) : undefined; + }; + const originThreadId = normalizeThreadId(params.originatingThreadId); const originAccount = normalizeAccountId(params.accountId); + const shouldAssumeAutoThread = + provider === "slack" && + (params.replyToMode === "all" || params.replyToMode === "first") && + Boolean(originThreadId); const sentTargets = params.messagingToolSentTargets ?? []; if (sentTargets.length === 0) return false; return sentTargets.some((target) => { if (!target?.provider) return false; if (target.provider.trim().toLowerCase() !== provider) return false; const targetKey = normalizeTargetForProvider(provider, target.to); - if (!targetKey) return false; + if (!targetKey || targetKey !== originTarget) return false; const targetAccount = normalizeAccountId(target.accountId); if (originAccount && targetAccount && originAccount !== targetAccount) { return false; } - return targetKey === originTarget; + let targetThreadId = normalizeThreadId(target.threadId); + if (!targetThreadId && shouldAssumeAutoThread) { + targetThreadId = originThreadId; + } + if (originThreadId || targetThreadId) { + return originThreadId === targetThreadId; + } + return true; }); } diff --git a/src/channels/plugins/actions/telegram.ts b/src/channels/plugins/actions/telegram.ts index 693e94492..4c87f3d62 100644 --- a/src/channels/plugins/actions/telegram.ts +++ b/src/channels/plugins/actions/telegram.ts @@ -69,7 +69,9 @@ export const telegramMessageActions: ChannelMessageActionAdapter = { const to = typeof args.to === "string" ? args.to : undefined; if (!to) return null; const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined; - return { to, accountId }; + const threadIdRaw = typeof args.threadId === "string" ? args.threadId.trim() : ""; + const threadId = threadIdRaw || undefined; + return { to, accountId, threadId }; }, handleAction: async ({ action, params, cfg, accountId }) => { if (action === "send") { diff --git a/src/channels/plugins/slack.actions.ts b/src/channels/plugins/slack.actions.ts index ca8aa6fb8..fbb8ff617 100644 --- a/src/channels/plugins/slack.actions.ts +++ b/src/channels/plugins/slack.actions.ts @@ -54,7 +54,11 @@ export function createSlackActions(providerId: string): ChannelMessageActionAdap const to = typeof args.to === "string" ? args.to : undefined; if (!to) return null; const accountId = typeof args.accountId === "string" ? args.accountId.trim() : undefined; - return { to, accountId }; + const threadIdRaw = typeof args.threadId === "string" ? args.threadId.trim() : ""; + const replyToRaw = typeof args.replyTo === "string" ? args.replyTo.trim() : ""; + const threadTsRaw = typeof args.threadTs === "string" ? args.threadTs.trim() : ""; + const threadId = threadIdRaw || replyToRaw || threadTsRaw || undefined; + return { to, accountId, threadId }; }, handleAction: async (ctx: ChannelMessageActionContext) => { const { action, params, cfg } = ctx; diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index dcedc0a9c..842b2faba 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -304,6 +304,7 @@ export type ChannelMessageActionContext = { export type ChannelToolSend = { to: string; accountId?: string | null; + threadId?: string | number; }; export type ChannelMessageActionAdapter = { diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index 1ee736496..82837e881 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -32,7 +32,9 @@ export function createSlackMessageHandler(params: { const senderId = entry.message.user ?? entry.message.bot_id; if (!senderId) return null; const messageTs = entry.message.ts ?? entry.message.event_ts; - // If Slack flags a thread reply but omits thread_ts, isolate it from root debouncing. + // If we get a Slack event that looks like a thread reply (has parent_user_id) + // but is missing thread_ts, avoid debouncing it into the channel root bucket. + // We'll attempt to resolve the missing thread_ts later in prepareSlackMessage. const threadKey = entry.message.thread_ts ? `${entry.message.channel}:${entry.message.thread_ts}` : entry.message.parent_user_id && messageTs diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 38b69f049..892f51c3b 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -44,8 +44,15 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const messageTs = message.ts ?? message.event_ts; const incomingThreadTs = message.thread_ts; + const toolThreadTs = incomingThreadTs ?? (ctx.replyToMode === "all" ? messageTs : undefined); let didSetStatus = false; + if (shouldLogVerbose()) { + logVerbose( + `slack threading: inbound channel=${message.channel} ts=${messageTs ?? "unknown"} thread_ts=${incomingThreadTs ?? "none"} replyToMode=${ctx.replyToMode}`, + ); + } + // Shared mutable ref for "replyToMode=first". Both tool + auto-reply flows // mark this to ensure only the first reply is threaded. const hasRepliedRef = { value: false }; @@ -101,8 +108,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag responsePrefix: prefixContext.responsePrefix, responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), - deliver: async (payload) => { - const replyThreadTs = replyPlan.nextThreadTs(); + deliver: async (payload, info) => { + const replyThreadTs = info.kind === "tool" ? toolThreadTs : replyPlan.nextThreadTs(); + if (shouldLogVerbose()) { + logVerbose( + `slack threading: deliver kind=${info.kind} target=${prepared.replyTarget} thread_ts=${replyThreadTs ?? "none"} payload.replyToId=${payload.replyToId ?? "none"}`, + ); + } await deliverReplies({ replies: [payload], target: prepared.replyTarget, @@ -112,7 +124,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag textLimit: ctx.textLimit, replyThreadTs, }); - replyPlan.markSent(); + if (info.kind !== "tool") { + replyPlan.markSent(); + } }, onError: (err, info) => { runtime.error?.(danger(`slack ${info.kind} reply failed: ${String(err)}`)); diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index 8a2a9e111..259233878 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -1,3 +1,5 @@ +import type { WebClient as SlackWebClient } from "@slack/web-api"; + import { resolveAckReaction } from "../../../agents/identity.js"; import { hasControlCommand } from "../../../auto-reply/command-detection.js"; import { shouldHandleTextCommands } from "../../../auto-reply/commands-registry.js"; @@ -48,13 +50,41 @@ import { resolveSlackMedia, resolveSlackThreadStarter } from "../media.js"; import type { PreparedSlackMessage } from "./types.js"; +async function resolveSlackThreadTsFromHistory(params: { + client: SlackWebClient; + channelId: string; + messageTs: string; +}): Promise { + try { + const response = (await params.client.conversations.history({ + channel: params.channelId, + latest: params.messageTs, + oldest: params.messageTs, + inclusive: true, + limit: 1, + })) as { messages?: Array<{ ts?: string; thread_ts?: string }> }; + const message = + response.messages?.find((entry) => entry.ts === params.messageTs) ?? response.messages?.[0]; + const threadTs = message?.thread_ts?.trim(); + return threadTs || undefined; + } catch (err) { + if (shouldLogVerbose()) { + logVerbose( + `slack inbound: failed to resolve thread_ts via conversations.history for channel=${params.channelId} ts=${params.messageTs}: ${String(err)}`, + ); + } + return undefined; + } +} + export async function prepareSlackMessage(params: { ctx: SlackMonitorContext; account: ResolvedSlackAccount; message: SlackMessageEvent; opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; }): Promise { - const { ctx, account, message, opts } = params; + const { ctx, account, opts } = params; + let message = params.message; const cfg = ctx.cfg; let channelInfo: { @@ -193,18 +223,48 @@ export async function prepareSlackMessage(params: { }, }); + // Slack occasionally delivers events that include parent_user_id (so they're thread replies) + // but omit thread_ts. If we don't recover the thread_ts, replies can end up in the channel root. + if (!message.thread_ts && message.parent_user_id && message.ts) { + if (shouldLogVerbose()) { + logVerbose( + `slack inbound: missing thread_ts for thread reply channel=${message.channel} ts=${message.ts} source=${opts.source}`, + ); + } + const resolved = await resolveSlackThreadTsFromHistory({ + client: ctx.app.client, + channelId: message.channel, + messageTs: message.ts, + }); + if (resolved) { + message = { ...message, thread_ts: resolved }; + if (shouldLogVerbose()) { + logVerbose( + `slack inbound: resolved missing thread_ts channel=${message.channel} ts=${message.ts} -> thread_ts=${resolved}`, + ); + } + } else if (shouldLogVerbose()) { + logVerbose( + `slack inbound: could not resolve missing thread_ts channel=${message.channel} ts=${message.ts}`, + ); + } + } + const baseSessionKey = route.sessionKey; const threadContext = resolveSlackThreadContext({ message, replyToMode: ctx.replyToMode }); const threadTs = threadContext.incomingThreadTs; const isThreadReply = threadContext.isThreadReply; + const autoThreadId = + !isThreadReply && ctx.replyToMode === "all" ? threadContext.messageThreadId : undefined; + const threadSessionId = isThreadReply ? threadTs : autoThreadId; const threadKeys = resolveThreadSessionKeys({ baseSessionKey, - threadId: isThreadReply ? threadTs : undefined, - parentSessionKey: isThreadReply && ctx.threadInheritParent ? baseSessionKey : undefined, + threadId: threadSessionId, + parentSessionKey: threadSessionId && ctx.threadInheritParent ? baseSessionKey : undefined, }); const sessionKey = threadKeys.sessionKey; const historyKey = - isThreadReply && ctx.threadHistoryScope === "thread" ? sessionKey : message.channel; + threadSessionId && ctx.threadHistoryScope === "thread" ? sessionKey : message.channel; const mentionRegexes = buildMentionRegexes(cfg, route.agentId); const hasAnyMention = /<@[^>]+>/.test(message.text ?? ""); @@ -395,20 +455,19 @@ export async function prepareSlackMessage(params: { GroupSubject: isRoomish ? roomLabel : undefined, From: slackFrom, }) ?? (isDirectMessage ? senderName : roomLabel); - const textWithId = `${rawBody}\n[slack message id: ${message.ts} channel: ${message.channel}]`; const storePath = resolveStorePath(ctx.cfg.session?.store, { agentId: route.agentId, }); const envelopeOptions = resolveEnvelopeFormatOptions(ctx.cfg); const previousTimestamp = readSessionUpdatedAt({ storePath, - sessionKey: route.sessionKey, + sessionKey, }); const body = formatInboundEnvelope({ channel: "Slack", from: envelopeFrom, timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined, - body: textWithId, + body: rawBody, chatType: isDirectMessage ? "direct" : "channel", sender: { name: senderName, id: senderId }, previousTimestamp, @@ -427,9 +486,7 @@ export async function prepareSlackMessage(params: { channel: "Slack", from: roomLabel, timestamp: entry.timestamp, - body: `${entry.body}${ - entry.messageId ? ` [id:${entry.messageId} channel:${message.channel}]` : "" - }`, + body: entry.body, chatType: "channel", senderLabel: entry.sender, envelope: envelopeOptions, @@ -463,12 +520,11 @@ export async function prepareSlackMessage(params: { if (starter?.text) { const starterUser = starter.userId ? await ctx.resolveUserName(starter.userId) : null; const starterName = starterUser?.name ?? starter.userId ?? "Unknown"; - const starterWithId = `${starter.text}\n[slack message id: ${starter.ts ?? threadTs} channel: ${message.channel}]`; threadStarterBody = formatThreadStarterEnvelope({ channel: "Slack", author: starterName, timestamp: starter.ts ? Math.round(Number(starter.ts) * 1000) : undefined, - body: starterWithId, + body: starter.text, envelope: envelopeOptions, }); const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80); diff --git a/src/slack/monitor/replies.ts b/src/slack/monitor/replies.ts index 314be285f..a10482ba6 100644 --- a/src/slack/monitor/replies.ts +++ b/src/slack/monitor/replies.ts @@ -44,7 +44,7 @@ export async function deliverReplies(params: { }); } } - params.runtime.log?.(`delivered reply to ${params.target}`); + params.runtime.log?.(`delivered reply to ${params.target} threadTs=${threadTs ?? "none"}`); } }