kafka
This commit is contained in:
parent
63eb19fd02
commit
790411a2d7
9
mail-service/package-lock.json
generated
9
mail-service/package-lock.json
generated
@ -10,6 +10,7 @@
|
||||
"express": "^5.1.0",
|
||||
"kafkajs": "^2.2.4",
|
||||
"mysql2": "^3.14.2",
|
||||
"nodemailer": "^7.0.5",
|
||||
"sequelize": "^6.37.7"
|
||||
}
|
||||
},
|
||||
@ -1258,6 +1259,14 @@
|
||||
"uuid": "bin/uuid"
|
||||
}
|
||||
},
|
||||
"node_modules/nodemailer": {
|
||||
"version": "7.0.5",
|
||||
"resolved": "https://registry.npmjs.org/nodemailer/-/nodemailer-7.0.5.tgz",
|
||||
"integrity": "sha512-nsrh2lO3j4GkLLXoeEksAMgAOqxOv6QumNRVQTJwKH4nuiww6iC2y7GyANs9kRAxCexg3+lTWM3PZ91iLlVjfg==",
|
||||
"engines": {
|
||||
"node": ">=6.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/oauth-sign": {
|
||||
"version": "0.2.0",
|
||||
"resolved": "https://registry.npmjs.org/oauth-sign/-/oauth-sign-0.2.0.tgz",
|
||||
|
||||
@ -9,6 +9,7 @@
|
||||
"express": "^5.1.0",
|
||||
"kafkajs": "^2.2.4",
|
||||
"mysql2": "^3.14.2",
|
||||
"nodemailer": "^7.0.5",
|
||||
"sequelize": "^6.37.7"
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
import { DeliveryLog, Campaign, Subscriber } from '../models/index.js';
|
||||
import { Op } from 'sequelize';
|
||||
import { Kafka } from 'kafkajs';
|
||||
|
||||
export default {
|
||||
async create(req, res) {
|
||||
@ -53,4 +55,45 @@ export default {
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
},
|
||||
async getPendingCount(req, res) {
|
||||
try {
|
||||
// Kafka config
|
||||
const kafka = new Kafka({
|
||||
clientId: process.env.KAFKA_CLIENT_ID || 'pending-api',
|
||||
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
|
||||
});
|
||||
const admin = kafka.admin();
|
||||
await admin.connect();
|
||||
const topics = await admin.listTopics();
|
||||
const mailTopics = topics.filter(t => t.startsWith('mail-send-'));
|
||||
let totalLag = 0;
|
||||
for (const topic of mailTopics) {
|
||||
const partitions = await admin.fetchTopicOffsets(topic);
|
||||
// Получаем consumer group id (тот же, что у mailSender)
|
||||
const groupId = process.env.KAFKA_GROUP_ID || 'mail-sender-group';
|
||||
const consumerOffsets = await admin.fetchOffsets({ groupId, topic });
|
||||
for (const p of partitions) {
|
||||
const partition = p.partition;
|
||||
const latest = parseInt(p.high);
|
||||
const committed = parseInt(
|
||||
(consumerOffsets.find(c => c.partition === partition) || {}).offset || '0'
|
||||
);
|
||||
// Если consumer ещё не читал этот partition, offset может быть -1
|
||||
const lag = latest - (committed > 0 ? committed : 0);
|
||||
totalLag += lag > 0 ? lag : 0;
|
||||
}
|
||||
}
|
||||
await admin.disconnect();
|
||||
|
||||
const sentCount = await DeliveryLog.count({
|
||||
where: {
|
||||
status: 'sent',
|
||||
},
|
||||
});
|
||||
|
||||
res.json({ pending: totalLag, sent: sentCount });
|
||||
} catch (err) {
|
||||
res.status(500).json({ error: err.message });
|
||||
}
|
||||
},
|
||||
};
|
||||
@ -4,6 +4,7 @@ import express from 'express';
|
||||
import { sequelize } from './models/index.js';
|
||||
import routes from './routes/index.js';
|
||||
import { processScheduledCampaigns } from './service/queueFillerJob.js';
|
||||
import { startMailSender } from './service/mailSender.js';
|
||||
|
||||
const app = express();
|
||||
app.use(express.json());
|
||||
@ -38,7 +39,9 @@ setInterval(async () => {
|
||||
} finally {
|
||||
isQueueFilling = false;
|
||||
}
|
||||
}, 60 * 1000); // раз в минуту
|
||||
}, 1000); // раз в минуту
|
||||
|
||||
startMailSender();
|
||||
|
||||
const PORT = process.env.PORT || 3000;
|
||||
app.listen(PORT, () => {
|
||||
|
||||
@ -5,6 +5,7 @@ const router = Router();
|
||||
|
||||
router.post('/', deliveryLogController.create);
|
||||
router.get('/', deliveryLogController.getAll);
|
||||
router.get('/pending-count', deliveryLogController.getPendingCount);
|
||||
router.get('/:id', deliveryLogController.getById);
|
||||
router.put('/:id', deliveryLogController.update);
|
||||
router.delete('/:id', deliveryLogController.delete);
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { Kafka } from 'kafkajs';
|
||||
// import nodemailer from 'nodemailer'; // Для реальной отправки
|
||||
import nodemailer from 'nodemailer';
|
||||
import { SmtpServer } from '../models/index.js';
|
||||
|
||||
const kafka = new Kafka({
|
||||
clientId: process.env.KAFKA_CLIENT_ID || 'mail-sender',
|
||||
@ -7,23 +8,50 @@ const kafka = new Kafka({
|
||||
});
|
||||
const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID || 'mail-sender-group' });
|
||||
|
||||
export async function startMailSender(processTask) {
|
||||
export async function startMailSender() {
|
||||
await consumer.connect();
|
||||
// Подписываемся на все топики mail-send-*
|
||||
console.log('[mailSender] Consumer connected');
|
||||
await consumer.subscribe({ topic: /^mail-send-.+$/, fromBeginning: false });
|
||||
console.log('[mailSender] Subscribed to topics: mail-send-*');
|
||||
await consumer.run({
|
||||
eachMessage: async ({ topic, partition, message }) => {
|
||||
const task = JSON.parse(message.value.toString());
|
||||
// processTask(task) должен реализовывать отправку писем подписчикам через SMTP
|
||||
// task.smtpServerId теперь один, а не массив
|
||||
await processTask(task, topic);
|
||||
console.log(`[mailSender] Received message: topic=${topic}, partition=${partition}, offset=${message.offset}, value=${message.value.toString()}`);
|
||||
// Можно раскомментировать для реальной отправки:
|
||||
// const task = JSON.parse(message.value.toString());
|
||||
// await processTask(task, topic);
|
||||
},
|
||||
});
|
||||
console.log('[mailSender] Consumer is running and waiting for messages...');
|
||||
}
|
||||
|
||||
// Пример processTask:
|
||||
// async function processTask(task, topic) {
|
||||
// // Здесь логика отправки писем через SMTP
|
||||
// // task.campaignId, task.mx, task.subscribers, task.smtpServerId
|
||||
// // topic - имя топика (mail-send-mx-smtpId)
|
||||
// }
|
||||
// Реальная отправка через SMTP
|
||||
async function processTask(task, topic) {
|
||||
try {
|
||||
// Получаем SMTP-сервер из БД
|
||||
const smtp = await SmtpServer.findByPk(task.smtpServerId);
|
||||
if (!smtp) {
|
||||
console.error('SMTP server not found for id', task.smtpServerId);
|
||||
return;
|
||||
}
|
||||
const transporter = nodemailer.createTransport({
|
||||
host: smtp.host,
|
||||
port: smtp.port,
|
||||
secure: smtp.secure,
|
||||
auth: {
|
||||
user: smtp.username,
|
||||
pass: smtp.password,
|
||||
},
|
||||
});
|
||||
const mailOptions = {
|
||||
from: smtp.from_email,
|
||||
to: task.email,
|
||||
subject: 'Test email',
|
||||
text: 'This is a test email from mailSender',
|
||||
html: '<b>This is a test email from mailSender</b>',
|
||||
};
|
||||
const info = await transporter.sendMail(mailOptions);
|
||||
console.log('Email sent:', info.messageId, 'to', task.email);
|
||||
} catch (err) {
|
||||
console.error('Error sending email:', err, 'task:', task);
|
||||
}
|
||||
}
|
||||
@ -21,28 +21,29 @@ async function getMxDomain(email) {
|
||||
|
||||
export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
|
||||
await producer.connect();
|
||||
// Группируем подписчиков по MX-домену
|
||||
const mxMap = {};
|
||||
// Группируем подписчиков по домену (а не по MX)
|
||||
const domainMap = {};
|
||||
for (const sub of subscribers) {
|
||||
const mx = await getMxDomain(sub.email);
|
||||
if (!mxMap[mx]) mxMap[mx] = [];
|
||||
mxMap[mx].push(sub);
|
||||
const domain = sub.email.split('@')[1];
|
||||
if (!domainMap[domain]) domainMap[domain] = [];
|
||||
domainMap[domain].push(sub);
|
||||
}
|
||||
// Для каждого MX и каждого SMTP создаём задачу в Kafka для КАЖДОГО подписчика
|
||||
for (const [mx, subs] of Object.entries(mxMap)) {
|
||||
for (const smtp of smtpServers) {
|
||||
const topic = `mail-send-${mx}-${smtp.id}`;
|
||||
const messages = subs.map(sub => ({
|
||||
value: JSON.stringify({
|
||||
campaignId: campaign.id,
|
||||
mx,
|
||||
subscriberId: sub.id,
|
||||
email: sub.email,
|
||||
smtpServerId: smtp.id,
|
||||
}),
|
||||
}));
|
||||
await producer.send({ topic, messages });
|
||||
}
|
||||
// Берём только первый домен и первый smtp
|
||||
const domainEntry = Object.entries(domainMap)[0];
|
||||
const smtp = smtpServers[0];
|
||||
if (domainEntry && smtp) {
|
||||
const [domain, subs] = domainEntry;
|
||||
const topic = `mail-send-${domain}-${smtp.id}`;
|
||||
const messages = subs.map(sub => ({
|
||||
value: JSON.stringify({
|
||||
campaignId: campaign.id,
|
||||
mx: domain, // для обратной совместимости
|
||||
subscriberId: sub.id,
|
||||
email: sub.email,
|
||||
smtpServerId: smtp.id,
|
||||
}),
|
||||
}));
|
||||
await producer.send({ topic, messages });
|
||||
}
|
||||
await producer.disconnect();
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user