Telegram: isolate dm topic sessions

This commit is contained in:
rohan nagpal 2026-01-24 12:42:25 +00:00 committed by Peter Steinberger
parent d4f60bf16a
commit 704af78a5d
8 changed files with 218 additions and 7 deletions

View File

@ -18,6 +18,8 @@ Docs: https://docs.clawd.bot
- Docs: add Bedrock EC2 instance role setup + IAM steps. (#1625) Thanks @sergical. https://docs.clawd.bot/bedrock
- Exec approvals: forward approval prompts to chat with `/approve` for all channels (including plugins). (#1621) Thanks @czekaj. https://docs.clawd.bot/tools/exec-approvals https://docs.clawd.bot/tools/slash-commands
- Gateway: expose config.patch in the gateway tool with safe partial updates + restart sentinel. (#1653) Thanks @Glucksberg.
- Telegram: treat DM topics as separate sessions and keep DM history limits stable with thread suffixes.
- Telegram: add verbose raw-update logging for inbound Telegram updates.
### Fixes
- BlueBubbles: keep part-index GUIDs in reply tags when short IDs are missing.

View File

@ -121,6 +121,16 @@ describe("getDmHistoryLimitFromSessionKey", () => {
} as ClawdbotConfig;
expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123", config)).toBe(10);
});
it("strips thread suffix from dm session keys", () => {
const config = {
channels: { telegram: { dmHistoryLimit: 10, dms: { "123": { historyLimit: 7 } } } },
} as ClawdbotConfig;
expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:thread:999", config)).toBe(
7,
);
expect(getDmHistoryLimitFromSessionKey("agent:main:telegram:dm:123:topic:555", config)).toBe(7);
expect(getDmHistoryLimitFromSessionKey("telegram:dm:123:thread:999", config)).toBe(7);
});
it("returns undefined for non-dm session kinds", () => {
const config = {
channels: {

View File

@ -2,6 +2,19 @@ import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { ClawdbotConfig } from "../../config/config.js";
const THREAD_SUFFIX_MARKERS = [":thread:", ":topic:"];
function stripThreadSuffix(value: string): string {
const lower = value.toLowerCase();
let idx = -1;
for (const marker of THREAD_SUFFIX_MARKERS) {
const pos = lower.lastIndexOf(marker);
if (pos > idx) idx = pos;
}
if (idx <= 0) return value;
return value.slice(0, idx);
}
/**
* Limits conversation history to the last N user turns (and their associated
* assistant responses). This reduces token usage for long-running DM sessions.
@ -44,7 +57,8 @@ export function getDmHistoryLimitFromSessionKey(
if (!provider) return undefined;
const kind = providerParts[1]?.toLowerCase();
const userId = providerParts.slice(2).join(":");
const userIdRaw = providerParts.slice(2).join(":");
const userId = stripThreadSuffix(userIdRaw);
if (kind !== "dm") return undefined;
const getLimit = (

View File

@ -0,0 +1,72 @@
import { describe, expect, it, vi } from "vitest";
import { buildTelegramMessageContext } from "./bot-message-context.js";
describe("buildTelegramMessageContext dm thread sessions", () => {
const baseConfig = {
agents: { defaults: { model: "anthropic/claude-opus-4-5", workspace: "/tmp/clawd" } },
channels: { telegram: {} },
messages: { groupChat: { mentionPatterns: [] } },
} as never;
const buildContext = async (message: Record<string, unknown>) =>
await buildTelegramMessageContext({
primaryCtx: {
message,
me: { id: 7, username: "bot" },
} as never,
allMedia: [],
storeAllowFrom: [],
options: {},
bot: {
api: {
sendChatAction: vi.fn(),
setMessageReaction: vi.fn(),
},
} as never,
cfg: baseConfig,
account: { accountId: "default" } as never,
historyLimit: 0,
groupHistories: new Map(),
dmPolicy: "open",
allowFrom: [],
groupAllowFrom: [],
ackReactionScope: "off",
logger: { info: vi.fn() },
resolveGroupActivation: () => undefined,
resolveGroupRequireMention: () => false,
resolveTelegramGroupConfig: () => ({
groupConfig: { requireMention: false },
topicConfig: undefined,
}),
});
it("uses thread session key for dm topics", async () => {
const ctx = await buildContext({
message_id: 1,
chat: { id: 1234, type: "private" },
date: 1700000000,
text: "hello",
message_thread_id: 42,
from: { id: 42, first_name: "Alice" },
});
expect(ctx).not.toBeNull();
expect(ctx?.ctxPayload?.MessageThreadId).toBe(42);
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main:thread:42");
});
it("keeps legacy dm session key when no thread id", async () => {
const ctx = await buildContext({
message_id: 2,
chat: { id: 1234, type: "private" },
date: 1700000001,
text: "hello",
from: { id: 42, first_name: "Alice" },
});
expect(ctx).not.toBeNull();
expect(ctx?.ctxPayload?.MessageThreadId).toBeUndefined();
expect(ctx?.ctxPayload?.SessionKey).toBe("agent:main:main");
});
});

View File

@ -20,6 +20,7 @@ import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../conf
import { logVerbose, shouldLogVerbose } from "../globals.js";
import { recordChannelActivity } from "../infra/channel-activity.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import { shouldAckReaction as shouldAckReactionGate } from "../channels/ack-reactions.js";
import { resolveMentionGatingWithBypass } from "../channels/mention-gating.js";
import { resolveControlCommandGate } from "../channels/command-gating.js";
@ -136,6 +137,13 @@ export const buildTelegramMessageContext = async ({
id: peerId,
},
});
const baseSessionKey = route.sessionKey;
const dmThreadId = !isGroup ? resolvedThreadId : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const mentionRegexes = buildMentionRegexes(cfg, route.agentId);
const effectiveDmAllow = normalizeAllowFromWithStore({ allowFrom, storeAllowFrom });
const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom);
@ -325,7 +333,7 @@ export const buildTelegramMessageContext = async ({
const activationOverride = resolveGroupActivation({
chatId,
messageThreadId: resolvedThreadId,
sessionKey: route.sessionKey,
sessionKey: sessionKey,
agentId: route.agentId,
});
const baseRequireMention = resolveGroupRequireMention(chatId);
@ -432,7 +440,7 @@ export const buildTelegramMessageContext = async ({
const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
const previousTimestamp = readSessionUpdatedAt({
storePath,
sessionKey: route.sessionKey,
sessionKey: sessionKey,
});
const body = formatInboundEnvelope({
channel: "Telegram",
@ -482,7 +490,7 @@ export const buildTelegramMessageContext = async ({
CommandBody: commandBody,
From: isGroup ? buildTelegramGroupFrom(chatId, resolvedThreadId) : `telegram:${chatId}`,
To: `telegram:${chatId}`,
SessionKey: route.sessionKey,
SessionKey: sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: conversationLabel,
@ -526,7 +534,7 @@ export const buildTelegramMessageContext = async ({
await recordInboundSession({
storePath,
sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
sessionKey: ctxPayload.SessionKey ?? sessionKey,
ctx: ctxPayload,
updateLastRoute: !isGroup
? {

View File

@ -18,6 +18,7 @@ import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { danger, logVerbose } from "../globals.js";
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import { resolveCommandAuthorizedFromAuthorizers } from "../channels/command-gating.js";
import type { ChannelGroupPolicy } from "../config/group-policy.js";
import type {
@ -271,6 +272,13 @@ export const registerTelegramNativeCommands = ({
id: isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId),
},
});
const baseSessionKey = route.sessionKey;
const dmThreadId = !isGroup ? resolvedThreadId : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
const tableMode = resolveMarkdownTableMode({
cfg,
channel: "telegram",
@ -309,7 +317,7 @@ export const registerTelegramNativeCommands = ({
CommandAuthorized: commandAuthorized,
CommandSource: "native" as const,
SessionKey: `telegram:slash:${senderId || chatId}`,
CommandTargetSessionKey: route.sessionKey,
CommandTargetSessionKey: sessionKey,
MessageThreadId: resolvedThreadId,
IsForum: isForum,
// Originating context for sub-agent announce routing

View File

@ -2163,6 +2163,46 @@ describe("createTelegramBot", () => {
expect.objectContaining({ message_thread_id: 99 }),
);
});
it("sets command target session key for dm topic commands", async () => {
onSpy.mockReset();
sendMessageSpy.mockReset();
commandSpy.mockReset();
const replySpy = replyModule.__replySpy as unknown as ReturnType<typeof vi.fn>;
replySpy.mockReset();
replySpy.mockResolvedValue({ text: "response" });
loadConfig.mockReturnValue({
commands: { native: true },
channels: {
telegram: {
dmPolicy: "pairing",
},
},
});
readTelegramAllowFromStore.mockResolvedValueOnce(["12345"]);
createTelegramBot({ token: "tok" });
const handler = commandSpy.mock.calls.find((call) => call[0] === "status")?.[1] as
| ((ctx: Record<string, unknown>) => Promise<void>)
| undefined;
if (!handler) throw new Error("status command handler missing");
await handler({
message: {
chat: { id: 12345, type: "private" },
from: { id: 12345, username: "testuser" },
text: "/status",
date: 1736380800,
message_id: 42,
message_thread_id: 99,
},
match: "",
});
expect(replySpy).toHaveBeenCalledTimes(1);
const payload = replySpy.mock.calls[0][0];
expect(payload.CommandTargetSessionKey).toBe("agent:main:main:thread:99");
});
it("allows native DM commands for paired users", async () => {
onSpy.mockReset();
@ -2789,4 +2829,41 @@ describe("createTelegramBot", () => {
const sessionKey = enqueueSystemEvent.mock.calls[0][1].sessionKey;
expect(sessionKey).not.toContain(":topic:");
});
it("uses thread session key for dm reactions with topic id", async () => {
onSpy.mockReset();
enqueueSystemEvent.mockReset();
loadConfig.mockReturnValue({
channels: {
telegram: { dmPolicy: "open", reactionNotifications: "all" },
},
});
createTelegramBot({ token: "tok" });
const handler = getOnHandler("message_reaction") as (
ctx: Record<string, unknown>,
) => Promise<void>;
await handler({
update: { update_id: 508 },
messageReaction: {
chat: { id: 1234, type: "private" },
message_id: 300,
message_thread_id: 42,
user: { id: 12, first_name: "Dana" },
date: 1736380800,
old_reaction: [],
new_reaction: [{ type: "emoji", emoji: "🔥" }],
},
});
expect(enqueueSystemEvent).toHaveBeenCalledTimes(1);
expect(enqueueSystemEvent).toHaveBeenCalledWith(
"Telegram reaction added: 🔥 by Dana on msg 300",
expect.objectContaining({
sessionKey: expect.stringContaining(":thread:42"),
contextKey: expect.stringContaining("telegram:reaction:add:1234:300:12"),
}),
);
});
});

View File

@ -20,9 +20,11 @@ import {
} from "../config/group-policy.js";
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { getChildLogger } from "../logging.js";
import { resolveAgentRoute } from "../routing/resolve-route.js";
import { resolveThreadSessionKeys } from "../routing/session-key.js";
import type { RuntimeEnv } from "../runtime.js";
import { resolveTelegramAccount } from "./accounts.js";
import {
@ -161,7 +163,18 @@ export function createTelegramBot(opts: TelegramBotOptions) {
return skipped;
};
const rawUpdateLogger = createSubsystemLogger("gateway/channels/telegram/raw-update");
bot.use(async (ctx, next) => {
if (shouldLogVerbose()) {
try {
const raw = JSON.stringify(ctx.update ?? null);
const preview = raw.length > 8000 ? raw.slice(0, 8000) + "…" : raw;
rawUpdateLogger.debug(`telegram update: ${preview}`);
} catch (err) {
rawUpdateLogger.debug(`telegram update log failed: ${String(err)}`);
}
}
await next();
recordUpdateId(ctx);
});
@ -372,13 +385,20 @@ export function createTelegramBot(opts: TelegramBotOptions) {
accountId: account.accountId,
peer: { kind: isGroup ? "group" : "dm", id: peerId },
});
const baseSessionKey = route.sessionKey;
const dmThreadId = !isGroup ? resolvedThreadId : undefined;
const threadKeys =
dmThreadId != null
? resolveThreadSessionKeys({ baseSessionKey, threadId: String(dmThreadId) })
: null;
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
// Enqueue system event for each added reaction
for (const r of addedReactions) {
const emoji = r.emoji;
const text = `Telegram reaction added: ${emoji} by ${senderLabel} on msg ${messageId}`;
enqueueSystemEvent(text, {
sessionKey: route.sessionKey,
sessionKey: sessionKey,
contextKey: `telegram:reaction:add:${chatId}:${messageId}:${user?.id ?? "anon"}:${emoji}`,
});
logVerbose(`telegram: reaction event enqueued: ${text}`);