diff --git a/CHANGELOG.md b/CHANGELOG.md index e72e9b7ef..6c84644b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ Docs: https://docs.clawd.bot - Docs: add Bedrock EC2 instance role setup + IAM steps. (#1625) Thanks @sergical. https://docs.clawd.bot/bedrock - Exec approvals: forward approval prompts to chat with `/approve` for all channels (including plugins). (#1621) Thanks @czekaj. https://docs.clawd.bot/tools/exec-approvals https://docs.clawd.bot/tools/slash-commands - Gateway: expose config.patch in the gateway tool with safe partial updates + restart sentinel. (#1653) Thanks @Glucksberg. +- Telegram: treat DM topics as separate sessions and keep DM history limits stable with thread suffixes. +- Telegram: add verbose raw-update logging for inbound Telegram updates. ### Fixes - BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing. diff --git a/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts b/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts index 4b6f082b1..ec2b58f79 100644 --- a/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts +++ b/src/agents/pi-embedded-runner.get-dm-history-limit-from-session-key.returns-undefined-sessionkey-is-undefined.test.ts @@ -121,6 +121,16 @@ describe("getDmHistoryLimitFromSessionKey", () => { } as ClawdbotConfig; expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123", config)).toBe(10); }); + it("strips thread suffix from dm session keys", () => { + const config = { + channels: { telegram: { dmHistoryLimit: 10, dms: { "123": { historyLimit: 7 } } } }, + } as ClawdbotConfig; + expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:thread:999", config)).toBe( + 7, + ); + expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:topic:555", config)).toBe(7); + expect(getDmHistoryLimitFromSessionKey("telegram:dm:123:thread:999", config)).toBe(7); + }); it("returns undefined for non-dm session kinds", () => { const config = { channels: { diff --git a/src/agents/pi-embedded-runner/history.ts b/src/agents/pi-embedded-runner/history.ts index bcc0625c7..37e3e0edf 100644 --- a/src/agents/pi-embedded-runner/history.ts +++ b/src/agents/pi-embedded-runner/history.ts @@ -2,6 +2,19 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core"; import type { ClawdbotConfig } from "../../config/config.js"; +const THREAD_SUFFIX_MARKERS = [":thread:", ":topic:"]; + +function stripThreadSuffix(value: string): string { + const lower = value.toLowerCase(); + let idx = -1; + for (const marker of THREAD_SUFFIX_MARKERS) { + const pos = lower.lastIndexOf(marker); + if (pos > idx) idx = pos; + } + if (idx <= 0) return value; + return value.slice(0, idx); +} + /** * Limits conversation history to the last N user turns (and their associated * assistant responses). This reduces token usage for long-running DM sessions. @@ -44,7 +57,8 @@ export function getDmHistoryLimitFromSessionKey( if (!provider) return undefined; const kind = providerParts[1]?.toLowerCase(); - const userId = providerParts.slice(2).join(":"); + const userIdRaw = providerParts.slice(2).join(":"); + const userId = stripThreadSuffix(userIdRaw); if (kind !== "dm") return undefined; const getLimit = ( diff --git a/src/telegram/bot-message-context.dm-threads.test.ts b/src/telegram/bot-message-context.dm-threads.test.ts new file mode 100644 index 000000000..ff6a8a837 --- /dev/null +++ b/src/telegram/bot-message-context.dm-threads.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it, vi } from "vitest"; + +import { buildTelegramMessageContext } from "./bot-message-context.js"; + +describe("buildTelegramMessageContext dm thread sessions", () => { + const baseConfig = { + agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/clawd" } }, + channels: { telegram: {} }, + messages: { groupChat: { mentionPatterns: [] } }, + } as never; + + const buildContext = async (message: Record) => + await buildTelegramMessageContext({ + primaryCtx: { + message, + me: { id: 7, username: "bot" }, + } as never, + allMedia: [], + storeAllowFrom: [], + options: {}, + bot: { + api: { + sendChatAction: vi.fn(), + setMessageReaction: vi.fn(), + }, + } as never, + cfg: baseConfig, + account: { accountId: "default" } as never, + historyLimit: 0, + groupHistories: new Map(), + dmPolicy: "open", + allowFrom: [], + groupAllowFrom: [], + ackReactionScope: "off", + logger: { info: vi.fn() }, + resolveGroupActivation: () => undefined, + resolveGroupRequireMention: () => false, + resolveTelegramGroupConfig: () => ({ + groupConfig: { requireMention: false }, + topicConfig: undefined, + }), + }); + + it("uses thread session key for dm topics", async () => { + const ctx = await buildContext({ + message_id: 1, + chat: { id: 1234, type: "private" }, + date: 1700000000, + text: "hello", + message_thread_id: 42, + from: { id: 42, first_name: "Alice" }, + }); + + expect(ctx).not.toBeNull(); + expect(ctx?.ctxPayload?.MessageThreadId).toBe(42); + expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main:thread:42"); + }); + + it("keeps legacy dm session key when no thread id", async () => { + const ctx = await buildContext({ + message_id: 2, + chat: { id: 1234, type: "private" }, + date: 1700000001, + text: "hello", + from: { id: 42, first_name: "Alice" }, + }); + + expect(ctx).not.toBeNull(); + expect(ctx?.ctxPayload?.MessageThreadId).toBeUndefined(); + expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main"); + }); +}); diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index 85d21228e..d90b6ffea 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -20,6 +20,7 @@ import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../conf import { logVerbose, shouldLogVerbose } from "../globals.js"; import { recordChannelActivity } from "../infra/channel-activity.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../routing/session-key.js"; import { shouldAckReaction as shouldAckReactionGate } from "../channels/ack-reactions.js"; import { resolveMentionGatingWithBypass } from "../channels/mention-gating.js"; import { resolveControlCommandGate } from "../channels/command-gating.js"; @@ -136,6 +137,13 @@ export const buildTelegramMessageContext = async ({ id: peerId, }, }); + const baseSessionKey = route.sessionKey; + const dmThreadId = !isGroup ? resolvedThreadId : undefined; + const threadKeys = + dmThreadId != null + ? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) }) + : null; + const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; const mentionRegexes = buildMentionRegexes(cfg, route.agentId); const effectiveDmAllow = normalizeAllowFromWithStore({ allowFrom, storeAllowFrom }); const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom); @@ -325,7 +333,7 @@ export const buildTelegramMessageContext = async ({ const activationOverride = resolveGroupActivation({ chatId, messageThreadId: resolvedThreadId, - sessionKey: route.sessionKey, + sessionKey: sessionKey, agentId: route.agentId, }); const baseRequireMention = resolveGroupRequireMention(chatId); @@ -432,7 +440,7 @@ export const buildTelegramMessageContext = async ({ const envelopeOptions = resolveEnvelopeFormatOptions(cfg); const previousTimestamp = readSessionUpdatedAt({ storePath, - sessionKey: route.sessionKey, + sessionKey: sessionKey, }); const body = formatInboundEnvelope({ channel: "Telegram", @@ -482,7 +490,7 @@ export const buildTelegramMessageContext = async ({ CommandBody: commandBody, From: isGroup ? buildTelegramGroupFrom(chatId, resolvedThreadId) : `telegram:${chatId}`, To: `telegram:${chatId}`, - SessionKey: route.sessionKey, + SessionKey: sessionKey, AccountId: route.accountId, ChatType: isGroup ? "group" : "direct", ConversationLabel: conversationLabel, @@ -526,7 +534,7 @@ export const buildTelegramMessageContext = async ({ await recordInboundSession({ storePath, - sessionKey: ctxPayload.SessionKey ?? route.sessionKey, + sessionKey: ctxPayload.SessionKey ?? sessionKey, ctx: ctxPayload, updateLastRoute: !isGroup ? { diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index a983452ba..fc28e58e3 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -18,6 +18,7 @@ import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js"; import { danger, logVerbose } from "../globals.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../routing/session-key.js"; import { resolveCommandAuthorizedFromAuthorizers } from "../channels/command-gating.js"; import type { ChannelGroupPolicy } from "../config/group-policy.js"; import type { @@ -271,6 +272,13 @@ export const registerTelegramNativeCommands = ({ id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId), }, }); + const baseSessionKey = route.sessionKey; + const dmThreadId = !isGroup ? resolvedThreadId : undefined; + const threadKeys = + dmThreadId != null + ? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) }) + : null; + const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; const tableMode = resolveMarkdownTableMode({ cfg, channel: "telegram", @@ -309,7 +317,7 @@ export const registerTelegramNativeCommands = ({ CommandAuthorized: commandAuthorized, CommandSource: "native" as const, SessionKey: `telegram:slash:${senderId || chatId}`, - CommandTargetSessionKey: route.sessionKey, + CommandTargetSessionKey: sessionKey, MessageThreadId: resolvedThreadId, IsForum: isForum, // Originating context for sub-agent announce routing diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index ab2b55505..486741f53 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -2163,6 +2163,46 @@ describe("createTelegramBot", () => { expect.objectContaining({ message_thread_id: 99 }), ); }); + it("sets command target session key for dm topic commands", async () => { + onSpy.mockReset(); + sendMessageSpy.mockReset(); + commandSpy.mockReset(); + const replySpy = replyModule.__replySpy as unknown as ReturnType; + replySpy.mockReset(); + replySpy.mockResolvedValue({ text: "response" }); + + loadConfig.mockReturnValue({ + commands: { native: true }, + channels: { + telegram: { + dmPolicy: "pairing", + }, + }, + }); + readTelegramAllowFromStore.mockResolvedValueOnce(["12345"]); + + createTelegramBot({ token: "tok" }); + const handler = commandSpy.mock.calls.find((call) => call[0] === "status")?.[1] as + | ((ctx: Record) => Promise) + | undefined; + if (!handler) throw new Error("status command handler missing"); + + await handler({ + message: { + chat: { id: 12345, type: "private" }, + from: { id: 12345, username: "testuser" }, + text: "/status", + date: 1736380800, + message_id: 42, + message_thread_id: 99, + }, + match: "", + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0][0]; + expect(payload.CommandTargetSessionKey).toBe("agent:main:main:thread:99"); + }); it("allows native DM commands for paired users", async () => { onSpy.mockReset(); @@ -2789,4 +2829,41 @@ describe("createTelegramBot", () => { const sessionKey = enqueueSystemEvent.mock.calls[0][1].sessionKey; expect(sessionKey).not.toContain(":topic:"); }); + it("uses thread session key for dm reactions with topic id", async () => { + onSpy.mockReset(); + enqueueSystemEvent.mockReset(); + + loadConfig.mockReturnValue({ + channels: { + telegram: { dmPolicy: "open", reactionNotifications: "all" }, + }, + }); + + createTelegramBot({ token: "tok" }); + const handler = getOnHandler("message_reaction") as ( + ctx: Record, + ) => Promise; + + await handler({ + update: { update_id: 508 }, + messageReaction: { + chat: { id: 1234, type: "private" }, + message_id: 300, + message_thread_id: 42, + user: { id: 12, first_name: "Dana" }, + date: 1736380800, + old_reaction: [], + new_reaction: [{ type: "emoji", emoji: "🔥" }], + }, + }); + + expect(enqueueSystemEvent).toHaveBeenCalledTimes(1); + expect(enqueueSystemEvent).toHaveBeenCalledWith( + "Telegram reaction added: 🔥 by Dana on msg 300", + expect.objectContaining({ + sessionKey: expect.stringContaining(":thread:42"), + contextKey: expect.stringContaining("telegram:reaction:add:1234:300:12"), + }), + ); + }); }); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index de7d715ab..652aaa4e7 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -20,9 +20,11 @@ import { } from "../config/group-policy.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../routing/session-key.js"; import type { RuntimeEnv } from "../runtime.js"; import { resolveTelegramAccount } from "./accounts.js"; import { @@ -161,7 +163,18 @@ export function createTelegramBot(opts: TelegramBotOptions) { return skipped; }; + const rawUpdateLogger = createSubsystemLogger("gateway/channels/telegram/raw-update"); + bot.use(async (ctx, next) => { + if (shouldLogVerbose()) { + try { + const raw = JSON.stringify(ctx.update ?? null); + const preview = raw.length > 8000 ? raw.slice(0, 8000) + "…" : raw; + rawUpdateLogger.debug(`telegram update: ${preview}`); + } catch (err) { + rawUpdateLogger.debug(`telegram update log failed: ${String(err)}`); + } + } await next(); recordUpdateId(ctx); }); @@ -372,13 +385,20 @@ export function createTelegramBot(opts: TelegramBotOptions) { accountId: account.accountId, peer: { kind: isGroup ? "group" : "dm", id: peerId }, }); + const baseSessionKey = route.sessionKey; + const dmThreadId = !isGroup ? resolvedThreadId : undefined; + const threadKeys = + dmThreadId != null + ? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) }) + : null; + const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; // Enqueue system event for each added reaction for (const r of addedReactions) { const emoji = r.emoji; const text = `Telegram reaction added: ${emoji} by ${senderLabel} on msg ${messageId}`; enqueueSystemEvent(text, { - sessionKey: route.sessionKey, + sessionKey: sessionKey, contextKey: `telegram:reaction:add:${chatId}:${messageId}:${user?.id ?? "anon"}:${emoji}`, }); logVerbose(`telegram: reaction event enqueued: ${text}`);