From 96149d1f71c12bef594173505d439361cf48921a Mon Sep 17 00:00:00 2001 From: Mauro Bolis Date: Fri, 9 Jan 2026 14:59:36 +0100 Subject: [PATCH] fix: honor slack reply threading --- src/auto-reply/reply/agent-runner.ts | 4 +- src/auto-reply/reply/reply-payloads.ts | 3 +- src/auto-reply/reply/reply-threading.test.ts | 7 ++ src/auto-reply/reply/reply-threading.ts | 6 +- src/auto-reply/types.ts | 1 + src/slack/monitor.tool-result.test.ts | 110 +++++++++++++++++++ src/slack/monitor.ts | 23 +++- 7 files changed, 145 insertions(+), 9 deletions(-) diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 48bfc7cbc..43c4183b3 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -260,7 +260,9 @@ export async function runReplyAgent(params: { followupRun.run.config, replyToChannel, ); - const applyReplyToMode = createReplyToModeFilter(replyToMode); + const applyReplyToMode = createReplyToModeFilter(replyToMode, { + allowTagsWhenOff: replyToChannel === "slack", + }); const cfg = followupRun.run.config; if (shouldSteer && isStreaming) { diff --git a/src/auto-reply/reply/reply-payloads.ts b/src/auto-reply/reply/reply-payloads.ts index ad1b78309..15fcd6931 100644 --- a/src/auto-reply/reply/reply-payloads.ts +++ b/src/auto-reply/reply/reply-payloads.ts @@ -10,7 +10,7 @@ export function applyReplyTagsToPayload( currentMessageId?: string, ): ReplyPayload { if (typeof payload.text !== "string") return payload; - const { cleaned, replyToId } = extractReplyToTag( + const { cleaned, replyToId, hasTag } = extractReplyToTag( payload.text, currentMessageId, ); @@ -18,6 +18,7 @@ export function applyReplyTagsToPayload( ...payload, text: cleaned ? cleaned : undefined, replyToId: replyToId ?? payload.replyToId, + replyToTag: hasTag || payload.replyToTag, }; } diff --git a/src/auto-reply/reply/reply-threading.test.ts b/src/auto-reply/reply/reply-threading.test.ts index 19b0ea3a0..d6fb935d0 100644 --- a/src/auto-reply/reply/reply-threading.test.ts +++ b/src/auto-reply/reply/reply-threading.test.ts @@ -40,6 +40,13 @@ describe("createReplyToModeFilter", () => { expect(filter({ text: "hi", replyToId: "1" }).replyToId).toBeUndefined(); }); + it("keeps replyToId when mode is off and reply tags are allowed", () => { + const filter = createReplyToModeFilter("off", { allowTagsWhenOff: true }); + expect( + filter({ text: "hi", replyToId: "1", replyToTag: true }).replyToId, + ).toBe("1"); + }); + it("keeps replyToId when mode is all", () => { const filter = createReplyToModeFilter("all"); expect(filter({ text: "hi", replyToId: "1" }).replyToId).toBe("1"); diff --git a/src/auto-reply/reply/reply-threading.ts b/src/auto-reply/reply/reply-threading.ts index af84bcb7d..3dafa325f 100644 --- a/src/auto-reply/reply/reply-threading.ts +++ b/src/auto-reply/reply/reply-threading.ts @@ -19,11 +19,15 @@ export function resolveReplyToMode( } } -export function createReplyToModeFilter(mode: ReplyToMode) { +export function createReplyToModeFilter( + mode: ReplyToMode, + opts: { allowTagsWhenOff?: boolean } = {}, +) { let hasThreaded = false; return (payload: ReplyPayload): ReplyPayload => { if (!payload.replyToId) return payload; if (mode === "off") { + if (opts.allowTagsWhenOff && payload.replyToTag) return payload; return { ...payload, replyToId: undefined }; } if (mode === "all") return payload; diff --git a/src/auto-reply/types.ts b/src/auto-reply/types.ts index 7f69aeff9..a276fe66d 100644 --- a/src/auto-reply/types.ts +++ b/src/auto-reply/types.ts @@ -28,6 +28,7 @@ export type ReplyPayload = { mediaUrl?: string; mediaUrls?: string[]; replyToId?: string; + replyToTag?: boolean; /** Send audio as voice message (bubble) instead of audio file. Defaults to false. */ audioAsVoice?: boolean; isError?: boolean; diff --git a/src/slack/monitor.tool-result.test.ts b/src/slack/monitor.tool-result.test.ts index 77cf26e9a..bd21f2701 100644 --- a/src/slack/monitor.tool-result.test.ts +++ b/src/slack/monitor.tool-result.test.ts @@ -283,6 +283,17 @@ describe("monitorSlackProvider tool results", () => { it("threads replies when incoming message is in a thread", async () => { replyMock.mockResolvedValue({ text: "thread reply" }); + config = { + messages: { + responsePrefix: "PFX", + ackReaction: "👀", + ackReactionScope: "group-mentions", + }, + slack: { + dm: { enabled: true, policy: "open", allowFrom: ["*"] }, + replyToMode: "first", + }, + }; const controller = new AbortController(); const run = monitorSlackProvider({ @@ -315,6 +326,50 @@ describe("monitorSlackProvider tool results", () => { expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: "456" }); }); + it("threads top-level replies when replyToMode is all", async () => { + replyMock.mockResolvedValue({ text: "thread reply" }); + config = { + messages: { + responsePrefix: "PFX", + ackReaction: "👀", + ackReactionScope: "group-mentions", + }, + slack: { + dm: { enabled: true, policy: "open", allowFrom: ["*"] }, + replyToMode: "all", + }, + }; + + const controller = new AbortController(); + const run = monitorSlackProvider({ + botToken: "bot-token", + appToken: "app-token", + abortSignal: controller.signal, + }); + + await waitForEvent("message"); + const handler = getSlackHandlers()?.get("message"); + if (!handler) throw new Error("Slack message handler not registered"); + + await handler({ + event: { + type: "message", + user: "U1", + text: "hello", + ts: "123", + channel: "C1", + channel_type: "im", + }, + }); + + await flush(); + controller.abort(); + await run; + + expect(sendMock).toHaveBeenCalledTimes(1); + expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: "123" }); + }); + it("treats parent_user_id as a thread reply even when thread_ts matches ts", async () => { replyMock.mockResolvedValue({ text: "thread reply" }); @@ -484,6 +539,17 @@ describe("monitorSlackProvider tool results", () => { it("keeps replies in channel root when message is not threaded", async () => { replyMock.mockResolvedValue({ text: "root reply" }); + config = { + messages: { + responsePrefix: "PFX", + ackReaction: "👀", + ackReactionScope: "group-mentions", + }, + slack: { + dm: { enabled: true, policy: "open", allowFrom: ["*"] }, + replyToMode: "first", + }, + }; const controller = new AbortController(); const run = monitorSlackProvider({ @@ -515,6 +581,50 @@ describe("monitorSlackProvider tool results", () => { expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: undefined }); }); + it("forces thread replies when replyToId is set", async () => { + replyMock.mockResolvedValue({ text: "forced reply", replyToId: "555" }); + config = { + messages: { + responsePrefix: "PFX", + ackReaction: "👀", + ackReactionScope: "group-mentions", + }, + slack: { + dm: { enabled: true, policy: "open", allowFrom: ["*"] }, + replyToMode: "off", + }, + }; + + const controller = new AbortController(); + const run = monitorSlackProvider({ + botToken: "bot-token", + appToken: "app-token", + abortSignal: controller.signal, + }); + + await waitForEvent("message"); + const handler = getSlackHandlers()?.get("message"); + if (!handler) throw new Error("Slack message handler not registered"); + + await handler({ + event: { + type: "message", + user: "U1", + text: "hello", + ts: "789", + channel: "C1", + channel_type: "im", + }, + }); + + await flush(); + controller.abort(); + await run; + + expect(sendMock).toHaveBeenCalledTimes(1); + expect(sendMock.mock.calls[0][2]).toMatchObject({ threadTs: "555" }); + }); + it("reacts to mention-gated room messages when ackReaction is enabled", async () => { replyMock.mockResolvedValue(undefined); const client = getSlackClient(); diff --git a/src/slack/monitor.ts b/src/slack/monitor.ts index f7d35cd33..ffcbd105c 100644 --- a/src/slack/monitor.ts +++ b/src/slack/monitor.ts @@ -86,6 +86,7 @@ type SlackMessageEvent = { text?: string; ts?: string; thread_ts?: string; + event_ts?: string; parent_user_id?: string; channel: string; channel_type?: "im" | "mpim" | "channel" | "group"; @@ -100,6 +101,7 @@ type SlackAppMentionEvent = { text?: string; ts?: string; thread_ts?: string; + event_ts?: string; parent_user_id?: string; channel: string; channel_type?: "im" | "mpim" | "channel" | "group"; @@ -506,6 +508,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const useAccessGroups = cfg.commands?.useAccessGroups !== false; const reactionMode = slackCfg.reactionNotifications ?? "own"; const reactionAllowlist = slackCfg.reactionAllowlist ?? []; + const replyToMode = slackCfg.replyToMode ?? "off"; const slashCommand = resolveSlackSlashCommandConfig( opts.slashCommand ?? slackCfg.slashCommand, ); @@ -1096,9 +1099,16 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { ); } - // Only thread replies if the incoming message was in a thread. const incomingThreadTs = message.thread_ts; - const statusThreadTs = message.thread_ts ?? message.ts; + const eventTs = message.event_ts; + const replyThreadTs = + replyToMode === "all" + ? (incomingThreadTs ?? message.ts ?? eventTs) + : replyToMode === "first" + ? incomingThreadTs + : undefined; + const statusThreadTs = + replyThreadTs ?? incomingThreadTs ?? message.ts ?? eventTs; let didSetStatus = false; const onReplyStart = async () => { didSetStatus = true; @@ -1119,7 +1129,7 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { accountId: account.accountId, runtime, textLimit, - threadTs: incomingThreadTs, + replyThreadTs, }); }, onError: (err, info) => { @@ -1922,10 +1932,11 @@ async function deliverReplies(params: { accountId?: string; runtime: RuntimeEnv; textLimit: number; - threadTs?: string; + replyThreadTs?: string; }) { const chunkLimit = Math.min(params.textLimit, 4000); for (const payload of params.replies) { + const threadTs = payload.replyToId ?? params.replyThreadTs; const mediaList = payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []); const text = payload.text ?? ""; @@ -1937,7 +1948,7 @@ async function deliverReplies(params: { if (!trimmed || trimmed === SILENT_REPLY_TOKEN) continue; await sendMessageSlack(params.target, trimmed, { token: params.token, - threadTs: params.threadTs, + threadTs, accountId: params.accountId, }); } @@ -1949,7 +1960,7 @@ async function deliverReplies(params: { await sendMessageSlack(params.target, caption, { token: params.token, mediaUrl, - threadTs: params.threadTs, + threadTs, accountId: params.accountId, }); }