import { Kafka } from 'kafkajs'; import nodemailer from 'nodemailer'; import { SmtpServer, DeliveryLog, Subscriber } from '../models/index.js'; import { topicManager } from './topicManager.js'; const kafka = new Kafka({ clientId: process.env.KAFKA_CLIENT_ID || 'dynamic-consumer', brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], }); export class DynamicConsumer { constructor() { this.consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID || 'dynamic-consumer-group' }); this.subscribedTopics = new Set(); this.isRunning = false; this.messageHandler = null; } async connect() { try { await this.consumer.connect(); console.log('[DynamicConsumer] Connected to Kafka'); return true; } catch (error) { console.error('[DynamicConsumer] Connection error:', error); return false; } } async disconnect() { try { await this.consumer.disconnect(); this.subscribedTopics.clear(); this.isRunning = false; console.log('[DynamicConsumer] Disconnected from Kafka'); } catch (error) { console.error('[DynamicConsumer] Disconnect error:', error); } } // Установка обработчика сообщений setMessageHandler(handler) { this.messageHandler = handler; } // Подписка на конкретный топик async subscribeToTopic(topicName) { if (this.subscribedTopics.has(topicName)) { console.log(`[DynamicConsumer] Already subscribed to topic: ${topicName}`); return true; } try { await this.consumer.subscribe({ topic: topicName, fromBeginning: true }); this.subscribedTopics.add(topicName); console.log(`[DynamicConsumer] Subscribed to topic: ${topicName}`); // Если consumer еще не запущен, запускаем его if (!this.isRunning) { await this.startConsuming(); } return true; } catch (error) { console.error(`[DynamicConsumer] Error subscribing to ${topicName}:`, error); return false; } } // Отписка от топика async unsubscribeFromTopic(topicName) { if (!this.subscribedTopics.has(topicName)) { console.log(`[DynamicConsumer] Not subscribed to topic: ${topicName}`); return true; } try { await this.consumer.stop(); this.subscribedTopics.delete(topicName); // Переподписываемся на оставшиеся топики if (this.subscribedTopics.size > 0) { await this.consumer.subscribe( Array.from(this.subscribedTopics).map(topic => ({ topic, fromBeginning: true })) ); await this.startConsuming(); } console.log(`[DynamicConsumer] Unsubscribed from topic: ${topicName}`); return true; } catch (error) { console.error(`[DynamicConsumer] Error unsubscribing from ${topicName}:`, error); return false; } } // Подписка на несколько топиков async subscribeToTopics(topicNames) { const results = []; for (const topicName of topicNames) { const result = await this.subscribeToTopic(topicName); results.push({ topic: topicName, success: result }); } return results; } // Получение списка подписанных топиков getSubscribedTopics() { return Array.from(this.subscribedTopics); } // Запуск обработки сообщений async startConsuming() { if (this.isRunning) { console.log('[DynamicConsumer] Already consuming messages'); return; } try { await this.consumer.run({ eachMessage: async ({ topic, partition, message }) => { const messageId = `${topic}-${partition}-${message.offset}`; console.log(`[DynamicConsumer] Received message: ${messageId}`); if (this.messageHandler) { try { const task = JSON.parse(message.value.toString()); console.log(`[DynamicConsumer] Processing task:`, { campaignId: task.campaignId, subscriberId: task.subscriberId, email: task.email, hasSubject: !!task.subject, hasText: !!task.text, hasHtml: !!task.html, smtpServerId: task.smtpServerId }); await this.messageHandler(task, topic); console.log(`[DynamicConsumer] Successfully processed message: ${messageId}`); } catch (error) { console.error(`[DynamicConsumer] Error processing message from ${topic}:`, error); // Не подтверждаем сообщение при ошибке, чтобы оно было переотправлено throw error; } } }, }); this.isRunning = true; console.log('[DynamicConsumer] Started consuming messages'); } catch (error) { console.error('[DynamicConsumer] Error starting consumer:', error); } } // Остановка обработки сообщений async stopConsuming() { try { await this.consumer.stop(); this.isRunning = false; console.log('[DynamicConsumer] Stopped consuming messages'); } catch (error) { console.error('[DynamicConsumer] Error stopping consumer:', error); } } // Автоматическое обнаружение и подписка на новые топики async autoSubscribeToNewTopics() { try { const allTopics = await topicManager.getAllTopics(); const newTopics = allTopics.filter(topic => !this.subscribedTopics.has(topic)); if (newTopics.length > 0) { console.log(`[DynamicConsumer] Found ${newTopics.length} new topics:`, newTopics); await this.subscribeToTopics(newTopics); } } catch (error) { console.error('[DynamicConsumer] Error in auto-subscribe:', error); } } } // Обработчик сообщений для отправки email async function processEmailTask(task, topic) { let deliveryLog = null; // Проверяем наличие всех необходимых полей if (!task.subject || !task.text || !task.html) { console.error(`[DynamicConsumer] Missing required fields in task:`, { hasSubject: !!task.subject, hasText: !!task.text, hasHtml: !!task.html, task: task }); return; } // Проверяем, не был ли уже отправлен email для этого подписчика в этой кампании const existingLog = await DeliveryLog.findOne({ where: { campaign_id: task.campaignId, subscriber_id: task.subscriberId } }); if (existingLog) { console.log(`[DynamicConsumer] Email already sent for campaign ${task.campaignId}, subscriber ${task.subscriberId}, status: ${existingLog.status}`); return; } // Создаем запись в DeliveryLog со статусом "sent" сразу (так как ENUM не поддерживает 'sending') console.log(`[DynamicConsumer] Creating DeliveryLog record for campaign ${task.campaignId}, subscriber ${task.subscriberId}`); deliveryLog = await DeliveryLog.create({ campaign_id: task.campaignId, subscriber_id: task.subscriberId, status: 'sent', sent_at: new Date(), error_message: null }); console.log(`[DynamicConsumer] DeliveryLog record created with ID: ${deliveryLog.id}`); try { console.log(`[DynamicConsumer] Getting SMTP server with ID: ${task.smtpServerId}`); // Получаем SMTP-сервер из БД const smtp = await SmtpServer.findByPk(task.smtpServerId); if (!smtp) { const errorMsg = `SMTP server not found for id ${task.smtpServerId}`; console.error(errorMsg); // Обновляем запись в DeliveryLog с ошибкой await deliveryLog.update({ status: 'failed', error_message: errorMsg }); // Устанавливаем статус подписчика как "unsubscribed" await updateSubscriberStatus(task.subscriberId, 'unsubscribed', errorMsg); return; } console.log(`[DynamicConsumer] Creating transporter for ${smtp.host}:${smtp.port}, secure: ${smtp.secure}`); const transporter = nodemailer.createTransport({ host: smtp.host, host: smtp.host, port: smtp.port, secure: smtp.secure, auth: { user: smtp.username, pass: smtp.password, }, }); console.log(`[DynamicConsumer] Transporter created successfully`); const mailOptions = { from: smtp.from_email, to: task.email, subject: task.subject, text: task.text, html: task.html, headers: { 'List-Unsubscribe': ``, } }; const info = await transporter.sendMail(mailOptions); console.log('Email sent:', info.messageId, 'to', task.email); // Обновляем запись в DeliveryLog с успешным статусом await deliveryLog.update({ status: 'sent', sent_at: new Date(), error_message: null }); console.log(`[DynamicConsumer] DeliveryLog updated for campaign ${task.campaignId}, subscriber ${task.subscriberId}`); } catch (err) { console.error('Error sending email:', err, 'task:', task); // Определяем тип ошибки и соответствующий статус const { status, reason } = analyzeSmtpError(err); // Обновляем запись в DeliveryLog с ошибкой if (deliveryLog) { await deliveryLog.update({ status: 'failed', error_message: err.message }); } // Обновляем статус подписчика в зависимости от типа ошибки await updateSubscriberStatus(task.subscriberId, status, reason); } } // Функция для анализа SMTP ошибок и определения статуса подписчика function analyzeSmtpError(error) { const errorMessage = error.message.toLowerCase(); const errorCode = error.code || ''; // Ошибки, указывающие на недействительный email или отписку const unsubscribeErrors = [ '550', '553', '554', // SMTP коды для недействительных адресов 'user not found', 'mailbox not found', 'address not found', 'recipient not found', 'user unknown', 'mailbox unavailable', 'address rejected', 'recipient rejected', 'bounce', 'hard bounce', 'permanent failure' ]; // Ошибки, указывающие на временные проблемы const temporaryErrors = [ '421', '450', '451', '452', // SMTP коды для временных ошибок 'temporary failure', 'temporarily unavailable', 'try again later', 'quota exceeded', 'rate limit', 'throttled' ]; // Проверяем на ошибки отписки for (const unsubscribeError of unsubscribeErrors) { if (errorMessage.includes(unsubscribeError) || errorCode.includes(unsubscribeError)) { return { status: 'unsubscribed', reason: `SMTP error: ${error.message}` }; } } // Проверяем на временные ошибки for (const tempError of temporaryErrors) { if (errorMessage.includes(tempError) || errorCode.includes(tempError)) { return { status: 'bounced', reason: `Temporary SMTP error: ${error.message}` }; } } // По умолчанию устанавливаем статус "unsubscribed" для любых других ошибок return { status: 'unsubscribed', reason: `SMTP error: ${error.message}` }; } // Функция для обновления статуса подписчика async function updateSubscriberStatus(subscriberId, status, reason = null) { try { const subscriber = await Subscriber.findByPk(subscriberId); if (subscriber) { await subscriber.update({ status: status, unsubscribed_at: status === 'unsubscribed' ? new Date() : null }); console.log(`[DynamicConsumer] Updated subscriber ${subscriberId} status to "${status}"${reason ? ` (reason: ${reason})` : ''}`); } else { console.error(`[DynamicConsumer] Subscriber ${subscriberId} not found`); } } catch (error) { console.error(`[DynamicConsumer] Error updating subscriber ${subscriberId} status:`, error); } } // Экспортируем экземпляр с установленным обработчиком export const dynamicConsumer = new DynamicConsumer(); dynamicConsumer.setMessageHandler(processEmailTask);