diff --git a/mail-service/package-lock.json b/mail-service/package-lock.json index 46a8f1c..adae22e 100644 --- a/mail-service/package-lock.json +++ b/mail-service/package-lock.json @@ -10,6 +10,7 @@ "express": "^5.1.0", "kafkajs": "^2.2.4", "mysql2": "^3.14.2", + "nodemailer": "^7.0.5", "sequelize": "^6.37.7" } }, @@ -1258,6 +1259,14 @@ "uuid": "bin/uuid" } }, + "node_modules/nodemailer": { + "version": "7.0.5", + "resolved": "https://registry.npmjs.org/nodemailer/-/nodemailer-7.0.5.tgz", + "integrity": "sha512-nsrh2lO3j4GkLLXoeEksAMgAOqxOv6QumNRVQTJwKH4nuiww6iC2y7GyANs9kRAxCexg3+lTWM3PZ91iLlVjfg==", + "engines": { + "node": ">=6.0.0" + } + }, "node_modules/oauth-sign": { "version": "0.2.0", "resolved": "https://registry.npmjs.org/oauth-sign/-/oauth-sign-0.2.0.tgz", diff --git a/mail-service/package.json b/mail-service/package.json index f49b78a..bb41ef6 100644 --- a/mail-service/package.json +++ b/mail-service/package.json @@ -9,6 +9,7 @@ "express": "^5.1.0", "kafkajs": "^2.2.4", "mysql2": "^3.14.2", + "nodemailer": "^7.0.5", "sequelize": "^6.37.7" } } diff --git a/mail-service/src/controllers/deliveryLogController.js b/mail-service/src/controllers/deliveryLogController.js index 36a8119..efd19ba 100644 --- a/mail-service/src/controllers/deliveryLogController.js +++ b/mail-service/src/controllers/deliveryLogController.js @@ -1,4 +1,6 @@ import { DeliveryLog, Campaign, Subscriber } from '../models/index.js'; +import { Op } from 'sequelize'; +import { Kafka } from 'kafkajs'; export default { async create(req, res) { @@ -53,4 +55,45 @@ export default { res.status(500).json({ error: err.message }); } }, + async getPendingCount(req, res) { + try { + // Kafka config + const kafka = new Kafka({ + clientId: process.env.KAFKA_CLIENT_ID || 'pending-api', + brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], + }); + const admin = kafka.admin(); + await admin.connect(); + const topics = await admin.listTopics(); + const mailTopics = topics.filter(t => t.startsWith('mail-send-')); + let totalLag = 0; + for (const topic of mailTopics) { + const partitions = await admin.fetchTopicOffsets(topic); + // Получаем consumer group id (тот же, что у mailSender) + const groupId = process.env.KAFKA_GROUP_ID || 'mail-sender-group'; + const consumerOffsets = await admin.fetchOffsets({ groupId, topic }); + for (const p of partitions) { + const partition = p.partition; + const latest = parseInt(p.high); + const committed = parseInt( + (consumerOffsets.find(c => c.partition === partition) || {}).offset || '0' + ); + // Если consumer ещё не читал этот partition, offset может быть -1 + const lag = latest - (committed > 0 ? committed : 0); + totalLag += lag > 0 ? lag : 0; + } + } + await admin.disconnect(); + + const sentCount = await DeliveryLog.count({ + where: { + status: 'sent', + }, + }); + + res.json({ pending: totalLag, sent: sentCount }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, }; \ No newline at end of file diff --git a/mail-service/src/index.js b/mail-service/src/index.js index 26f3185..54a6ac3 100644 --- a/mail-service/src/index.js +++ b/mail-service/src/index.js @@ -4,6 +4,7 @@ import express from 'express'; import { sequelize } from './models/index.js'; import routes from './routes/index.js'; import { processScheduledCampaigns } from './service/queueFillerJob.js'; +import { startMailSender } from './service/mailSender.js'; const app = express(); app.use(express.json()); @@ -38,7 +39,9 @@ setInterval(async () => { } finally { isQueueFilling = false; } -}, 60 * 1000); // раз в минуту +}, 1000); // раз в минуту + +startMailSender(); const PORT = process.env.PORT || 3000; app.listen(PORT, () => { diff --git a/mail-service/src/routes/deliveryLog.js b/mail-service/src/routes/deliveryLog.js index 66f99e9..fa59e3b 100644 --- a/mail-service/src/routes/deliveryLog.js +++ b/mail-service/src/routes/deliveryLog.js @@ -5,6 +5,7 @@ const router = Router(); router.post('/', deliveryLogController.create); router.get('/', deliveryLogController.getAll); +router.get('/pending-count', deliveryLogController.getPendingCount); router.get('/:id', deliveryLogController.getById); router.put('/:id', deliveryLogController.update); router.delete('/:id', deliveryLogController.delete); diff --git a/mail-service/src/service/mailSender.js b/mail-service/src/service/mailSender.js index 62579c7..dd7c1b0 100644 --- a/mail-service/src/service/mailSender.js +++ b/mail-service/src/service/mailSender.js @@ -1,5 +1,6 @@ import { Kafka } from 'kafkajs'; -// import nodemailer from 'nodemailer'; // Для реальной отправки +import nodemailer from 'nodemailer'; +import { SmtpServer } from '../models/index.js'; const kafka = new Kafka({ clientId: process.env.KAFKA_CLIENT_ID || 'mail-sender', @@ -7,23 +8,50 @@ const kafka = new Kafka({ }); const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID || 'mail-sender-group' }); -export async function startMailSender(processTask) { +export async function startMailSender() { await consumer.connect(); - // Подписываемся на все топики mail-send-* + console.log('[mailSender] Consumer connected'); await consumer.subscribe({ topic: /^mail-send-.+$/, fromBeginning: false }); + console.log('[mailSender] Subscribed to topics: mail-send-*'); await consumer.run({ eachMessage: async ({ topic, partition, message }) => { - const task = JSON.parse(message.value.toString()); - // processTask(task) должен реализовывать отправку писем подписчикам через SMTP - // task.smtpServerId теперь один, а не массив - await processTask(task, topic); + console.log(`[mailSender] Received message: topic=${topic}, partition=${partition}, offset=${message.offset}, value=${message.value.toString()}`); + // Можно раскомментировать для реальной отправки: + // const task = JSON.parse(message.value.toString()); + // await processTask(task, topic); }, }); + console.log('[mailSender] Consumer is running and waiting for messages...'); } -// Пример processTask: -// async function processTask(task, topic) { -// // Здесь логика отправки писем через SMTP -// // task.campaignId, task.mx, task.subscribers, task.smtpServerId -// // topic - имя топика (mail-send-mx-smtpId) -// } \ No newline at end of file +// Реальная отправка через SMTP +async function processTask(task, topic) { + try { + // Получаем SMTP-сервер из БД + const smtp = await SmtpServer.findByPk(task.smtpServerId); + if (!smtp) { + console.error('SMTP server not found for id', task.smtpServerId); + return; + } + const transporter = nodemailer.createTransport({ + host: smtp.host, + port: smtp.port, + secure: smtp.secure, + auth: { + user: smtp.username, + pass: smtp.password, + }, + }); + const mailOptions = { + from: smtp.from_email, + to: task.email, + subject: 'Test email', + text: 'This is a test email from mailSender', + html: 'This is a test email from mailSender', + }; + const info = await transporter.sendMail(mailOptions); + console.log('Email sent:', info.messageId, 'to', task.email); + } catch (err) { + console.error('Error sending email:', err, 'task:', task); + } +} \ No newline at end of file diff --git a/mail-service/src/service/queueFiller.js b/mail-service/src/service/queueFiller.js index 5aec74b..d66f04e 100644 --- a/mail-service/src/service/queueFiller.js +++ b/mail-service/src/service/queueFiller.js @@ -21,28 +21,29 @@ async function getMxDomain(email) { export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { await producer.connect(); - // Группируем подписчиков по MX-домену - const mxMap = {}; + // Группируем подписчиков по домену (а не по MX) + const domainMap = {}; for (const sub of subscribers) { - const mx = await getMxDomain(sub.email); - if (!mxMap[mx]) mxMap[mx] = []; - mxMap[mx].push(sub); + const domain = sub.email.split('@')[1]; + if (!domainMap[domain]) domainMap[domain] = []; + domainMap[domain].push(sub); } - // Для каждого MX и каждого SMTP создаём задачу в Kafka для КАЖДОГО подписчика - for (const [mx, subs] of Object.entries(mxMap)) { - for (const smtp of smtpServers) { - const topic = `mail-send-${mx}-${smtp.id}`; - const messages = subs.map(sub => ({ - value: JSON.stringify({ - campaignId: campaign.id, - mx, - subscriberId: sub.id, - email: sub.email, - smtpServerId: smtp.id, - }), - })); - await producer.send({ topic, messages }); - } + // Берём только первый домен и первый smtp + const domainEntry = Object.entries(domainMap)[0]; + const smtp = smtpServers[0]; + if (domainEntry && smtp) { + const [domain, subs] = domainEntry; + const topic = `mail-send-${domain}-${smtp.id}`; + const messages = subs.map(sub => ({ + value: JSON.stringify({ + campaignId: campaign.id, + mx: domain, // для обратной совместимости + subscriberId: sub.id, + email: sub.email, + smtpServerId: smtp.id, + }), + })); + await producer.send({ topic, messages }); } await producer.disconnect(); } \ No newline at end of file