telegram-user: avoid crash on config reload

This commit is contained in:
Muhammed Mukhthar CM 2026-01-27 10:47:45 +00:00
parent 65c3718c96
commit b649960e7c
2 changed files with 44 additions and 5 deletions

View File

@ -22,6 +22,7 @@ type TelegramUserHandlerParams = {
runtime: RuntimeEnv; runtime: RuntimeEnv;
accountId: string; accountId: string;
accountConfig: TelegramUserAccountConfig; accountConfig: TelegramUserAccountConfig;
abortSignal?: AbortSignal;
self?: { id: number; username?: string | null }; self?: { id: number; username?: string | null };
}; };
@ -89,8 +90,10 @@ async function safeSendTyping(params: {
status: Parameters<TelegramClient["sendTyping"]>[1]; status: Parameters<TelegramClient["sendTyping"]>[1];
typingParams?: Parameters<TelegramClient["sendTyping"]>[2]; typingParams?: Parameters<TelegramClient["sendTyping"]>[2];
runtime: TelegramUserHandlerParams["runtime"]; runtime: TelegramUserHandlerParams["runtime"];
abortSignal?: AbortSignal;
logLabel: string; logLabel: string;
}) { }) {
if (params.abortSignal?.aborted) return;
if (isClientDestroyed(params.client)) return; if (isClientDestroyed(params.client)) return;
try { try {
await params.client.sendTyping(params.target, params.status, params.typingParams); await params.client.sendTyping(params.target, params.status, params.typingParams);
@ -279,7 +282,7 @@ async function resolveMediaAttachments(params: {
} }
export function createTelegramUserMessageHandler(params: TelegramUserHandlerParams) { export function createTelegramUserMessageHandler(params: TelegramUserHandlerParams) {
const { client, cfg, runtime, accountId, accountConfig, self } = params; const { client, cfg, runtime, accountId, accountConfig, self, abortSignal } = params;
const core = getTelegramUserRuntime(); const core = getTelegramUserRuntime();
const textLimit = accountConfig.textChunkLimit ?? DEFAULT_TEXT_LIMIT; const textLimit = accountConfig.textChunkLimit ?? DEFAULT_TEXT_LIMIT;
const mediaMaxMb = accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; const mediaMaxMb = accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
@ -288,6 +291,8 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara
const groupAllowFrom = accountConfig.groupAllowFrom ?? allowFrom; const groupAllowFrom = accountConfig.groupAllowFrom ?? allowFrom;
return async (msg: MessageContext) => { return async (msg: MessageContext) => {
if (abortSignal?.aborted) return;
if (isClientDestroyed(client)) return;
try { try {
if (msg.isOutgoing || msg.isService) return; if (msg.isOutgoing || msg.isService) return;
const messageGroup = msg.isMessageGroup ? msg.messages : [msg]; const messageGroup = msg.isMessageGroup ? msg.messages : [msg];
@ -642,10 +647,14 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara
.responsePrefix, .responsePrefix,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload) => { deliver: async (payload) => {
if (abortSignal?.aborted) return;
if (isClientDestroyed(client)) return;
const replyToId = hasReplied ? undefined : msg.id; const replyToId = hasReplied ? undefined : msg.id;
const replyText = payload.text ?? ""; const replyText = payload.text ?? "";
const mediaUrl = payload.mediaUrl; const mediaUrl = payload.mediaUrl;
if (mediaUrl) { if (mediaUrl) {
if (abortSignal?.aborted) return;
if (isClientDestroyed(client)) return;
if (payload.audioAsVoice) { if (payload.audioAsVoice) {
await safeSendTyping({ await safeSendTyping({
client, client,
@ -653,6 +662,7 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara
status: "record_voice", status: "record_voice",
typingParams, typingParams,
runtime, runtime,
abortSignal,
logLabel: "voice typing", logLabel: "voice typing",
}); });
} }
@ -680,6 +690,8 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara
} }
if (replyText) { if (replyText) {
for (const chunk of core.channel.text.chunkMarkdownText(replyText, textLimit)) { for (const chunk of core.channel.text.chunkMarkdownText(replyText, textLimit)) {
if (abortSignal?.aborted) return;
if (isClientDestroyed(client)) return;
const trimmed = chunk.trim(); const trimmed = chunk.trim();
if (!trimmed) continue; if (!trimmed) continue;
try { try {
@ -709,6 +721,7 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara
status: "typing", status: "typing",
typingParams, typingParams,
runtime, runtime,
abortSignal,
logLabel: "typing", logLabel: "typing",
}); });
}, },

View File

@ -18,6 +18,11 @@ async function loadMtcuteDispatcher(): Promise<MtcuteDispatcher> {
return mtcuteDispatcherPromise; return mtcuteDispatcherPromise;
} }
function isDestroyedClientError(err: unknown): boolean {
const message = err instanceof Error ? err.message : String(err);
return /client is destroyed/i.test(message);
}
export type MonitorTelegramUserOpts = { export type MonitorTelegramUserOpts = {
runtime?: RuntimeEnv; runtime?: RuntimeEnv;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
@ -33,6 +38,8 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts
}); });
if (!account.enabled) return; if (!account.enabled) return;
let shuttingDown = false;
const apiId = account.credentials.apiId; const apiId = account.credentials.apiId;
const apiHash = account.credentials.apiHash; const apiHash = account.credentials.apiHash;
if (!apiId || !apiHash) { if (!apiId || !apiHash) {
@ -60,6 +67,7 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts
setActiveTelegramUserClient(client); setActiveTelegramUserClient(client);
const stop = async () => { const stop = async () => {
shuttingDown = true;
setActiveTelegramUserClient(null); setActiveTelegramUserClient(null);
await client.destroy().catch(() => undefined); await client.destroy().catch(() => undefined);
}; };
@ -67,6 +75,7 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts
opts.abortSignal?.addEventListener( opts.abortSignal?.addEventListener(
"abort", "abort",
() => { () => {
shuttingDown = true;
void stop(); void stop();
}, },
{ once: true }, { once: true },
@ -83,6 +92,7 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts
runtime, runtime,
accountId: account.accountId, accountId: account.accountId,
accountConfig: account.config, accountConfig: account.config,
abortSignal: opts.abortSignal,
self: self self: self
? { id: self.id, username: "username" in self ? self.username : undefined } ? { id: self.id, username: "username" in self ? self.username : undefined }
: undefined, : undefined,
@ -99,15 +109,31 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts
); );
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
client.onError.add((err) => { let settled = false;
runtime.error?.(`telegram-user client error: ${String(err)}`); const settleResolve = () => {
if (settled) return;
settled = true;
resolve();
};
const settleReject = (err: unknown) => {
if (settled) return;
settled = true;
reject(err); reject(err);
};
client.onError.add((err) => {
if (shuttingDown || opts.abortSignal?.aborted || isDestroyedClientError(err)) {
settleResolve();
return;
}
runtime.error?.(`telegram-user client error: ${String(err)}`);
settleReject(err);
}); });
if (opts.abortSignal?.aborted) { if (opts.abortSignal?.aborted) {
resolve(); settleResolve();
return; return;
} }
opts.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); opts.abortSignal?.addEventListener("abort", () => settleResolve(), { once: true });
}); });
await stop(); await stop();