This commit is contained in:
romantarkin 2025-07-23 16:17:25 +05:00
parent 98e5091742
commit 6e89deb3f5
10 changed files with 1428 additions and 8 deletions

View File

@ -4,7 +4,7 @@ import styles from './SideMenu.module.css';
const SideMenu = ({ active, onSelect }) => {
return (
<aside className={styles.menu}>
<div className={styles.project}>CoreSync Marketing</div>
<div className={styles.project}>CoreSync MRM</div>
<nav className={styles.nav}>
<div className={styles.section}>Email-рассылки</div>
<ul>

View File

@ -2,6 +2,7 @@ import React, { useState, useEffect } from 'react';
import { useUser } from '../context/UserContext';
import EditTemplateModal from '../modals/EditTemplateModal';
import CreateTemplateModal from '../modals/CreateTemplateModal';
import TemplateVersionsPage from './TemplateVersionsPage';
import Paginator from '../components/Paginator';
const PAGE_SIZE = 10;
@ -18,11 +19,12 @@ function EmailTemplatesPage() {
const [deleteLoading, setDeleteLoading] = useState(null);
const [createTemplate, setCreateTemplate] = useState(null);
const [createLoading, setCreateLoading] = useState(false);
const [selectedTemplate, setSelectedTemplate] = useState(null);
useEffect(() => {
fetchTemplates(page);
if (!selectedTemplate) fetchTemplates(page);
// eslint-disable-next-line
}, [page]);
}, [page, selectedTemplate]);
const fetchTemplates = async (page) => {
setLoading(true);
@ -131,6 +133,10 @@ function EmailTemplatesPage() {
}
};
if (selectedTemplate) {
return <TemplateVersionsPage template={selectedTemplate} onBack={() => setSelectedTemplate(null)} token={token} />;
}
return (
<div>
<div style={{ display: 'flex', justifyContent: 'flex-end', alignItems: 'center', marginBottom: 12 }}>
@ -151,12 +157,12 @@ function EmailTemplatesPage() {
</thead>
<tbody>
{templates.map(t => (
<tr key={t.id} style={{ borderBottom: '1px solid #e5e7eb' }}>
<tr key={t.id} style={{ borderBottom: '1px solid #e5e7eb', cursor: 'pointer' }} onClick={() => setSelectedTemplate(t)}>
<td style={tdStyle}>{t.id}</td>
<td style={tdStyle}>{t.name}</td>
<td style={tdStyle}>
<button onClick={() => handleEdit(t)} style={btnStyle}>Редактировать</button>
<button onClick={() => handleDelete(t.id)} style={btnStyle} disabled={deleteLoading === t.id}>
<button onClick={e => { e.stopPropagation(); handleEdit(t); }} style={btnStyle}>Редактировать</button>
<button onClick={e => { e.stopPropagation(); handleDelete(t.id); }} style={btnStyle} disabled={deleteLoading === t.id}>
{deleteLoading === t.id ? 'Удаление...' : 'Удалить'}
</button>
</td>

File diff suppressed because it is too large Load Diff

View File

@ -4,8 +4,10 @@
},
"type": "module",
"dependencies": {
"dns": "^0.2.2",
"dotenv": "^17.2.0",
"express": "^5.1.0",
"kafkajs": "^2.2.4",
"mysql2": "^3.14.2",
"sequelize": "^6.37.7"
}

View File

@ -3,7 +3,20 @@ import { EmailTemplateVersion, EmailTemplate } from '../models/index.js';
export default {
async create(req, res) {
try {
const version = await EmailTemplateVersion.create(req.body);
const { template_id, subject, body_html, body_text, is_active } = req.body;
const latest = await EmailTemplateVersion.findOne({
where: { template_id },
order: [['version_number', 'DESC']],
});
const version_number = latest ? latest.version_number + 1 : 1;
const version = await EmailTemplateVersion.create({
template_id,
version_number,
subject,
body_html,
body_text,
is_active: is_active !== undefined ? is_active : true,
});
res.status(201).json(version);
} catch (err) {
res.status(400).json({ error: err.message });

View File

@ -3,6 +3,7 @@ dotenv.config();
import express from 'express';
import { sequelize } from './models/index.js';
import routes from './routes/index.js';
import { processScheduledCampaigns } from './service/queueFillerJob.js';
const app = express();
app.use(express.json());
@ -24,6 +25,11 @@ app.use('/api/mail', routes);
}
})();
// Периодически заполняем очередь для scheduled кампаний
setInterval(() => {
processScheduledCampaigns().catch(err => console.error('Queue fill error:', err));
}, 60 * 1000); // раз в минуту
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Mail Service listening on port ${PORT}`);

View File

@ -8,7 +8,7 @@ export default (sequelize) => {
group_id: { type: DataTypes.INTEGER, allowNull: false },
subject_override: { type: DataTypes.STRING },
scheduled_at: { type: DataTypes.DATE },
status: { type: DataTypes.ENUM('draft', 'scheduled', 'sent', 'failed'), defaultValue: 'draft' },
status: { type: DataTypes.ENUM('draft', 'scheduled', 'sending', 'sent', 'failed'), defaultValue: 'draft' },
created_at: { type: DataTypes.DATE, defaultValue: Sequelize.NOW },
}, { tableName: 'campaigns', timestamps: false });
return Campaign;

View File

@ -0,0 +1,29 @@
import { Kafka } from 'kafkajs';
// import nodemailer from 'nodemailer'; // Для реальной отправки
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'mail-sender',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID || 'mail-sender-group' });
export async function startMailSender(processTask) {
await consumer.connect();
// Подписываемся на все топики mail-send-*
await consumer.subscribe({ topic: /^mail-send-.+$/, fromBeginning: false });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const task = JSON.parse(message.value.toString());
// processTask(task) должен реализовывать отправку писем подписчикам через SMTP
// task.smtpServerId теперь один, а не массив
await processTask(task, topic);
},
});
}
// Пример processTask:
// async function processTask(task, topic) {
// // Здесь логика отправки писем через SMTP
// // task.campaignId, task.mx, task.subscribers, task.smtpServerId
// // topic - имя топика (mail-send-mx-smtpId)
// }

View File

@ -0,0 +1,48 @@
import dns from 'dns/promises';
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'queue-filler',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
const producer = kafka.producer();
async function getMxDomain(email) {
const domain = email.split('@')[1];
try {
const mxRecords = await dns.resolveMx(domain);
if (mxRecords && mxRecords.length > 0) {
// Берём самый приоритетный MX
return mxRecords.sort((a, b) => a.priority - b.priority)[0].exchange;
}
} catch (e) {}
return domain;
}
export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
await producer.connect();
// Группируем подписчиков по MX-домену
const mxMap = {};
for (const sub of subscribers) {
const mx = await getMxDomain(sub.email);
if (!mxMap[mx]) mxMap[mx] = [];
mxMap[mx].push(sub);
}
// Для каждого MX и каждого SMTP создаём задачу в Kafka для КАЖДОГО подписчика
for (const [mx, subs] of Object.entries(mxMap)) {
for (const smtp of smtpServers) {
const topic = `mail-send-${mx}-${smtp.id}`;
const messages = subs.map(sub => ({
value: JSON.stringify({
campaignId: campaign.id,
mx,
subscriberId: sub.id,
email: sub.email,
smtpServerId: smtp.id,
}),
}));
await producer.send({ topic, messages });
}
}
await producer.disconnect();
}

View File

@ -0,0 +1,63 @@
import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer } from '../models/index.js';
import { fillQueueForCampaign } from './queueFiller.js';
import { Op } from 'sequelize';
import { Kafka } from 'kafkajs';
const BATCH_SIZE = 10000;
async function clearKafkaTopics(prefix = 'mail-send-') {
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'queue-filler',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
const admin = kafka.admin();
await admin.connect();
const topics = await admin.listTopics();
const toDelete = topics.filter(t => t.startsWith(prefix));
if (toDelete.length > 0) {
await admin.deleteTopics({ topics: toDelete });
// Пересоздавать топики не нужно — Kafka создаст их автоматически при отправке сообщений
}
await admin.disconnect();
}
export async function processScheduledCampaigns() {
// Очищаем все mail-send-* топики перед построением
await clearKafkaTopics('mail-send-');
// 1. Найти все кампании в статусе scheduled
const campaigns = await Campaign.findAll({
where: { status: 'scheduled' },
include: [MailingGroup, SmtpServer],
});
for (const campaign of campaigns) {
// 2. Получить id всех подписчиков группы батчами
let offset = 0;
let allSubscriberIds = [];
while (true) {
const groupSubs = await GroupSubscriber.findAll({
where: { group_id: campaign.group_id },
attributes: ['subscriber_id'],
offset,
limit: BATCH_SIZE,
raw: true,
});
if (groupSubs.length === 0) break;
allSubscriberIds.push(...groupSubs.map(gs => gs.subscriber_id));
if (groupSubs.length < BATCH_SIZE) break;
offset += BATCH_SIZE;
}
// 3. Получить подписчиков батчами и сразу отправлять в очередь (не держим всех в памяти)
for (let i = 0; i < allSubscriberIds.length; i += BATCH_SIZE) {
const batchIds = allSubscriberIds.slice(i, i + BATCH_SIZE);
const subscribers = await Subscriber.findAll({
where: { id: { [Op.in]: batchIds } },
attributes: ['id', 'email'],
raw: true,
});
const smtpServers = await campaign.getSmtpServers();
await fillQueueForCampaign(campaign, subscribers, smtpServers);
}
// 4. Обновить статус кампании на sending
await campaign.update({ status: 'sending' });
}
}