From 30922e00b137870fd726ef33576c670b3c95d72b Mon Sep 17 00:00:00 2001 From: romantarkin Date: Mon, 18 Aug 2025 13:58:55 +0500 Subject: [PATCH] fix --- .../src/controllers/campaignController.js | 12 +- mail-service/src/models/campaign.js | 1 + mail-service/src/models/index.js | 7 +- mail-service/src/service/queueFiller.js | 187 +++++++++--------- mail-service/src/service/queueFillerJob.js | 25 +-- 5 files changed, 112 insertions(+), 120 deletions(-) diff --git a/mail-service/src/controllers/campaignController.js b/mail-service/src/controllers/campaignController.js index 3cdb0fa..eb51ec3 100644 --- a/mail-service/src/controllers/campaignController.js +++ b/mail-service/src/controllers/campaignController.js @@ -3,11 +3,7 @@ import { Campaign, EmailTemplateVersion, MailingGroup, SmtpServer } from '../mod export default { async create(req, res) { try { - const { smtp_server_id, ...campaignData } = req.body; - const campaign = await Campaign.create(campaignData); - if (Array.isArray([smtp_server_id])) { - await campaign.setSmtpServers([smtp_server_id]); - } + const campaign = await Campaign.create(req.body); const campaignWithSmtps = await Campaign.findByPk(campaign.id, { include: [ { model: EmailTemplateVersion, as: 'EmailTemplateVersion' }, @@ -55,13 +51,9 @@ export default { }, async update(req, res) { try { - const { smtp_server_id, ...campaignData } = req.body; const campaign = await Campaign.findByPk(req.params.id); if (!campaign) return res.status(404).json({ error: 'Campaign not found' }); - await campaign.update(campaignData); - if (Array.isArray([smtp_server_id])) { - await campaign.setSmtpServers([smtp_server_id]); - } + await campaign.update(req.body); const campaignWithSmtps = await Campaign.findByPk(campaign.id, { include: [ { model: EmailTemplateVersion, as: 'EmailTemplateVersion' }, diff --git a/mail-service/src/models/campaign.js b/mail-service/src/models/campaign.js index 31b7c1f..710f0b0 100644 --- a/mail-service/src/models/campaign.js +++ b/mail-service/src/models/campaign.js @@ -6,6 +6,7 @@ export default (sequelize) => { user_id: { type: DataTypes.INTEGER, allowNull: false }, template_version_id: { type: DataTypes.INTEGER, allowNull: false }, group_id: { type: DataTypes.INTEGER, allowNull: false }, + smtp_server_id: { type: DataTypes.INTEGER, allowNull: false }, subject_override: { type: DataTypes.STRING }, scheduled_at: { type: DataTypes.DATE }, status: { type: DataTypes.ENUM('draft', 'scheduled', 'sending', 'sent', 'failed'), defaultValue: 'draft' }, diff --git a/mail-service/src/models/index.js b/mail-service/src/models/index.js index 4122ec6..4a3a6b9 100644 --- a/mail-service/src/models/index.js +++ b/mail-service/src/models/index.js @@ -30,7 +30,7 @@ const Campaign = CampaignModel(sequelize); const DeliveryLog = DeliveryLogModel(sequelize); const SmtpServer = SmtpServerModel(sequelize); -// Промежуточная таблица для связи many-to-many +// Промежуточная таблица для связи many-to-many (оставляем для обратной совместимости) const CampaignSmtpServer = sequelize.define('CampaignSmtpServer', {}, { tableName: 'campaign_smtp_servers', timestamps: false }); // Связи @@ -48,6 +48,11 @@ Campaign.belongsTo(MailingGroup, { foreignKey: 'group_id' }); DeliveryLog.belongsTo(Campaign, { foreignKey: 'campaign_id' }); DeliveryLog.belongsTo(Subscriber, { foreignKey: 'subscriber_id' }); +// Связь one-to-many между Campaign и SmtpServer +Campaign.belongsTo(SmtpServer, { foreignKey: 'smtp_server_id' }); +SmtpServer.hasMany(Campaign, { foreignKey: 'smtp_server_id' }); + +// Оставляем старую связь many-to-many для обратной совместимости Campaign.belongsToMany(SmtpServer, { through: CampaignSmtpServer, foreignKey: 'campaign_id', otherKey: 'smtp_server_id' }); SmtpServer.belongsToMany(Campaign, { through: CampaignSmtpServer, foreignKey: 'smtp_server_id', otherKey: 'campaign_id' }); diff --git a/mail-service/src/service/queueFiller.js b/mail-service/src/service/queueFiller.js index b9248c3..f9addf7 100644 --- a/mail-service/src/service/queueFiller.js +++ b/mail-service/src/service/queueFiller.js @@ -15,13 +15,13 @@ async function getMxDomain(email) { return domain; } -export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { +export async function fillQueueForCampaign(campaign, subscribers, smtpServer) { console.log(`[queueFiller] Processing campaign ${campaign.id}:`, { hasEmailTemplateVersion: !!campaign.EmailTemplateVersion, subjectOverride: campaign.subject_override, templateVersionId: campaign.template_version_id, subscribersCount: subscribers.length, - smtpServersCount: smtpServers.length + smtpServerId: smtpServer?.id }); if (campaign.EmailTemplateVersion) { @@ -41,103 +41,96 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { domainMap[domain].push(sub); } - // Обрабатываем каждый домен и SMTP сервер + // Обрабатываем каждый домен с одним SMTP сервером for (const [domain, subs] of Object.entries(domainMap)) { - for (const smtp of smtpServers) { - const topicName = `mail-send-${domain}-${smtp.id}`; + const topicName = `mail-send-${domain}-${smtpServer.id}`; - // Создаем топик если его нет - const topicCreated = await topicManager.createTopic(topicName); - if (!topicCreated) { - console.error(`[queueFiller] Failed to create topic: ${topicName}`); - continue; - } - - // Проверяем, какие подписчики уже получили email - const existingDeliveryLogs = await DeliveryLog.findAll({ - where: { - campaign_id: campaign.id, - subscriber_id: { [Op.in]: subs.map(s => s.id) } - }, - attributes: ['subscriber_id'], - raw: true - }); - - const sentSubscriberIds = new Set(existingDeliveryLogs.map(log => log.subscriber_id)); - const unsentSubs = subs.filter(sub => !sentSubscriberIds.has(sub.id)); - - // Дополнительно проверяем статус подписчиков - const activeSubs = await Subscriber.findAll({ - where: { - id: { [Op.in]: unsentSubs.map(s => s.id) }, - status: 'active' - }, - attributes: ['id', 'email'], - raw: true - }); - - const activeSubIds = new Set(activeSubs.map(s => s.id)); - const finalSubs = unsentSubs.filter(sub => activeSubIds.has(sub.id)); - - console.log(`[queueFiller] After status check: ${finalSubs.length} active subscribers`); - - if (finalSubs.length === 0) { - console.log(`[queueFiller] No active subscribers for campaign ${campaign.id}`); - continue; - } - - console.log(`[queueFiller] Found ${subs.length} subscribers, ${unsentSubs.length} unsent, ${finalSubs.length} active`); - - if (finalSubs.length === 0) { - console.log(`[queueFiller] No active subscribers for campaign ${campaign.id}`); - continue; - } - - // Отправляем сообщения в топик только для активных неотправленных подписчиков - const messages = finalSubs.map(sub => { - // Проверяем наличие версии шаблона - if (!campaign.EmailTemplateVersion) { - console.error(`[queueFiller] Campaign ${campaign.id} has no EmailTemplateVersion`); - return null; - } - - const message = { - campaignId: campaign.id, - subject: campaign.subject_override || campaign.EmailTemplateVersion.subject || 'No subject', - text: campaign.EmailTemplateVersion.body_text || '', - html: campaign.EmailTemplateVersion.body_html || '', - mx: domain, // для обратной совместимости - subscriberId: sub.id, - email: sub.email, - smtpServerId: smtp.id, - }; - - console.log(`[queueFiller] Created message for campaign ${campaign.id}, subscriber ${sub.id}:`, { - subject: message.subject, - hasText: !!message.text, - hasHtml: !!message.html, - email: message.email - }); - - return message; - }).filter(msg => msg !== null); - - for (const message of messages) { - console.log(`[queueFiller] Sending message to topic ${topicName}:`, { - campaignId: message.campaignId, - subscriberId: message.subscriberId, - email: message.email - }); - - const sent = await topicManager.sendMessage(topicName, message); - if (!sent) { - console.error(`[queueFiller] Failed to send message to topic: ${topicName}`); - } else { - console.log(`[queueFiller] Successfully sent message to topic: ${topicName}`); - } - } - - console.log(`[queueFiller] Sent ${messages.length} messages to topic: ${topicName}`); + // Создаем топик если его нет + const topicCreated = await topicManager.createTopic(topicName); + if (!topicCreated) { + console.error(`[queueFiller] Failed to create topic: ${topicName}`); + continue; } + + // Проверяем, какие подписчики уже получили email + const existingDeliveryLogs = await DeliveryLog.findAll({ + where: { + campaign_id: campaign.id, + subscriber_id: { [Op.in]: subs.map(s => s.id) } + }, + attributes: ['subscriber_id'], + raw: true + }); + + const sentSubscriberIds = new Set(existingDeliveryLogs.map(log => log.subscriber_id)); + const unsentSubs = subs.filter(sub => !sentSubscriberIds.has(sub.id)); + + // Дополнительно проверяем статус подписчиков + const activeSubs = await Subscriber.findAll({ + where: { + id: { [Op.in]: unsentSubs.map(s => s.id) }, + status: 'active' + }, + attributes: ['id', 'email'], + raw: true + }); + + const activeSubIds = new Set(activeSubs.map(s => s.id)); + const finalSubs = unsentSubs.filter(sub => activeSubIds.has(sub.id)); + + console.log(`[queueFiller] After status check: ${finalSubs.length} active subscribers`); + + if (finalSubs.length === 0) { + console.log(`[queueFiller] No active subscribers for campaign ${campaign.id}`); + continue; + } + + console.log(`[queueFiller] Found ${subs.length} subscribers, ${unsentSubs.length} unsent, ${finalSubs.length} active`); + + // Отправляем сообщения в топик только для активных неотправленных подписчиков + const messages = finalSubs.map(sub => { + // Проверяем наличие версии шаблона + if (!campaign.EmailTemplateVersion) { + console.error(`[queueFiller] Campaign ${campaign.id} has no EmailTemplateVersion`); + return null; + } + + const message = { + campaignId: campaign.id, + subject: campaign.subject_override || campaign.EmailTemplateVersion.subject || 'No subject', + text: campaign.EmailTemplateVersion.body_text || '', + html: campaign.EmailTemplateVersion.body_html || '', + mx: domain, // для обратной совместимости + subscriberId: sub.id, + email: sub.email, + smtpServerId: smtpServer.id, + }; + + console.log(`[queueFiller] Created message for campaign ${campaign.id}, subscriber ${sub.id}:`, { + subject: message.subject, + hasText: !!message.text, + hasHtml: !!message.html, + email: message.email + }); + + return message; + }).filter(msg => msg !== null); + + for (const message of messages) { + console.log(`[queueFiller] Sending message to topic ${topicName}:`, { + campaignId: message.campaignId, + subscriberId: message.subscriberId, + email: message.email + }); + + const sent = await topicManager.sendMessage(topicName, message); + if (!sent) { + console.error(`[queueFiller] Failed to send message to topic: ${topicName}`); + } else { + console.log(`[queueFiller] Successfully sent message to topic: ${topicName}`); + } + } + + console.log(`[queueFiller] Sent ${messages.length} messages to topic: ${topicName}`); } } \ No newline at end of file diff --git a/mail-service/src/service/queueFillerJob.js b/mail-service/src/service/queueFillerJob.js index 8bf3d73..399af88 100644 --- a/mail-service/src/service/queueFillerJob.js +++ b/mail-service/src/service/queueFillerJob.js @@ -24,7 +24,8 @@ export async function processScheduledCampaigns() { { model: EmailTemplateVersion, as: 'EmailTemplateVersion' - } + }, + SmtpServer ], }); @@ -117,17 +118,17 @@ export async function processScheduledCampaigns() { subscribers.map(s => ({ id: s.id, email: s.email })) ); - const smtpServers = await campaign.getSmtpServers(); - console.log(`[queueFillerJob] Found ${smtpServers.length} SMTP servers for campaign ${campaign.id}:`, - smtpServers.map(s => ({ id: s.id, name: s.name, host: s.host })) - ); - - if (smtpServers.length === 0) { - console.log(`[queueFillerJob] No SMTP servers found for campaign ${campaign.id}`); - continue; - } - - await fillQueueForCampaign(campaign, subscribers, smtpServers); + // Проверяем наличие SMTP сервера + if (!campaign.SmtpServer) { + console.log(`[queueFillerJob] No SMTP server found for campaign ${campaign.id} with smtp_server_id: ${campaign.smtp_server_id}`); + continue; + } + + console.log(`[queueFillerJob] Found SMTP server for campaign ${campaign.id}:`, + { id: campaign.SmtpServer.id, name: campaign.SmtpServer.name, host: campaign.SmtpServer.host } + ); + + await fillQueueForCampaign(campaign, subscribers, campaign.SmtpServer); } // Обновляем статус кампании на 'sending' только если были отправлены сообщения