This commit is contained in:
romantarkin 2025-08-18 13:58:55 +05:00
parent 293df3b004
commit 30922e00b1
5 changed files with 112 additions and 120 deletions

View File

@ -3,11 +3,7 @@ import { Campaign, EmailTemplateVersion, MailingGroup, SmtpServer } from '../mod
export default {
async create(req, res) {
try {
const { smtp_server_id, ...campaignData } = req.body;
const campaign = await Campaign.create(campaignData);
if (Array.isArray([smtp_server_id])) {
await campaign.setSmtpServers([smtp_server_id]);
}
const campaign = await Campaign.create(req.body);
const campaignWithSmtps = await Campaign.findByPk(campaign.id, {
include: [
{ model: EmailTemplateVersion, as: 'EmailTemplateVersion' },
@ -55,13 +51,9 @@ export default {
},
async update(req, res) {
try {
const { smtp_server_id, ...campaignData } = req.body;
const campaign = await Campaign.findByPk(req.params.id);
if (!campaign) return res.status(404).json({ error: 'Campaign not found' });
await campaign.update(campaignData);
if (Array.isArray([smtp_server_id])) {
await campaign.setSmtpServers([smtp_server_id]);
}
await campaign.update(req.body);
const campaignWithSmtps = await Campaign.findByPk(campaign.id, {
include: [
{ model: EmailTemplateVersion, as: 'EmailTemplateVersion' },

View File

@ -6,6 +6,7 @@ export default (sequelize) => {
user_id: { type: DataTypes.INTEGER, allowNull: false },
template_version_id: { type: DataTypes.INTEGER, allowNull: false },
group_id: { type: DataTypes.INTEGER, allowNull: false },
smtp_server_id: { type: DataTypes.INTEGER, allowNull: false },
subject_override: { type: DataTypes.STRING },
scheduled_at: { type: DataTypes.DATE },
status: { type: DataTypes.ENUM('draft', 'scheduled', 'sending', 'sent', 'failed'), defaultValue: 'draft' },

View File

@ -30,7 +30,7 @@ const Campaign = CampaignModel(sequelize);
const DeliveryLog = DeliveryLogModel(sequelize);
const SmtpServer = SmtpServerModel(sequelize);
// Промежуточная таблица для связи many-to-many
// Промежуточная таблица для связи many-to-many (оставляем для обратной совместимости)
const CampaignSmtpServer = sequelize.define('CampaignSmtpServer', {}, { tableName: 'campaign_smtp_servers', timestamps: false });
// Связи
@ -48,6 +48,11 @@ Campaign.belongsTo(MailingGroup, { foreignKey: 'group_id' });
DeliveryLog.belongsTo(Campaign, { foreignKey: 'campaign_id' });
DeliveryLog.belongsTo(Subscriber, { foreignKey: 'subscriber_id' });
// Связь one-to-many между Campaign и SmtpServer
Campaign.belongsTo(SmtpServer, { foreignKey: 'smtp_server_id' });
SmtpServer.hasMany(Campaign, { foreignKey: 'smtp_server_id' });
// Оставляем старую связь many-to-many для обратной совместимости
Campaign.belongsToMany(SmtpServer, { through: CampaignSmtpServer, foreignKey: 'campaign_id', otherKey: 'smtp_server_id' });
SmtpServer.belongsToMany(Campaign, { through: CampaignSmtpServer, foreignKey: 'smtp_server_id', otherKey: 'campaign_id' });

View File

@ -15,13 +15,13 @@ async function getMxDomain(email) {
return domain;
}
export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
export async function fillQueueForCampaign(campaign, subscribers, smtpServer) {
console.log(`[queueFiller] Processing campaign ${campaign.id}:`, {
hasEmailTemplateVersion: !!campaign.EmailTemplateVersion,
subjectOverride: campaign.subject_override,
templateVersionId: campaign.template_version_id,
subscribersCount: subscribers.length,
smtpServersCount: smtpServers.length
smtpServerId: smtpServer?.id
});
if (campaign.EmailTemplateVersion) {
@ -41,10 +41,9 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
domainMap[domain].push(sub);
}
// Обрабатываем каждый домен и SMTP сервер
// Обрабатываем каждый домен с одним SMTP сервером
for (const [domain, subs] of Object.entries(domainMap)) {
for (const smtp of smtpServers) {
const topicName = `mail-send-${domain}-${smtp.id}`;
const topicName = `mail-send-${domain}-${smtpServer.id}`;
// Создаем топик если его нет
const topicCreated = await topicManager.createTopic(topicName);
@ -88,11 +87,6 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
console.log(`[queueFiller] Found ${subs.length} subscribers, ${unsentSubs.length} unsent, ${finalSubs.length} active`);
if (finalSubs.length === 0) {
console.log(`[queueFiller] No active subscribers for campaign ${campaign.id}`);
continue;
}
// Отправляем сообщения в топик только для активных неотправленных подписчиков
const messages = finalSubs.map(sub => {
// Проверяем наличие версии шаблона
@ -109,7 +103,7 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
mx: domain, // для обратной совместимости
subscriberId: sub.id,
email: sub.email,
smtpServerId: smtp.id,
smtpServerId: smtpServer.id,
};
console.log(`[queueFiller] Created message for campaign ${campaign.id}, subscriber ${sub.id}:`, {
@ -139,5 +133,4 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
console.log(`[queueFiller] Sent ${messages.length} messages to topic: ${topicName}`);
}
}
}

View File

@ -24,7 +24,8 @@ export async function processScheduledCampaigns() {
{
model: EmailTemplateVersion,
as: 'EmailTemplateVersion'
}
},
SmtpServer
],
});
@ -117,17 +118,17 @@ export async function processScheduledCampaigns() {
subscribers.map(s => ({ id: s.id, email: s.email }))
);
const smtpServers = await campaign.getSmtpServers();
console.log(`[queueFillerJob] Found ${smtpServers.length} SMTP servers for campaign ${campaign.id}:`,
smtpServers.map(s => ({ id: s.id, name: s.name, host: s.host }))
);
if (smtpServers.length === 0) {
console.log(`[queueFillerJob] No SMTP servers found for campaign ${campaign.id}`);
// Проверяем наличие SMTP сервера
if (!campaign.SmtpServer) {
console.log(`[queueFillerJob] No SMTP server found for campaign ${campaign.id} with smtp_server_id: ${campaign.smtp_server_id}`);
continue;
}
await fillQueueForCampaign(campaign, subscribers, smtpServers);
console.log(`[queueFillerJob] Found SMTP server for campaign ${campaign.id}:`,
{ id: campaign.SmtpServer.id, name: campaign.SmtpServer.name, host: campaign.SmtpServer.host }
);
await fillQueueForCampaign(campaign, subscribers, campaign.SmtpServer);
}
// Обновляем статус кампании на 'sending' только если были отправлены сообщения