fix
This commit is contained in:
parent
b3141bf9ec
commit
9398fbfafc
@ -8,7 +8,12 @@ export default {
|
||||
if (Array.isArray(smtp_server_ids)) {
|
||||
await campaign.setSmtpServers(smtp_server_ids);
|
||||
}
|
||||
const campaignWithSmtps = await Campaign.findByPk(campaign.id, { include: [SmtpServer] });
|
||||
const campaignWithSmtps = await Campaign.findByPk(campaign.id, {
|
||||
include: [
|
||||
{ model: EmailTemplateVersion, as: 'EmailTemplateVersion' },
|
||||
SmtpServer
|
||||
]
|
||||
});
|
||||
res.status(201).json(campaignWithSmtps);
|
||||
} catch (err) {
|
||||
res.status(400).json({ error: err.message });
|
||||
@ -19,7 +24,11 @@ export default {
|
||||
const limit = parseInt(req.query.limit) || 20;
|
||||
const offset = parseInt(req.query.offset) || 0;
|
||||
const result = await Campaign.findAndCountAll({
|
||||
include: [EmailTemplateVersion, MailingGroup, SmtpServer],
|
||||
include: [
|
||||
{ model: EmailTemplateVersion, as: 'EmailTemplateVersion' },
|
||||
MailingGroup,
|
||||
SmtpServer
|
||||
],
|
||||
limit,
|
||||
offset,
|
||||
order: [['id', 'ASC']]
|
||||
@ -31,7 +40,13 @@ export default {
|
||||
},
|
||||
async getById(req, res) {
|
||||
try {
|
||||
const campaign = await Campaign.findByPk(req.params.id, { include: [EmailTemplateVersion, MailingGroup, SmtpServer] });
|
||||
const campaign = await Campaign.findByPk(req.params.id, {
|
||||
include: [
|
||||
{ model: EmailTemplateVersion, as: 'EmailTemplateVersion' },
|
||||
MailingGroup,
|
||||
SmtpServer
|
||||
]
|
||||
});
|
||||
if (!campaign) return res.status(404).json({ error: 'Campaign not found' });
|
||||
res.json(campaign);
|
||||
} catch (err) {
|
||||
@ -47,7 +62,12 @@ export default {
|
||||
if (Array.isArray(smtp_server_ids)) {
|
||||
await campaign.setSmtpServers(smtp_server_ids);
|
||||
}
|
||||
const campaignWithSmtps = await Campaign.findByPk(campaign.id, { include: [SmtpServer] });
|
||||
const campaignWithSmtps = await Campaign.findByPk(campaign.id, {
|
||||
include: [
|
||||
{ model: EmailTemplateVersion, as: 'EmailTemplateVersion' },
|
||||
SmtpServer
|
||||
]
|
||||
});
|
||||
res.json(campaignWithSmtps);
|
||||
} catch (err) {
|
||||
res.status(400).json({ error: err.message });
|
||||
|
||||
@ -34,15 +34,15 @@ const SmtpServer = SmtpServerModel(sequelize);
|
||||
const CampaignSmtpServer = sequelize.define('CampaignSmtpServer', {}, { tableName: 'campaign_smtp_servers', timestamps: false });
|
||||
|
||||
// Связи
|
||||
MailingGroup.belongsToMany(Subscriber, { through: GroupSubscriber, foreignKey: 'group_id', otherKey: 'subscriber_id' });
|
||||
Subscriber.belongsToMany(MailingGroup, { through: GroupSubscriber, foreignKey: 'subscriber_id', otherKey: 'group_id' });
|
||||
MailingGroup.belongsToMany(Subscriber, { through: GroupSubscriber, foreignKey: 'group_id', otherKey: 'subscriber_id', as: 'Subscribers' });
|
||||
Subscriber.belongsToMany(MailingGroup, { through: GroupSubscriber, foreignKey: 'subscriber_id', otherKey: 'group_id', as: 'MailingGroups' });
|
||||
GroupSubscriber.belongsTo(MailingGroup, { foreignKey: 'group_id' });
|
||||
GroupSubscriber.belongsTo(Subscriber, { foreignKey: 'subscriber_id' });
|
||||
|
||||
EmailTemplate.hasMany(EmailTemplateVersion, { foreignKey: 'template_id' });
|
||||
EmailTemplateVersion.belongsTo(EmailTemplate, { foreignKey: 'template_id' });
|
||||
|
||||
Campaign.belongsTo(EmailTemplateVersion, { foreignKey: 'template_version_id' });
|
||||
Campaign.belongsTo(EmailTemplateVersion, { foreignKey: 'template_version_id', as: 'EmailTemplateVersion' });
|
||||
Campaign.belongsTo(MailingGroup, { foreignKey: 'group_id' });
|
||||
|
||||
DeliveryLog.belongsTo(Campaign, { foreignKey: 'campaign_id' });
|
||||
|
||||
@ -132,6 +132,15 @@ export class DynamicConsumer {
|
||||
if (this.messageHandler) {
|
||||
try {
|
||||
const task = JSON.parse(message.value.toString());
|
||||
console.log(`[DynamicConsumer] Processing task:`, {
|
||||
campaignId: task.campaignId,
|
||||
subscriberId: task.subscriberId,
|
||||
email: task.email,
|
||||
hasSubject: !!task.subject,
|
||||
hasText: !!task.text,
|
||||
hasHtml: !!task.html,
|
||||
smtpServerId: task.smtpServerId
|
||||
});
|
||||
await this.messageHandler(task, topic);
|
||||
} catch (error) {
|
||||
console.error(`[DynamicConsumer] Error processing message from ${topic}:`, error);
|
||||
@ -178,6 +187,17 @@ export class DynamicConsumer {
|
||||
async function processEmailTask(task, topic) {
|
||||
let deliveryLog = null;
|
||||
|
||||
// Проверяем наличие всех необходимых полей
|
||||
if (!task.subject || !task.text || !task.html) {
|
||||
console.error(`[DynamicConsumer] Missing required fields in task:`, {
|
||||
hasSubject: !!task.subject,
|
||||
hasText: !!task.text,
|
||||
hasHtml: !!task.html,
|
||||
task: task
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
await new Promise((resolve) => setTimeout(resolve, 60_000));
|
||||
|
||||
|
||||
@ -14,6 +14,23 @@ async function getMxDomain(email) {
|
||||
}
|
||||
|
||||
export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
|
||||
console.log(`[queueFiller] Processing campaign ${campaign.id}:`, {
|
||||
hasEmailTemplateVersion: !!campaign.EmailTemplateVersion,
|
||||
subjectOverride: campaign.subject_override,
|
||||
templateVersionId: campaign.template_version_id,
|
||||
subscribersCount: subscribers.length,
|
||||
smtpServersCount: smtpServers.length
|
||||
});
|
||||
|
||||
if (campaign.EmailTemplateVersion) {
|
||||
console.log(`[queueFiller] EmailTemplateVersion details:`, {
|
||||
id: campaign.EmailTemplateVersion.id,
|
||||
subject: campaign.EmailTemplateVersion.subject,
|
||||
hasBodyText: !!campaign.EmailTemplateVersion.body_text,
|
||||
hasBodyHtml: !!campaign.EmailTemplateVersion.body_html
|
||||
});
|
||||
}
|
||||
|
||||
// Группируем подписчиков по домену
|
||||
const domainMap = {};
|
||||
for (const sub of subscribers) {
|
||||
@ -35,16 +52,33 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
|
||||
}
|
||||
|
||||
// Отправляем сообщения в топик
|
||||
const messages = subs.map(sub => ({
|
||||
const messages = subs.map(sub => {
|
||||
// Проверяем наличие версии шаблона
|
||||
if (!campaign.EmailTemplateVersion) {
|
||||
console.error(`[queueFiller] Campaign ${campaign.id} has no EmailTemplateVersion`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const message = {
|
||||
campaignId: campaign.id,
|
||||
subject: campaign.subject,
|
||||
text: campaign.text,
|
||||
html: campaign.html,
|
||||
subject: campaign.subject_override || campaign.EmailTemplateVersion.subject || 'No subject',
|
||||
text: campaign.EmailTemplateVersion.body_text || '',
|
||||
html: campaign.EmailTemplateVersion.body_html || '',
|
||||
mx: domain, // для обратной совместимости
|
||||
subscriberId: sub.id,
|
||||
email: sub.email,
|
||||
smtpServerId: smtp.id,
|
||||
}));
|
||||
};
|
||||
|
||||
console.log(`[queueFiller] Created message for campaign ${campaign.id}, subscriber ${sub.id}:`, {
|
||||
subject: message.subject,
|
||||
hasText: !!message.text,
|
||||
hasHtml: !!message.html,
|
||||
email: message.email
|
||||
});
|
||||
|
||||
return message;
|
||||
}).filter(msg => msg !== null);
|
||||
|
||||
for (const message of messages) {
|
||||
const sent = await topicManager.sendMessage(topicName, message);
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer } from '../models/index.js';
|
||||
import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer, EmailTemplateVersion } from '../models/index.js';
|
||||
import { fillQueueForCampaign } from './queueFiller.js';
|
||||
import { Op } from 'sequelize';
|
||||
import { topicManager } from './topicManager.js';
|
||||
@ -14,10 +14,36 @@ export async function processScheduledCampaigns() {
|
||||
|
||||
const campaigns = await Campaign.findAll({
|
||||
where: { status: 'scheduled' },
|
||||
include: [MailingGroup, SmtpServer],
|
||||
include: [
|
||||
MailingGroup,
|
||||
SmtpServer,
|
||||
{
|
||||
model: EmailTemplateVersion,
|
||||
as: 'EmailTemplateVersion'
|
||||
}
|
||||
],
|
||||
});
|
||||
|
||||
console.log(`[queueFillerJob] Found ${campaigns.length} scheduled campaigns`);
|
||||
|
||||
if (campaigns.length === 0) {
|
||||
console.log(`[queueFillerJob] No scheduled campaigns found`);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const campaign of campaigns) {
|
||||
console.log(`[queueFillerJob] Processing campaign ${campaign.id}:`, {
|
||||
hasEmailTemplateVersion: !!campaign.EmailTemplateVersion,
|
||||
templateVersionId: campaign.template_version_id,
|
||||
subjectOverride: campaign.subject_override,
|
||||
groupId: campaign.group_id
|
||||
});
|
||||
|
||||
if (!campaign.EmailTemplateVersion) {
|
||||
console.log(`[queueFillerJob] Campaign ${campaign.id} has no EmailTemplateVersion, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
let offset = 0;
|
||||
let allSubscriberIds = [];
|
||||
|
||||
@ -36,6 +62,13 @@ export async function processScheduledCampaigns() {
|
||||
offset += BATCH_SIZE;
|
||||
}
|
||||
|
||||
console.log(`[queueFillerJob] Found ${allSubscriberIds.length} subscriber IDs for group ${campaign.group_id}`);
|
||||
|
||||
if (allSubscriberIds.length === 0) {
|
||||
console.log(`[queueFillerJob] No subscribers found for group ${campaign.group_id}, skipping campaign ${campaign.id}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
for (let i = 0; i < allSubscriberIds.length; i += BATCH_SIZE) {
|
||||
const batchIds = allSubscriberIds.slice(i, i + BATCH_SIZE);
|
||||
const subscribers = await Subscriber.findAll({
|
||||
@ -44,7 +77,25 @@ export async function processScheduledCampaigns() {
|
||||
raw: true,
|
||||
});
|
||||
|
||||
if (subscribers.length === 0) {
|
||||
console.log(`[queueFillerJob] No subscribers found for batch IDs:`, batchIds);
|
||||
continue;
|
||||
}
|
||||
|
||||
console.log(`[queueFillerJob] Found ${subscribers.length} subscribers for batch:`,
|
||||
subscribers.map(s => ({ id: s.id, email: s.email }))
|
||||
);
|
||||
|
||||
const smtpServers = await campaign.getSmtpServers();
|
||||
console.log(`[queueFillerJob] Found ${smtpServers.length} SMTP servers for campaign ${campaign.id}:`,
|
||||
smtpServers.map(s => ({ id: s.id, name: s.name, host: s.host }))
|
||||
);
|
||||
|
||||
if (smtpServers.length === 0) {
|
||||
console.log(`[queueFillerJob] No SMTP servers found for campaign ${campaign.id}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
await fillQueueForCampaign(campaign, subscribers, smtpServers);
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user