ads-marketing/mail-service/src/service/dynamicConsumer.js
romantarkin 9fa3e76810 fix
2025-07-29 15:13:54 +05:00

379 lines
12 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Kafka } from 'kafkajs';
import nodemailer from 'nodemailer';
import { SmtpServer, DeliveryLog, Subscriber } from '../models/index.js';
import { topicManager } from './topicManager.js';
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'dynamic-consumer',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
export class DynamicConsumer {
constructor() {
this.consumer = kafka.consumer({
groupId: process.env.KAFKA_GROUP_ID || 'dynamic-consumer-group'
});
this.subscribedTopics = new Set();
this.isRunning = false;
this.messageHandler = null;
}
async connect() {
try {
await this.consumer.connect();
console.log('[DynamicConsumer] Connected to Kafka');
return true;
} catch (error) {
console.error('[DynamicConsumer] Connection error:', error);
return false;
}
}
async disconnect() {
try {
await this.consumer.disconnect();
this.subscribedTopics.clear();
this.isRunning = false;
console.log('[DynamicConsumer] Disconnected from Kafka');
} catch (error) {
console.error('[DynamicConsumer] Disconnect error:', error);
}
}
// Установка обработчика сообщений
setMessageHandler(handler) {
this.messageHandler = handler;
}
// Подписка на конкретный топик
async subscribeToTopic(topicName) {
if (this.subscribedTopics.has(topicName)) {
console.log(`[DynamicConsumer] Already subscribed to topic: ${topicName}`);
return true;
}
try {
await this.consumer.subscribe({
topic: topicName,
fromBeginning: false
});
this.subscribedTopics.add(topicName);
console.log(`[DynamicConsumer] Subscribed to topic: ${topicName}`);
// Если consumer еще не запущен, запускаем его
if (!this.isRunning) {
await this.startConsuming();
}
return true;
} catch (error) {
console.error(`[DynamicConsumer] Error subscribing to ${topicName}:`, error);
return false;
}
}
// Отписка от топика
async unsubscribeFromTopic(topicName) {
if (!this.subscribedTopics.has(topicName)) {
console.log(`[DynamicConsumer] Not subscribed to topic: ${topicName}`);
return true;
}
try {
await this.consumer.stop();
this.subscribedTopics.delete(topicName);
// Переподписываемся на оставшиеся топики
if (this.subscribedTopics.size > 0) {
await this.consumer.subscribe(
Array.from(this.subscribedTopics).map(topic => ({
topic,
fromBeginning: false
}))
);
await this.startConsuming();
}
console.log(`[DynamicConsumer] Unsubscribed from topic: ${topicName}`);
return true;
} catch (error) {
console.error(`[DynamicConsumer] Error unsubscribing from ${topicName}:`, error);
return false;
}
}
// Подписка на несколько топиков
async subscribeToTopics(topicNames) {
const results = [];
for (const topicName of topicNames) {
const result = await this.subscribeToTopic(topicName);
results.push({ topic: topicName, success: result });
}
return results;
}
// Получение списка подписанных топиков
getSubscribedTopics() {
return Array.from(this.subscribedTopics);
}
// Запуск обработки сообщений
async startConsuming() {
if (this.isRunning) {
console.log('[DynamicConsumer] Already consuming messages');
return;
}
try {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const messageId = `${topic}-${partition}-${message.offset}`;
console.log(`[DynamicConsumer] Received message: ${messageId}`);
if (this.messageHandler) {
try {
const task = JSON.parse(message.value.toString());
console.log(`[DynamicConsumer] Processing task:`, {
campaignId: task.campaignId,
subscriberId: task.subscriberId,
email: task.email,
hasSubject: !!task.subject,
hasText: !!task.text,
hasHtml: !!task.html,
smtpServerId: task.smtpServerId
});
await this.messageHandler(task, topic);
console.log(`[DynamicConsumer] Successfully processed message: ${messageId}`);
} catch (error) {
console.error(`[DynamicConsumer] Error processing message from ${topic}:`, error);
// Не подтверждаем сообщение при ошибке, чтобы оно было переотправлено
throw error;
}
}
},
});
this.isRunning = true;
console.log('[DynamicConsumer] Started consuming messages');
} catch (error) {
console.error('[DynamicConsumer] Error starting consumer:', error);
}
}
// Остановка обработки сообщений
async stopConsuming() {
try {
await this.consumer.stop();
this.isRunning = false;
console.log('[DynamicConsumer] Stopped consuming messages');
} catch (error) {
console.error('[DynamicConsumer] Error stopping consumer:', error);
}
}
// Автоматическое обнаружение и подписка на новые топики
async autoSubscribeToNewTopics() {
try {
const allTopics = await topicManager.getAllTopics();
const newTopics = allTopics.filter(topic => !this.subscribedTopics.has(topic));
if (newTopics.length > 0) {
console.log(`[DynamicConsumer] Found ${newTopics.length} new topics:`, newTopics);
await this.subscribeToTopics(newTopics);
}
} catch (error) {
console.error('[DynamicConsumer] Error in auto-subscribe:', error);
}
}
}
// Обработчик сообщений для отправки email
async function processEmailTask(task, topic) {
let deliveryLog = null;
// Проверяем наличие всех необходимых полей
if (!task.subject || !task.text || !task.html) {
console.error(`[DynamicConsumer] Missing required fields in task:`, {
hasSubject: !!task.subject,
hasText: !!task.text,
hasHtml: !!task.html,
task: task
});
return;
}
// Проверяем, не был ли уже отправлен email для этого подписчика в этой кампании
const existingLog = await DeliveryLog.findOne({
where: {
campaign_id: task.campaignId,
subscriber_id: task.subscriberId
}
});
if (existingLog) {
console.log(`[DynamicConsumer] Email already sent for campaign ${task.campaignId}, subscriber ${task.subscriberId}, status: ${existingLog.status}`);
return;
}
try {
await new Promise((resolve) => setTimeout(resolve, 60_000));
// Создаем запись в DeliveryLog со статусом "sending"
deliveryLog = await DeliveryLog.create({
campaign_id: task.campaignId,
subscriber_id: task.subscriberId,
status: 'sent',
sent_at: new Date(),
error_message: null
});
// Получаем SMTP-сервер из БД
const smtp = await SmtpServer.findByPk(task.smtpServerId);
if (!smtp) {
const errorMsg = `SMTP server not found for id ${task.smtpServerId}`;
console.error(errorMsg);
// Обновляем запись в DeliveryLog с ошибкой
await deliveryLog.update({
status: 'failed',
error_message: errorMsg
});
// Устанавливаем статус подписчика как "unsubscribed"
await updateSubscriberStatus(task.subscriberId, 'unsubscribed', errorMsg);
return;
}
const transporter = nodemailer.createTransport({
host: smtp.host,
port: smtp.port,
secure: smtp.secure,
auth: {
user: smtp.username,
pass: smtp.password,
},
});
const mailOptions = {
from: smtp.from_email,
to: task.email,
subject: task.subject,
text: task.text,
html: task.html,
headers: {
'List-Unsubscribe': `<mailto:${smtp.from_email}?subject=unsubscribe>`,
}
};
const info = await transporter.sendMail(mailOptions);
console.log('Email sent:', info.messageId, 'to', task.email);
// Обновляем запись в DeliveryLog с успешным статусом
await deliveryLog.update({
status: 'sent',
sent_at: new Date(),
error_message: null
});
console.log(`[DynamicConsumer] DeliveryLog updated for campaign ${task.campaignId}, subscriber ${task.subscriberId}`);
} catch (err) {
console.error('Error sending email:', err, 'task:', task);
// Определяем тип ошибки и соответствующий статус
const { status, reason } = analyzeSmtpError(err);
// Обновляем запись в DeliveryLog с ошибкой
if (deliveryLog) {
await deliveryLog.update({
status: 'failed',
error_message: err.message
});
}
// Обновляем статус подписчика в зависимости от типа ошибки
await updateSubscriberStatus(task.subscriberId, status, reason);
}
}
// Функция для анализа SMTP ошибок и определения статуса подписчика
function analyzeSmtpError(error) {
const errorMessage = error.message.toLowerCase();
const errorCode = error.code || '';
// Ошибки, указывающие на недействительный email или отписку
const unsubscribeErrors = [
'550', '553', '554', // SMTP коды для недействительных адресов
'user not found',
'mailbox not found',
'address not found',
'recipient not found',
'user unknown',
'mailbox unavailable',
'address rejected',
'recipient rejected',
'bounce',
'hard bounce',
'permanent failure'
];
// Ошибки, указывающие на временные проблемы
const temporaryErrors = [
'421', '450', '451', '452', // SMTP коды для временных ошибок
'temporary failure',
'temporarily unavailable',
'try again later',
'quota exceeded',
'rate limit',
'throttled'
];
// Проверяем на ошибки отписки
for (const unsubscribeError of unsubscribeErrors) {
if (errorMessage.includes(unsubscribeError) || errorCode.includes(unsubscribeError)) {
return {
status: 'unsubscribed',
reason: `SMTP error: ${error.message}`
};
}
}
// Проверяем на временные ошибки
for (const tempError of temporaryErrors) {
if (errorMessage.includes(tempError) || errorCode.includes(tempError)) {
return {
status: 'bounced',
reason: `Temporary SMTP error: ${error.message}`
};
}
}
// По умолчанию устанавливаем статус "unsubscribed" для любых других ошибок
return {
status: 'unsubscribed',
reason: `SMTP error: ${error.message}`
};
}
// Функция для обновления статуса подписчика
async function updateSubscriberStatus(subscriberId, status, reason = null) {
try {
const subscriber = await Subscriber.findByPk(subscriberId);
if (subscriber) {
await subscriber.update({
status: status,
unsubscribed_at: status === 'unsubscribed' ? new Date() : null
});
console.log(`[DynamicConsumer] Updated subscriber ${subscriberId} status to "${status}"${reason ? ` (reason: ${reason})` : ''}`);
} else {
console.error(`[DynamicConsumer] Subscriber ${subscriberId} not found`);
}
} catch (error) {
console.error(`[DynamicConsumer] Error updating subscriber ${subscriberId} status:`, error);
}
}
// Экспортируем экземпляр с установленным обработчиком
export const dynamicConsumer = new DynamicConsumer();
dynamicConsumer.setMessageHandler(processEmailTask);