diff --git a/extensions/telegram-user/src/monitor/handler.ts b/extensions/telegram-user/src/monitor/handler.ts index acbb2d7fd..1cffa5a5a 100644 --- a/extensions/telegram-user/src/monitor/handler.ts +++ b/extensions/telegram-user/src/monitor/handler.ts @@ -22,6 +22,7 @@ type TelegramUserHandlerParams = { runtime: RuntimeEnv; accountId: string; accountConfig: TelegramUserAccountConfig; + abortSignal?: AbortSignal; self?: { id: number; username?: string | null }; }; @@ -89,8 +90,10 @@ async function safeSendTyping(params: { status: Parameters[1]; typingParams?: Parameters[2]; runtime: TelegramUserHandlerParams["runtime"]; + abortSignal?: AbortSignal; logLabel: string; }) { + if (params.abortSignal?.aborted) return; if (isClientDestroyed(params.client)) return; try { await params.client.sendTyping(params.target, params.status, params.typingParams); @@ -279,7 +282,7 @@ async function resolveMediaAttachments(params: { } export function createTelegramUserMessageHandler(params: TelegramUserHandlerParams) { - const { client, cfg, runtime, accountId, accountConfig, self } = params; + const { client, cfg, runtime, accountId, accountConfig, self, abortSignal } = params; const core = getTelegramUserRuntime(); const textLimit = accountConfig.textChunkLimit ?? DEFAULT_TEXT_LIMIT; const mediaMaxMb = accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB; @@ -288,6 +291,8 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara const groupAllowFrom = accountConfig.groupAllowFrom ?? allowFrom; return async (msg: MessageContext) => { + if (abortSignal?.aborted) return; + if (isClientDestroyed(client)) return; try { if (msg.isOutgoing || msg.isService) return; const messageGroup = msg.isMessageGroup ? msg.messages : [msg]; @@ -642,10 +647,14 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara .responsePrefix, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), deliver: async (payload) => { + if (abortSignal?.aborted) return; + if (isClientDestroyed(client)) return; const replyToId = hasReplied ? undefined : msg.id; const replyText = payload.text ?? ""; const mediaUrl = payload.mediaUrl; if (mediaUrl) { + if (abortSignal?.aborted) return; + if (isClientDestroyed(client)) return; if (payload.audioAsVoice) { await safeSendTyping({ client, @@ -653,6 +662,7 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara status: "record_voice", typingParams, runtime, + abortSignal, logLabel: "voice typing", }); } @@ -680,6 +690,8 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara } if (replyText) { for (const chunk of core.channel.text.chunkMarkdownText(replyText, textLimit)) { + if (abortSignal?.aborted) return; + if (isClientDestroyed(client)) return; const trimmed = chunk.trim(); if (!trimmed) continue; try { @@ -709,6 +721,7 @@ export function createTelegramUserMessageHandler(params: TelegramUserHandlerPara status: "typing", typingParams, runtime, + abortSignal, logLabel: "typing", }); }, diff --git a/extensions/telegram-user/src/monitor/index.ts b/extensions/telegram-user/src/monitor/index.ts index 5e7486089..c9742c7dc 100644 --- a/extensions/telegram-user/src/monitor/index.ts +++ b/extensions/telegram-user/src/monitor/index.ts @@ -18,6 +18,11 @@ async function loadMtcuteDispatcher(): Promise { return mtcuteDispatcherPromise; } +function isDestroyedClientError(err: unknown): boolean { + const message = err instanceof Error ? err.message : String(err); + return /client is destroyed/i.test(message); +} + export type MonitorTelegramUserOpts = { runtime?: RuntimeEnv; abortSignal?: AbortSignal; @@ -33,6 +38,8 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts }); if (!account.enabled) return; + let shuttingDown = false; + const apiId = account.credentials.apiId; const apiHash = account.credentials.apiHash; if (!apiId || !apiHash) { @@ -60,6 +67,7 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts setActiveTelegramUserClient(client); const stop = async () => { + shuttingDown = true; setActiveTelegramUserClient(null); await client.destroy().catch(() => undefined); }; @@ -67,6 +75,7 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts opts.abortSignal?.addEventListener( "abort", () => { + shuttingDown = true; void stop(); }, { once: true }, @@ -83,6 +92,7 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts runtime, accountId: account.accountId, accountConfig: account.config, + abortSignal: opts.abortSignal, self: self ? { id: self.id, username: "username" in self ? self.username : undefined } : undefined, @@ -99,15 +109,31 @@ export async function monitorTelegramUserProvider(opts: MonitorTelegramUserOpts ); await new Promise((resolve, reject) => { - client.onError.add((err) => { - runtime.error?.(`telegram-user client error: ${String(err)}`); + let settled = false; + const settleResolve = () => { + if (settled) return; + settled = true; + resolve(); + }; + const settleReject = (err: unknown) => { + if (settled) return; + settled = true; reject(err); + }; + + client.onError.add((err) => { + if (shuttingDown || opts.abortSignal?.aborted || isDestroyedClientError(err)) { + settleResolve(); + return; + } + runtime.error?.(`telegram-user client error: ${String(err)}`); + settleReject(err); }); if (opts.abortSignal?.aborted) { - resolve(); + settleResolve(); return; } - opts.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); + opts.abortSignal?.addEventListener("abort", () => settleResolve(), { once: true }); }); await stop();