From 9398fbfafc4d5bdef05cc0a5e6adf3de1f6a264a Mon Sep 17 00:00:00 2001 From: romantarkin Date: Tue, 29 Jul 2025 12:09:22 +0500 Subject: [PATCH] fix --- .../src/controllers/campaignController.js | 28 ++++++++-- mail-service/src/models/index.js | 6 +- mail-service/src/service/dynamicConsumer.js | 20 +++++++ mail-service/src/service/queueFiller.js | 54 ++++++++++++++---- mail-service/src/service/queueFillerJob.js | 55 ++++++++++++++++++- 5 files changed, 144 insertions(+), 19 deletions(-) diff --git a/mail-service/src/controllers/campaignController.js b/mail-service/src/controllers/campaignController.js index cdca558..0e8eb4d 100644 --- a/mail-service/src/controllers/campaignController.js +++ b/mail-service/src/controllers/campaignController.js @@ -8,7 +8,12 @@ export default { if (Array.isArray(smtp_server_ids)) { await campaign.setSmtpServers(smtp_server_ids); } - const campaignWithSmtps = await Campaign.findByPk(campaign.id, { include: [SmtpServer] }); + const campaignWithSmtps = await Campaign.findByPk(campaign.id, { + include: [ + { model: EmailTemplateVersion, as: 'EmailTemplateVersion' }, + SmtpServer + ] + }); res.status(201).json(campaignWithSmtps); } catch (err) { res.status(400).json({ error: err.message }); @@ -19,7 +24,11 @@ export default { const limit = parseInt(req.query.limit) || 20; const offset = parseInt(req.query.offset) || 0; const result = await Campaign.findAndCountAll({ - include: [EmailTemplateVersion, MailingGroup, SmtpServer], + include: [ + { model: EmailTemplateVersion, as: 'EmailTemplateVersion' }, + MailingGroup, + SmtpServer + ], limit, offset, order: [['id', 'ASC']] @@ -31,7 +40,13 @@ export default { }, async getById(req, res) { try { - const campaign = await Campaign.findByPk(req.params.id, { include: [EmailTemplateVersion, MailingGroup, SmtpServer] }); + const campaign = await Campaign.findByPk(req.params.id, { + include: [ + { model: EmailTemplateVersion, as: 'EmailTemplateVersion' }, + MailingGroup, + SmtpServer + ] + }); if (!campaign) return res.status(404).json({ error: 'Campaign not found' }); res.json(campaign); } catch (err) { @@ -47,7 +62,12 @@ export default { if (Array.isArray(smtp_server_ids)) { await campaign.setSmtpServers(smtp_server_ids); } - const campaignWithSmtps = await Campaign.findByPk(campaign.id, { include: [SmtpServer] }); + const campaignWithSmtps = await Campaign.findByPk(campaign.id, { + include: [ + { model: EmailTemplateVersion, as: 'EmailTemplateVersion' }, + SmtpServer + ] + }); res.json(campaignWithSmtps); } catch (err) { res.status(400).json({ error: err.message }); diff --git a/mail-service/src/models/index.js b/mail-service/src/models/index.js index 8ae0c00..4122ec6 100644 --- a/mail-service/src/models/index.js +++ b/mail-service/src/models/index.js @@ -34,15 +34,15 @@ const SmtpServer = SmtpServerModel(sequelize); const CampaignSmtpServer = sequelize.define('CampaignSmtpServer', {}, { tableName: 'campaign_smtp_servers', timestamps: false }); // Связи -MailingGroup.belongsToMany(Subscriber, { through: GroupSubscriber, foreignKey: 'group_id', otherKey: 'subscriber_id' }); -Subscriber.belongsToMany(MailingGroup, { through: GroupSubscriber, foreignKey: 'subscriber_id', otherKey: 'group_id' }); +MailingGroup.belongsToMany(Subscriber, { through: GroupSubscriber, foreignKey: 'group_id', otherKey: 'subscriber_id', as: 'Subscribers' }); +Subscriber.belongsToMany(MailingGroup, { through: GroupSubscriber, foreignKey: 'subscriber_id', otherKey: 'group_id', as: 'MailingGroups' }); GroupSubscriber.belongsTo(MailingGroup, { foreignKey: 'group_id' }); GroupSubscriber.belongsTo(Subscriber, { foreignKey: 'subscriber_id' }); EmailTemplate.hasMany(EmailTemplateVersion, { foreignKey: 'template_id' }); EmailTemplateVersion.belongsTo(EmailTemplate, { foreignKey: 'template_id' }); -Campaign.belongsTo(EmailTemplateVersion, { foreignKey: 'template_version_id' }); +Campaign.belongsTo(EmailTemplateVersion, { foreignKey: 'template_version_id', as: 'EmailTemplateVersion' }); Campaign.belongsTo(MailingGroup, { foreignKey: 'group_id' }); DeliveryLog.belongsTo(Campaign, { foreignKey: 'campaign_id' }); diff --git a/mail-service/src/service/dynamicConsumer.js b/mail-service/src/service/dynamicConsumer.js index b6b5c24..94172f5 100644 --- a/mail-service/src/service/dynamicConsumer.js +++ b/mail-service/src/service/dynamicConsumer.js @@ -132,6 +132,15 @@ export class DynamicConsumer { if (this.messageHandler) { try { const task = JSON.parse(message.value.toString()); + console.log(`[DynamicConsumer] Processing task:`, { + campaignId: task.campaignId, + subscriberId: task.subscriberId, + email: task.email, + hasSubject: !!task.subject, + hasText: !!task.text, + hasHtml: !!task.html, + smtpServerId: task.smtpServerId + }); await this.messageHandler(task, topic); } catch (error) { console.error(`[DynamicConsumer] Error processing message from ${topic}:`, error); @@ -178,6 +187,17 @@ export class DynamicConsumer { async function processEmailTask(task, topic) { let deliveryLog = null; + // Проверяем наличие всех необходимых полей + if (!task.subject || !task.text || !task.html) { + console.error(`[DynamicConsumer] Missing required fields in task:`, { + hasSubject: !!task.subject, + hasText: !!task.text, + hasHtml: !!task.html, + task: task + }); + return; + } + try { await new Promise((resolve) => setTimeout(resolve, 60_000)); diff --git a/mail-service/src/service/queueFiller.js b/mail-service/src/service/queueFiller.js index ff01cf0..25be98c 100644 --- a/mail-service/src/service/queueFiller.js +++ b/mail-service/src/service/queueFiller.js @@ -14,6 +14,23 @@ async function getMxDomain(email) { } export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { + console.log(`[queueFiller] Processing campaign ${campaign.id}:`, { + hasEmailTemplateVersion: !!campaign.EmailTemplateVersion, + subjectOverride: campaign.subject_override, + templateVersionId: campaign.template_version_id, + subscribersCount: subscribers.length, + smtpServersCount: smtpServers.length + }); + + if (campaign.EmailTemplateVersion) { + console.log(`[queueFiller] EmailTemplateVersion details:`, { + id: campaign.EmailTemplateVersion.id, + subject: campaign.EmailTemplateVersion.subject, + hasBodyText: !!campaign.EmailTemplateVersion.body_text, + hasBodyHtml: !!campaign.EmailTemplateVersion.body_html + }); + } + // Группируем подписчиков по домену const domainMap = {}; for (const sub of subscribers) { @@ -35,16 +52,33 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { } // Отправляем сообщения в топик - const messages = subs.map(sub => ({ - campaignId: campaign.id, - subject: campaign.subject, - text: campaign.text, - html: campaign.html, - mx: domain, // для обратной совместимости - subscriberId: sub.id, - email: sub.email, - smtpServerId: smtp.id, - })); + const messages = subs.map(sub => { + // Проверяем наличие версии шаблона + if (!campaign.EmailTemplateVersion) { + console.error(`[queueFiller] Campaign ${campaign.id} has no EmailTemplateVersion`); + return null; + } + + const message = { + campaignId: campaign.id, + subject: campaign.subject_override || campaign.EmailTemplateVersion.subject || 'No subject', + text: campaign.EmailTemplateVersion.body_text || '', + html: campaign.EmailTemplateVersion.body_html || '', + mx: domain, // для обратной совместимости + subscriberId: sub.id, + email: sub.email, + smtpServerId: smtp.id, + }; + + console.log(`[queueFiller] Created message for campaign ${campaign.id}, subscriber ${sub.id}:`, { + subject: message.subject, + hasText: !!message.text, + hasHtml: !!message.html, + email: message.email + }); + + return message; + }).filter(msg => msg !== null); for (const message of messages) { const sent = await topicManager.sendMessage(topicName, message); diff --git a/mail-service/src/service/queueFillerJob.js b/mail-service/src/service/queueFillerJob.js index bff3f15..474aa85 100644 --- a/mail-service/src/service/queueFillerJob.js +++ b/mail-service/src/service/queueFillerJob.js @@ -1,4 +1,4 @@ -import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer } from '../models/index.js'; +import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer, EmailTemplateVersion } from '../models/index.js'; import { fillQueueForCampaign } from './queueFiller.js'; import { Op } from 'sequelize'; import { topicManager } from './topicManager.js'; @@ -14,10 +14,36 @@ export async function processScheduledCampaigns() { const campaigns = await Campaign.findAll({ where: { status: 'scheduled' }, - include: [MailingGroup, SmtpServer], + include: [ + MailingGroup, + SmtpServer, + { + model: EmailTemplateVersion, + as: 'EmailTemplateVersion' + } + ], }); + console.log(`[queueFillerJob] Found ${campaigns.length} scheduled campaigns`); + + if (campaigns.length === 0) { + console.log(`[queueFillerJob] No scheduled campaigns found`); + return; + } + for (const campaign of campaigns) { + console.log(`[queueFillerJob] Processing campaign ${campaign.id}:`, { + hasEmailTemplateVersion: !!campaign.EmailTemplateVersion, + templateVersionId: campaign.template_version_id, + subjectOverride: campaign.subject_override, + groupId: campaign.group_id + }); + + if (!campaign.EmailTemplateVersion) { + console.log(`[queueFillerJob] Campaign ${campaign.id} has no EmailTemplateVersion, skipping`); + continue; + } + let offset = 0; let allSubscriberIds = []; @@ -36,6 +62,13 @@ export async function processScheduledCampaigns() { offset += BATCH_SIZE; } + console.log(`[queueFillerJob] Found ${allSubscriberIds.length} subscriber IDs for group ${campaign.group_id}`); + + if (allSubscriberIds.length === 0) { + console.log(`[queueFillerJob] No subscribers found for group ${campaign.group_id}, skipping campaign ${campaign.id}`); + continue; + } + for (let i = 0; i < allSubscriberIds.length; i += BATCH_SIZE) { const batchIds = allSubscriberIds.slice(i, i + BATCH_SIZE); const subscribers = await Subscriber.findAll({ @@ -44,7 +77,25 @@ export async function processScheduledCampaigns() { raw: true, }); + if (subscribers.length === 0) { + console.log(`[queueFillerJob] No subscribers found for batch IDs:`, batchIds); + continue; + } + + console.log(`[queueFillerJob] Found ${subscribers.length} subscribers for batch:`, + subscribers.map(s => ({ id: s.id, email: s.email })) + ); + const smtpServers = await campaign.getSmtpServers(); + console.log(`[queueFillerJob] Found ${smtpServers.length} SMTP servers for campaign ${campaign.id}:`, + smtpServers.map(s => ({ id: s.id, name: s.name, host: s.host })) + ); + + if (smtpServers.length === 0) { + console.log(`[queueFillerJob] No SMTP servers found for campaign ${campaign.id}`); + continue; + } + await fillQueueForCampaign(campaign, subscribers, smtpServers); }