diff --git a/mail-service/src/index.js b/mail-service/src/index.js index 554735e..8202b8a 100644 --- a/mail-service/src/index.js +++ b/mail-service/src/index.js @@ -54,6 +54,10 @@ setInterval(async () => { // Синхронизируем реестр с существующими топиками await topicManager.syncRegistry(); + // Очищаем старые топики перед началом работы + console.log('[index] Clearing old topics...'); + await topicManager.clearTopics('mail-send-'); + // Подписываемся на существующие топики const existingTopics = await topicManager.getAllTopics(); if (existingTopics.length > 0) { diff --git a/mail-service/src/service/dynamicConsumer.js b/mail-service/src/service/dynamicConsumer.js index 94172f5..f52feef 100644 --- a/mail-service/src/service/dynamicConsumer.js +++ b/mail-service/src/service/dynamicConsumer.js @@ -127,7 +127,8 @@ export class DynamicConsumer { try { await this.consumer.run({ eachMessage: async ({ topic, partition, message }) => { - console.log(`[DynamicConsumer] Received message: topic=${topic}, partition=${partition}, offset=${message.offset}`); + const messageId = `${topic}-${partition}-${message.offset}`; + console.log(`[DynamicConsumer] Received message: ${messageId}`); if (this.messageHandler) { try { @@ -142,8 +143,11 @@ export class DynamicConsumer { smtpServerId: task.smtpServerId }); await this.messageHandler(task, topic); + console.log(`[DynamicConsumer] Successfully processed message: ${messageId}`); } catch (error) { console.error(`[DynamicConsumer] Error processing message from ${topic}:`, error); + // Не подтверждаем сообщение при ошибке, чтобы оно было переотправлено + throw error; } } }, @@ -198,6 +202,19 @@ async function processEmailTask(task, topic) { return; } + // Проверяем, не был ли уже отправлен email для этого подписчика в этой кампании + const existingLog = await DeliveryLog.findOne({ + where: { + campaign_id: task.campaignId, + subscriber_id: task.subscriberId + } + }); + + if (existingLog) { + console.log(`[DynamicConsumer] Email already sent for campaign ${task.campaignId}, subscriber ${task.subscriberId}, status: ${existingLog.status}`); + return; + } + try { await new Promise((resolve) => setTimeout(resolve, 60_000)); diff --git a/mail-service/src/service/queueFiller.js b/mail-service/src/service/queueFiller.js index 25be98c..b9248c3 100644 --- a/mail-service/src/service/queueFiller.js +++ b/mail-service/src/service/queueFiller.js @@ -1,5 +1,7 @@ import dns from 'dns/promises'; import { topicManager } from './topicManager.js'; +import { DeliveryLog, Subscriber } from '../models/index.js'; +import { Op } from 'sequelize'; async function getMxDomain(email) { const domain = email.split('@')[1]; @@ -51,8 +53,48 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { continue; } - // Отправляем сообщения в топик - const messages = subs.map(sub => { + // Проверяем, какие подписчики уже получили email + const existingDeliveryLogs = await DeliveryLog.findAll({ + where: { + campaign_id: campaign.id, + subscriber_id: { [Op.in]: subs.map(s => s.id) } + }, + attributes: ['subscriber_id'], + raw: true + }); + + const sentSubscriberIds = new Set(existingDeliveryLogs.map(log => log.subscriber_id)); + const unsentSubs = subs.filter(sub => !sentSubscriberIds.has(sub.id)); + + // Дополнительно проверяем статус подписчиков + const activeSubs = await Subscriber.findAll({ + where: { + id: { [Op.in]: unsentSubs.map(s => s.id) }, + status: 'active' + }, + attributes: ['id', 'email'], + raw: true + }); + + const activeSubIds = new Set(activeSubs.map(s => s.id)); + const finalSubs = unsentSubs.filter(sub => activeSubIds.has(sub.id)); + + console.log(`[queueFiller] After status check: ${finalSubs.length} active subscribers`); + + if (finalSubs.length === 0) { + console.log(`[queueFiller] No active subscribers for campaign ${campaign.id}`); + continue; + } + + console.log(`[queueFiller] Found ${subs.length} subscribers, ${unsentSubs.length} unsent, ${finalSubs.length} active`); + + if (finalSubs.length === 0) { + console.log(`[queueFiller] No active subscribers for campaign ${campaign.id}`); + continue; + } + + // Отправляем сообщения в топик только для активных неотправленных подписчиков + const messages = finalSubs.map(sub => { // Проверяем наличие версии шаблона if (!campaign.EmailTemplateVersion) { console.error(`[queueFiller] Campaign ${campaign.id} has no EmailTemplateVersion`); @@ -81,9 +123,17 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { }).filter(msg => msg !== null); for (const message of messages) { + console.log(`[queueFiller] Sending message to topic ${topicName}:`, { + campaignId: message.campaignId, + subscriberId: message.subscriberId, + email: message.email + }); + const sent = await topicManager.sendMessage(topicName, message); if (!sent) { console.error(`[queueFiller] Failed to send message to topic: ${topicName}`); + } else { + console.log(`[queueFiller] Successfully sent message to topic: ${topicName}`); } } diff --git a/mail-service/src/service/queueFillerJob.js b/mail-service/src/service/queueFillerJob.js index 474aa85..53f08ab 100644 --- a/mail-service/src/service/queueFillerJob.js +++ b/mail-service/src/service/queueFillerJob.js @@ -1,4 +1,4 @@ -import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer, EmailTemplateVersion } from '../models/index.js'; +import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer, EmailTemplateVersion, DeliveryLog, sequelize } from '../models/index.js'; import { fillQueueForCampaign } from './queueFiller.js'; import { Op } from 'sequelize'; import { topicManager } from './topicManager.js'; @@ -13,7 +13,17 @@ async function clearKafkaTopics(prefix = 'mail-send-') { export async function processScheduledCampaigns() { const campaigns = await Campaign.findAll({ - where: { status: 'scheduled' }, + where: { + status: 'scheduled', + // Исключаем кампании, которые уже обрабатываются или отправлены + id: { + [Op.notIn]: sequelize.literal(`( + SELECT DISTINCT campaign_id + FROM delivery_logs + WHERE campaign_id = Campaign.id + )`) + } + }, include: [ MailingGroup, SmtpServer, @@ -69,10 +79,36 @@ export async function processScheduledCampaigns() { continue; } + // Проверяем, не завершена ли уже кампания + if (campaign.status === 'sent' || campaign.status === 'failed') { + console.log(`[queueFillerJob] Campaign ${campaign.id} already completed with status: ${campaign.status}`); + continue; + } + + // Проверяем, не обрабатывается ли уже кампания + if (campaign.status === 'sending') { + console.log(`[queueFillerJob] Campaign ${campaign.id} already being processed`); + continue; + } + + // Проверяем, есть ли уже записи в DeliveryLog для этой кампании + const campaignLogs = await DeliveryLog.findAll({ + where: { campaign_id: campaign.id }, + limit: 1 + }); + + if (campaignLogs.length > 0) { + console.log(`[queueFillerJob] Campaign ${campaign.id} already has delivery logs, skipping`); + continue; + } + for (let i = 0; i < allSubscriberIds.length; i += BATCH_SIZE) { const batchIds = allSubscriberIds.slice(i, i + BATCH_SIZE); const subscribers = await Subscriber.findAll({ - where: { id: { [Op.in]: batchIds } }, + where: { + id: { [Op.in]: batchIds }, + status: 'active' // Только активные подписчики + }, attributes: ['id', 'email'], raw: true, }); @@ -96,9 +132,10 @@ export async function processScheduledCampaigns() { continue; } - await fillQueueForCampaign(campaign, subscribers, smtpServers); - } - - await campaign.update({ status: 'sending' }); + await fillQueueForCampaign(campaign, subscribers, smtpServers); + } + + // Обновляем статус кампании на 'sending' только если были отправлены сообщения + await campaign.update({ status: 'sending' }); } } \ No newline at end of file