This commit is contained in:
romantarkin 2025-07-29 09:49:33 +05:00
parent 26973a5126
commit 6e80508a29
12 changed files with 1015 additions and 77 deletions

View File

@ -99,8 +99,6 @@ services:
container_name: kafka container_name: kafka
depends_on: depends_on:
- zookeeper - zookeeper
ports:
- '9092:9092'
environment: environment:
KAFKA_BROKER_ID: 1 KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
@ -113,6 +111,18 @@ services:
volumes: volumes:
- kafka_data:/var/lib/kafka/data - kafka_data:/var/lib/kafka/data
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
environment:
DYNAMIC_CONFIG_ENABLED: 'true'
volumes:
- ./kafka.yaml:/etc/kafkaui/dynamic_config.yaml
depends_on:
- kafka
volumes: volumes:
mysql_data: mysql_data:
kafka_data: kafka_data:

5
kafka.yaml Normal file
View File

@ -0,0 +1,5 @@
kafka:
clusters:
- name: local-kafka
bootstrapServers: kafka:9092
zookeeper: zookeeper:2181

View File

@ -1,6 +1,6 @@
import { DeliveryLog, Campaign, Subscriber } from '../models/index.js'; import { DeliveryLog, Campaign, Subscriber } from '../models/index.js';
import { Op } from 'sequelize'; import { Op } from 'sequelize';
import { Kafka } from 'kafkajs'; import { topicManager } from '../service/topicManager.js';
export default { export default {
async create(req, res) { async create(req, res) {
@ -11,6 +11,7 @@ export default {
res.status(400).json({ error: err.message }); res.status(400).json({ error: err.message });
} }
}, },
async getAll(req, res) { async getAll(req, res) {
try { try {
const limit = parseInt(req.query.limit) || 20; const limit = parseInt(req.query.limit) || 20;
@ -26,6 +27,7 @@ export default {
res.status(500).json({ error: err.message }); res.status(500).json({ error: err.message });
} }
}, },
async getById(req, res) { async getById(req, res) {
try { try {
const log = await DeliveryLog.findByPk(req.params.id, { include: [Campaign, Subscriber] }); const log = await DeliveryLog.findByPk(req.params.id, { include: [Campaign, Subscriber] });
@ -35,6 +37,7 @@ export default {
res.status(500).json({ error: err.message }); res.status(500).json({ error: err.message });
} }
}, },
async update(req, res) { async update(req, res) {
try { try {
const log = await DeliveryLog.findByPk(req.params.id); const log = await DeliveryLog.findByPk(req.params.id);
@ -45,6 +48,7 @@ export default {
res.status(400).json({ error: err.message }); res.status(400).json({ error: err.message });
} }
}, },
async delete(req, res) { async delete(req, res) {
try { try {
const log = await DeliveryLog.findByPk(req.params.id); const log = await DeliveryLog.findByPk(req.params.id);
@ -55,35 +59,42 @@ export default {
res.status(500).json({ error: err.message }); res.status(500).json({ error: err.message });
} }
}, },
async getPendingCount(req, res) { async getPendingCount(req, res) {
try { try {
// Kafka config // Получаем все топики через TopicManager
const kafka = new Kafka({ const mailTopics = await topicManager.getAllTopics();
clientId: process.env.KAFKA_CLIENT_ID || 'pending-api',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
const admin = kafka.admin();
await admin.connect();
const topics = await admin.listTopics();
const mailTopics = topics.filter(t => t.startsWith('mail-send-'));
let totalLag = 0; let totalLag = 0;
// Для каждого топика получаем информацию о lag
for (const topic of mailTopics) { for (const topic of mailTopics) {
const partitions = await admin.fetchTopicOffsets(topic); try {
// Получаем consumer group id (тот же, что у mailSender) const kafka = new (await import('kafkajs')).Kafka({
const groupId = process.env.KAFKA_GROUP_ID || 'mail-sender-group'; clientId: process.env.KAFKA_CLIENT_ID || 'pending-api',
const consumerOffsets = await admin.fetchOffsets({ groupId, topic }); brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
for (const p of partitions) { });
const partition = p.partition; const admin = kafka.admin();
const latest = parseInt(p.high); await admin.connect();
const committed = parseInt(
(consumerOffsets.find(c => c.partition === partition) || {}).offset || '0' const partitions = await admin.fetchTopicOffsets(topic);
); const groupId = process.env.KAFKA_GROUP_ID || 'dynamic-consumer-group';
// Если consumer ещё не читал этот partition, offset может быть -1 const consumerOffsets = await admin.fetchOffsets({ groupId, topic });
const lag = latest - (committed > 0 ? committed : 0);
totalLag += lag > 0 ? lag : 0; for (const p of partitions) {
const partition = p.partition;
const latest = parseInt(p.high);
const committed = parseInt(
(consumerOffsets.find(c => c.partition === partition) || {}).offset || '0'
);
const lag = latest - (committed > 0 ? committed : 0);
totalLag += lag > 0 ? lag : 0;
}
await admin.disconnect();
} catch (error) {
console.error(`Error getting lag for topic ${topic}:`, error);
} }
} }
await admin.disconnect();
const sentCount = await DeliveryLog.count({ const sentCount = await DeliveryLog.count({
where: { where: {
@ -95,5 +106,5 @@ export default {
} catch (err) { } catch (err) {
res.status(500).json({ error: err.message }); res.status(500).json({ error: err.message });
} }
}, }
}; };

View File

@ -0,0 +1,275 @@
import { topicManager } from '../service/topicManager.js';
import { dynamicConsumer } from '../service/dynamicConsumer.js';
import { topicRegistry } from '../service/topicRegistry.js';
export default {
// Получить список всех топиков
async getAllTopics(req, res) {
try {
const topics = await topicManager.getAllTopics();
const subscribedTopics = dynamicConsumer.getSubscribedTopics();
const topicsWithStatus = topics.map(topic => ({
name: topic,
subscribed: subscribedTopics.includes(topic),
active: topicRegistry.hasTopic(topic),
metadata: topicRegistry.getTopicMetadata(topic)
}));
res.json({
topics: topicsWithStatus,
total: topics.length,
subscribed: subscribedTopics.length,
active: topicRegistry.getSize(),
registry: topicRegistry.getStatistics()
});
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Создать новый топик
async createTopic(req, res) {
try {
const { topicName, partitions = 1, replicationFactor = 1 } = req.body;
if (!topicName) {
return res.status(400).json({ error: 'Topic name is required' });
}
const success = await topicManager.createTopic(topicName, partitions, replicationFactor);
if (success) {
res.status(201).json({
message: `Topic ${topicName} created successfully`,
topic: topicName,
metadata: topicRegistry.getTopicMetadata(topicName)
});
} else {
res.status(500).json({ error: 'Failed to create topic' });
}
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Удалить топик
async deleteTopic(req, res) {
try {
const { topicName } = req.params;
if (!topicName) {
return res.status(400).json({ error: 'Topic name is required' });
}
const success = await topicManager.deleteTopic(topicName);
if (success) {
res.json({
message: `Topic ${topicName} deleted successfully`,
topic: topicName
});
} else {
res.status(500).json({ error: 'Failed to delete topic' });
}
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Подписаться на топик
async subscribeToTopic(req, res) {
try {
const { topicName } = req.body;
if (!topicName) {
return res.status(400).json({ error: 'Topic name is required' });
}
const success = await dynamicConsumer.subscribeToTopic(topicName);
if (success) {
res.json({
message: `Subscribed to topic ${topicName}`,
topic: topicName
});
} else {
res.status(500).json({ error: 'Failed to subscribe to topic' });
}
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Отписаться от топика
async unsubscribeFromTopic(req, res) {
try {
const { topicName } = req.params;
if (!topicName) {
return res.status(400).json({ error: 'Topic name is required' });
}
const success = await dynamicConsumer.unsubscribeFromTopic(topicName);
if (success) {
res.json({
message: `Unsubscribed from topic ${topicName}`,
topic: topicName
});
} else {
res.status(500).json({ error: 'Failed to unsubscribe from topic' });
}
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Получить статус consumer'а
async getConsumerStatus(req, res) {
try {
const subscribedTopics = dynamicConsumer.getSubscribedTopics();
const activeTopics = topicRegistry.getAllTopics();
res.json({
isRunning: dynamicConsumer.isRunning,
subscribedTopics,
activeTopics,
subscribedCount: subscribedTopics.length,
activeCount: activeTopics.length,
registry: topicRegistry.getStatistics()
});
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Очистить все топики
async clearAllTopics(req, res) {
try {
const deletedCount = await topicManager.clearTopics('mail-send-');
res.json({
message: `Cleared ${deletedCount} topics`,
deletedCount
});
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Отправить тестовое сообщение в топик
async sendTestMessage(req, res) {
try {
const { topicName, message } = req.body;
if (!topicName || !message) {
return res.status(400).json({ error: 'Topic name and message are required' });
}
const success = await topicManager.sendMessage(topicName, message);
if (success) {
res.json({
message: `Test message sent to topic ${topicName}`,
topic: topicName,
sentMessage: message,
metadata: topicRegistry.getTopicMetadata(topicName)
});
} else {
res.status(500).json({ error: 'Failed to send test message' });
}
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Получить статистику реестра
async getRegistryStatistics(req, res) {
try {
const statistics = topicRegistry.getStatistics();
res.json(statistics);
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Синхронизировать реестр с реальными топиками
async syncRegistry(req, res) {
try {
await topicManager.syncRegistry();
const statistics = topicRegistry.getStatistics();
res.json({
message: 'Registry synchronized successfully',
statistics
});
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Получить топики по префиксу
async getTopicsByPrefix(req, res) {
try {
const { prefix } = req.params;
const topics = topicRegistry.getTopicsByPrefix(prefix);
const topicsWithMetadata = topics.map(topic => ({
name: topic,
metadata: topicRegistry.getTopicMetadata(topic)
}));
res.json({
prefix,
topics: topicsWithMetadata,
count: topics.length
});
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Очистить реестр
async clearRegistry(req, res) {
try {
topicRegistry.clearAll();
res.json({
message: 'Registry cleared successfully',
statistics: topicRegistry.getStatistics()
});
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Экспорт реестра
async exportRegistry(req, res) {
try {
const exportData = topicRegistry.toJSON();
res.json(exportData);
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Импорт реестра
async importRegistry(req, res) {
try {
const { data } = req.body;
if (!data) {
return res.status(400).json({ error: 'Import data is required' });
}
topicRegistry.fromJSON(data);
res.json({
message: 'Registry imported successfully',
statistics: topicRegistry.getStatistics()
});
} catch (err) {
res.status(500).json({ error: err.message });
}
}
};

View File

@ -4,17 +4,21 @@ import express from 'express';
import { sequelize } from './models/index.js'; import { sequelize } from './models/index.js';
import routes from './routes/index.js'; import routes from './routes/index.js';
import { processScheduledCampaigns } from './service/queueFillerJob.js'; import { processScheduledCampaigns } from './service/queueFillerJob.js';
import { startMailSender } from './service/mailSender.js'; import { dynamicConsumer } from './service/dynamicConsumer.js';
import { topicManager } from './service/topicManager.js';
import { topicRegistry } from './service/topicRegistry.js';
import authMiddleware from './middleware/auth.js'; import authMiddleware from './middleware/auth.js';
const app = express(); const app = express();
app.use(express.json()); app.use(express.json());
app.get('/', (req, res) => { // Middleware
res.send('Mail Service is running'); app.use('/api/mail', authMiddleware);
});
app.use('/api/mail', authMiddleware, routes); // Routes
app.use('/api/mail', routes);
const PORT = process.env.PORT || 3000;
(async () => { (async () => {
try { try {
@ -40,11 +44,35 @@ setInterval(async () => {
} finally { } finally {
isQueueFilling = false; isQueueFilling = false;
} }
}, 1000); // раз в минуту }, 1000);
startMailSender(); // Запускаем динамический consumer и инициализируем реестр
(async () => {
try {
await dynamicConsumer.connect();
// Синхронизируем реестр с существующими топиками
await topicManager.syncRegistry();
// Подписываемся на существующие топики
const existingTopics = await topicManager.getAllTopics();
if (existingTopics.length > 0) {
console.log(`[index] Found ${existingTopics.length} existing topics:`, existingTopics);
await dynamicConsumer.subscribeToTopics(existingTopics);
}
// Периодически проверяем новые топики и синхронизируем реестр
setInterval(async () => {
await dynamicConsumer.autoSubscribeToNewTopics();
await topicManager.syncRegistry();
}, 5000); // Проверяем каждые 5 секунд
console.log('[index] Dynamic consumer started with registry synchronization');
} catch (err) {
console.error('Dynamic consumer error:', err);
}
})();
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => { app.listen(PORT, () => {
console.log(`Mail Service listening on port ${PORT}`); console.log(`Mail service running on port ${PORT}`);
}); });

View File

@ -7,6 +7,7 @@ import emailTemplateVersionRoutes from './emailTemplateVersion.js';
import campaignRoutes from './campaign.js'; import campaignRoutes from './campaign.js';
import deliveryLogRoutes from './deliveryLog.js'; import deliveryLogRoutes from './deliveryLog.js';
import smtpServerRoutes from './smtpServer.js'; import smtpServerRoutes from './smtpServer.js';
import topicRoutes from './topic.js';
const router = Router(); const router = Router();
@ -18,5 +19,6 @@ router.use('/email-template-versions', emailTemplateVersionRoutes);
router.use('/campaigns', campaignRoutes); router.use('/campaigns', campaignRoutes);
router.use('/delivery-logs', deliveryLogRoutes); router.use('/delivery-logs', deliveryLogRoutes);
router.use('/smtp-servers', smtpServerRoutes); router.use('/smtp-servers', smtpServerRoutes);
router.use('/topics', topicRoutes);
export default router; export default router;

View File

@ -0,0 +1,48 @@
import express from 'express';
import topicController from '../controllers/topicController.js';
const router = express.Router();
// GET /api/mail/topics - Получить список всех топиков
router.get('/', topicController.getAllTopics);
// POST /api/mail/topics - Создать новый топик
router.post('/', topicController.createTopic);
// DELETE /api/mail/topics/:topicName - Удалить топик
router.delete('/:topicName', topicController.deleteTopic);
// POST /api/mail/topics/subscribe - Подписаться на топик
router.post('/subscribe', topicController.subscribeToTopic);
// DELETE /api/mail/topics/unsubscribe/:topicName - Отписаться от топика
router.delete('/unsubscribe/:topicName', topicController.unsubscribeFromTopic);
// GET /api/mail/topics/status - Получить статус consumer'а
router.get('/status', topicController.getConsumerStatus);
// DELETE /api/mail/topics/clear - Очистить все топики
router.delete('/clear', topicController.clearAllTopics);
// POST /api/mail/topics/test-message - Отправить тестовое сообщение
router.post('/test-message', topicController.sendTestMessage);
// GET /api/mail/topics/statistics - Получить статистику реестра
router.get('/statistics', topicController.getRegistryStatistics);
// POST /api/mail/topics/sync - Синхронизировать реестр
router.post('/sync', topicController.syncRegistry);
// GET /api/mail/topics/prefix/:prefix - Получить топики по префиксу
router.get('/prefix/:prefix', topicController.getTopicsByPrefix);
// DELETE /api/mail/topics/registry/clear - Очистить реестр
router.delete('/registry/clear', topicController.clearRegistry);
// GET /api/mail/topics/registry/export - Экспорт реестра
router.get('/registry/export', topicController.exportRegistry);
// POST /api/mail/topics/registry/import - Импорт реестра
router.post('/registry/import', topicController.importRegistry);
export default router;

View File

@ -0,0 +1,214 @@
import { Kafka } from 'kafkajs';
import nodemailer from 'nodemailer';
import { SmtpServer } from '../models/index.js';
import { topicManager } from './topicManager.js';
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'dynamic-consumer',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
export class DynamicConsumer {
constructor() {
this.consumer = kafka.consumer({
groupId: process.env.KAFKA_GROUP_ID || 'dynamic-consumer-group'
});
this.subscribedTopics = new Set();
this.isRunning = false;
this.messageHandler = null;
}
async connect() {
try {
await this.consumer.connect();
console.log('[DynamicConsumer] Connected to Kafka');
return true;
} catch (error) {
console.error('[DynamicConsumer] Connection error:', error);
return false;
}
}
async disconnect() {
try {
await this.consumer.disconnect();
this.subscribedTopics.clear();
this.isRunning = false;
console.log('[DynamicConsumer] Disconnected from Kafka');
} catch (error) {
console.error('[DynamicConsumer] Disconnect error:', error);
}
}
// Установка обработчика сообщений
setMessageHandler(handler) {
this.messageHandler = handler;
}
// Подписка на конкретный топик
async subscribeToTopic(topicName) {
if (this.subscribedTopics.has(topicName)) {
console.log(`[DynamicConsumer] Already subscribed to topic: ${topicName}`);
return true;
}
try {
await this.consumer.subscribe({
topic: topicName,
fromBeginning: false
});
this.subscribedTopics.add(topicName);
console.log(`[DynamicConsumer] Subscribed to topic: ${topicName}`);
// Если consumer еще не запущен, запускаем его
if (!this.isRunning) {
await this.startConsuming();
}
return true;
} catch (error) {
console.error(`[DynamicConsumer] Error subscribing to ${topicName}:`, error);
return false;
}
}
// Отписка от топика
async unsubscribeFromTopic(topicName) {
if (!this.subscribedTopics.has(topicName)) {
console.log(`[DynamicConsumer] Not subscribed to topic: ${topicName}`);
return true;
}
try {
await this.consumer.stop();
this.subscribedTopics.delete(topicName);
// Переподписываемся на оставшиеся топики
if (this.subscribedTopics.size > 0) {
await this.consumer.subscribe(
Array.from(this.subscribedTopics).map(topic => ({
topic,
fromBeginning: false
}))
);
await this.startConsuming();
}
console.log(`[DynamicConsumer] Unsubscribed from topic: ${topicName}`);
return true;
} catch (error) {
console.error(`[DynamicConsumer] Error unsubscribing from ${topicName}:`, error);
return false;
}
}
// Подписка на несколько топиков
async subscribeToTopics(topicNames) {
const results = [];
for (const topicName of topicNames) {
const result = await this.subscribeToTopic(topicName);
results.push({ topic: topicName, success: result });
}
return results;
}
// Получение списка подписанных топиков
getSubscribedTopics() {
return Array.from(this.subscribedTopics);
}
// Запуск обработки сообщений
async startConsuming() {
if (this.isRunning) {
console.log('[DynamicConsumer] Already consuming messages');
return;
}
try {
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(`[DynamicConsumer] Received message: topic=${topic}, partition=${partition}, offset=${message.offset}`);
if (this.messageHandler) {
try {
const task = JSON.parse(message.value.toString());
await this.messageHandler(task, topic);
} catch (error) {
console.error(`[DynamicConsumer] Error processing message from ${topic}:`, error);
}
}
},
});
this.isRunning = true;
console.log('[DynamicConsumer] Started consuming messages');
} catch (error) {
console.error('[DynamicConsumer] Error starting consumer:', error);
}
}
// Остановка обработки сообщений
async stopConsuming() {
try {
await this.consumer.stop();
this.isRunning = false;
console.log('[DynamicConsumer] Stopped consuming messages');
} catch (error) {
console.error('[DynamicConsumer] Error stopping consumer:', error);
}
}
// Автоматическое обнаружение и подписка на новые топики
async autoSubscribeToNewTopics() {
try {
const allTopics = await topicManager.getAllTopics();
const newTopics = allTopics.filter(topic => !this.subscribedTopics.has(topic));
if (newTopics.length > 0) {
console.log(`[DynamicConsumer] Found ${newTopics.length} new topics:`, newTopics);
await this.subscribeToTopics(newTopics);
}
} catch (error) {
console.error('[DynamicConsumer] Error in auto-subscribe:', error);
}
}
}
// Обработчик сообщений для отправки email
async function processEmailTask(task, topic) {
try {
// Получаем SMTP-сервер из БД
const smtp = await SmtpServer.findByPk(task.smtpServerId);
if (!smtp) {
console.error('SMTP server not found for id', task.smtpServerId);
return;
}
const transporter = nodemailer.createTransport({
host: smtp.host,
port: smtp.port,
secure: smtp.secure,
auth: {
user: smtp.username,
pass: smtp.password,
},
});
const mailOptions = {
from: smtp.from_email,
to: task.email,
subject: 'Test email',
text: 'This is a test email from DynamicConsumer',
html: '<b>This is a test email from DynamicConsumer</b>',
};
const info = await transporter.sendMail(mailOptions);
console.log('Email sent:', info.messageId, 'to', task.email);
} catch (err) {
console.error('Error sending email:', err, 'task:', task);
}
}
// Экспортируем экземпляр с установленным обработчиком
export const dynamicConsumer = new DynamicConsumer();
dynamicConsumer.setMessageHandler(processEmailTask);

View File

@ -1,11 +1,5 @@
import dns from 'dns/promises'; import dns from 'dns/promises';
import { Kafka } from 'kafkajs'; import { topicManager } from './topicManager.js';
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'queue-filler',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
const producer = kafka.producer();
async function getMxDomain(email) { async function getMxDomain(email) {
const domain = email.split('@')[1]; const domain = email.split('@')[1];
@ -20,30 +14,43 @@ async function getMxDomain(email) {
} }
export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
await producer.connect(); // Группируем подписчиков по домену
// Группируем подписчиков по домену (а не по MX)
const domainMap = {}; const domainMap = {};
for (const sub of subscribers) { for (const sub of subscribers) {
const domain = sub.email.split('@')[1]; const domain = sub.email.split('@')[1];
if (!domainMap[domain]) domainMap[domain] = []; if (!domainMap[domain]) domainMap[domain] = [];
domainMap[domain].push(sub); domainMap[domain].push(sub);
} }
// Берём только первый домен и первый smtp
const domainEntry = Object.entries(domainMap)[0]; // Обрабатываем каждый домен и SMTP сервер
const smtp = smtpServers[0]; for (const [domain, subs] of Object.entries(domainMap)) {
if (domainEntry && smtp) { for (const smtp of smtpServers) {
const [domain, subs] = domainEntry; const topicName = `mail-send-${domain}-${smtp.id}`;
const topic = `mail-send-${domain}-${smtp.id}`;
const messages = subs.map(sub => ({ // Создаем топик если его нет
value: JSON.stringify({ const topicCreated = await topicManager.createTopic(topicName);
if (!topicCreated) {
console.error(`[queueFiller] Failed to create topic: ${topicName}`);
continue;
}
// Отправляем сообщения в топик
const messages = subs.map(sub => ({
campaignId: campaign.id, campaignId: campaign.id,
mx: domain, // для обратной совместимости mx: domain, // для обратной совместимости
subscriberId: sub.id, subscriberId: sub.id,
email: sub.email, email: sub.email,
smtpServerId: smtp.id, smtpServerId: smtp.id,
}), }));
}));
await producer.send({ topic, messages }); for (const message of messages) {
const sent = await topicManager.sendMessage(topicName, message);
if (!sent) {
console.error(`[queueFiller] Failed to send message to topic: ${topicName}`);
}
}
console.log(`[queueFiller] Sent ${messages.length} messages to topic: ${topicName}`);
}
} }
await producer.disconnect();
} }

View File

@ -1,38 +1,26 @@
import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer } from '../models/index.js'; import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer } from '../models/index.js';
import { fillQueueForCampaign } from './queueFiller.js'; import { fillQueueForCampaign } from './queueFiller.js';
import { Op } from 'sequelize'; import { Op } from 'sequelize';
import { Kafka } from 'kafkajs'; import { topicManager } from './topicManager.js';
const BATCH_SIZE = 10000; const BATCH_SIZE = 10000;
async function clearKafkaTopics(prefix = 'mail-send-') { async function clearKafkaTopics(prefix = 'mail-send-') {
const kafka = new Kafka({ const deletedCount = await topicManager.clearTopics(prefix);
clientId: process.env.KAFKA_CLIENT_ID || 'queue-filler', console.log(`[queueFillerJob] Cleared ${deletedCount} topics`);
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
const admin = kafka.admin();
await admin.connect();
const topics = await admin.listTopics();
const toDelete = topics.filter(t => t.startsWith(prefix));
if (toDelete.length > 0) {
await admin.deleteTopics({ topics: toDelete });
// Пересоздавать топики не нужно — Kafka создаст их автоматически при отправке сообщений
}
await admin.disconnect();
} }
export async function processScheduledCampaigns() { export async function processScheduledCampaigns() {
// Очищаем все mail-send-* топики перед построением
await clearKafkaTopics('mail-send-');
// 1. Найти все кампании в статусе scheduled
const campaigns = await Campaign.findAll({ const campaigns = await Campaign.findAll({
where: { status: 'scheduled' }, where: { status: 'scheduled' },
include: [MailingGroup, SmtpServer], include: [MailingGroup, SmtpServer],
}); });
for (const campaign of campaigns) { for (const campaign of campaigns) {
// 2. Получить id всех подписчиков группы батчами
let offset = 0; let offset = 0;
let allSubscriberIds = []; let allSubscriberIds = [];
while (true) { while (true) {
const groupSubs = await GroupSubscriber.findAll({ const groupSubs = await GroupSubscriber.findAll({
where: { group_id: campaign.group_id }, where: { group_id: campaign.group_id },
@ -41,12 +29,13 @@ export async function processScheduledCampaigns() {
limit: BATCH_SIZE, limit: BATCH_SIZE,
raw: true, raw: true,
}); });
if (groupSubs.length === 0) break; if (groupSubs.length === 0) break;
allSubscriberIds.push(...groupSubs.map(gs => gs.subscriber_id)); allSubscriberIds.push(...groupSubs.map(gs => gs.subscriber_id));
if (groupSubs.length < BATCH_SIZE) break; if (groupSubs.length < BATCH_SIZE) break;
offset += BATCH_SIZE; offset += BATCH_SIZE;
} }
// 3. Получить подписчиков батчами и сразу отправлять в очередь (не держим всех в памяти)
for (let i = 0; i < allSubscriberIds.length; i += BATCH_SIZE) { for (let i = 0; i < allSubscriberIds.length; i += BATCH_SIZE) {
const batchIds = allSubscriberIds.slice(i, i + BATCH_SIZE); const batchIds = allSubscriberIds.slice(i, i + BATCH_SIZE);
const subscribers = await Subscriber.findAll({ const subscribers = await Subscriber.findAll({
@ -54,10 +43,11 @@ export async function processScheduledCampaigns() {
attributes: ['id', 'email'], attributes: ['id', 'email'],
raw: true, raw: true,
}); });
const smtpServers = await campaign.getSmtpServers(); const smtpServers = await campaign.getSmtpServers();
await fillQueueForCampaign(campaign, subscribers, smtpServers); await fillQueueForCampaign(campaign, subscribers, smtpServers);
} }
// 4. Обновить статус кампании на sending
await campaign.update({ status: 'sending' }); await campaign.update({ status: 'sending' });
} }
} }

View File

@ -0,0 +1,208 @@
import { Kafka } from 'kafkajs';
import { topicRegistry } from './topicRegistry.js';
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'topic-manager',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
const admin = kafka.admin();
const producer = kafka.producer();
export class TopicManager {
constructor() {
this.isConnected = false;
}
async connect() {
if (!this.isConnected) {
await admin.connect();
await producer.connect();
this.isConnected = true;
console.log('[TopicManager] Connected to Kafka');
}
}
async disconnect() {
if (this.isConnected) {
await admin.disconnect();
await producer.disconnect();
this.isConnected = false;
console.log('[TopicManager] Disconnected from Kafka');
}
}
// Создание топика
async createTopic(topicName, partitions = 1, replicationFactor = 1) {
await this.connect();
try {
await admin.createTopics({
topics: [{
topic: topicName,
numPartitions: partitions,
replicationFactor: replicationFactor,
}],
});
// Добавляем топик в реестр
topicRegistry.addTopic(topicName, {
partitions,
replicationFactor,
createdBy: 'TopicManager'
});
console.log(`[TopicManager] Created topic: ${topicName}`);
return true;
} catch (error) {
if (error.message.includes('already exists')) {
console.log(`[TopicManager] Topic ${topicName} already exists`);
// Добавляем существующий топик в реестр
if (!topicRegistry.hasTopic(topicName)) {
topicRegistry.addTopic(topicName, {
partitions,
replicationFactor,
createdBy: 'TopicManager',
existing: true
});
}
return true;
}
console.error(`[TopicManager] Error creating topic ${topicName}:`, error);
return false;
}
}
// Отправка сообщения в топик
async sendMessage(topicName, message) {
await this.connect();
try {
await producer.send({
topic: topicName,
messages: [{ value: JSON.stringify(message) }],
});
// Увеличиваем счетчик сообщений в реестре
topicRegistry.incrementMessageCount(topicName);
console.log(`[TopicManager] Sent message to topic: ${topicName}`);
return true;
} catch (error) {
console.error(`[TopicManager] Error sending message to ${topicName}:`, error);
return false;
}
}
// Получение списка всех топиков
async getAllTopics() {
await this.connect();
try {
const topics = await admin.listTopics();
const mailTopics = topics.filter(topic => topic.startsWith('mail-send-'));
// Синхронизируем реестр с реальными топиками
mailTopics.forEach(topic => {
if (!topicRegistry.hasTopic(topic)) {
topicRegistry.addTopic(topic, { synced: true });
}
});
return mailTopics;
} catch (error) {
console.error('[TopicManager] Error getting topics:', error);
return [];
}
}
// Удаление топика
async deleteTopic(topicName) {
await this.connect();
try {
await admin.deleteTopics({ topics: [topicName] });
// Удаляем топик из реестра
topicRegistry.removeTopic(topicName);
console.log(`[TopicManager] Deleted topic: ${topicName}`);
return true;
} catch (error) {
console.error(`[TopicManager] Error deleting topic ${topicName}:`, error);
return false;
}
}
// Очистка всех топиков с префиксом
async clearTopics(prefix = 'mail-send-') {
await this.connect();
try {
const topics = await this.getAllTopics();
const topicsToDelete = topics.filter(topic => topic.startsWith(prefix));
if (topicsToDelete.length > 0) {
await admin.deleteTopics({ topics: topicsToDelete });
// Очищаем топики из реестра
topicsToDelete.forEach(topic => topicRegistry.removeTopic(topic));
console.log(`[TopicManager] Deleted ${topicsToDelete.length} topics`);
}
return topicsToDelete.length;
} catch (error) {
console.error('[TopicManager] Error clearing topics:', error);
return 0;
}
}
// Получение активных топиков из реестра
getActiveTopics() {
return topicRegistry.getAllTopics();
}
// Проверка существования топика
async topicExists(topicName) {
await this.connect();
try {
const topics = await admin.listTopics();
return topics.includes(topicName);
} catch (error) {
console.error('[TopicManager] Error checking topic existence:', error);
return false;
}
}
// Получение статистики из реестра
getRegistryStatistics() {
return topicRegistry.getStatistics();
}
// Синхронизация реестра с реальными топиками
async syncRegistry() {
const realTopics = await this.getAllTopics();
const registryTopics = topicRegistry.getAllTopics();
// Добавляем новые топики в реестр
realTopics.forEach(topic => {
if (!topicRegistry.hasTopic(topic)) {
topicRegistry.addTopic(topic, { synced: true });
}
});
// Удаляем несуществующие топики из реестра
registryTopics.forEach(topic => {
if (!realTopics.includes(topic)) {
topicRegistry.removeTopic(topic);
}
});
console.log(`[TopicManager] Registry synced. Real topics: ${realTopics.length}, Registry topics: ${topicRegistry.getSize()}`);
}
}
// Экспортируем экземпляр для использования в других модулях
export const topicManager = new TopicManager();

View File

@ -0,0 +1,140 @@
// Реестр топиков - хранилище списка активных топиков
class TopicRegistry {
constructor() {
this.topics = new Set();
this.topicMetadata = new Map(); // Дополнительная информация о топиках
}
// Добавить топик в реестр
addTopic(topicName, metadata = {}) {
this.topics.add(topicName);
this.topicMetadata.set(topicName, {
createdAt: new Date(),
messageCount: 0,
lastMessageAt: null,
...metadata
});
console.log(`[TopicRegistry] Added topic: ${topicName}`);
}
// Удалить топик из реестра
removeTopic(topicName) {
this.topics.delete(topicName);
this.topicMetadata.delete(topicName);
console.log(`[TopicRegistry] Removed topic: ${topicName}`);
}
// Проверить, существует ли топик в реестре
hasTopic(topicName) {
return this.topics.has(topicName);
}
// Получить все топики
getAllTopics() {
return Array.from(this.topics);
}
// Получить топики с фильтром
getTopicsByFilter(filter) {
return Array.from(this.topics).filter(filter);
}
// Получить топики по префиксу
getTopicsByPrefix(prefix) {
return this.getTopicsByFilter(topic => topic.startsWith(prefix));
}
// Получить метаданные топика
getTopicMetadata(topicName) {
return this.topicMetadata.get(topicName) || null;
}
// Обновить метаданные топика
updateTopicMetadata(topicName, updates) {
const current = this.topicMetadata.get(topicName);
if (current) {
this.topicMetadata.set(topicName, { ...current, ...updates });
}
}
// Увеличить счетчик сообщений для топика
incrementMessageCount(topicName) {
const metadata = this.topicMetadata.get(topicName);
if (metadata) {
metadata.messageCount++;
metadata.lastMessageAt = new Date();
}
}
// Получить статистику по топикам
getStatistics() {
const topics = Array.from(this.topics);
const totalMessages = Array.from(this.topicMetadata.values())
.reduce((sum, meta) => sum + meta.messageCount, 0);
const activeTopics = topics.filter(topic => {
const metadata = this.topicMetadata.get(topic);
return metadata && metadata.lastMessageAt &&
(new Date() - metadata.lastMessageAt) < 24 * 60 * 60 * 1000; // Последние 24 часа
});
return {
totalTopics: topics.length,
totalMessages,
activeTopics: activeTopics.length,
topics: topics.map(topic => ({
name: topic,
metadata: this.topicMetadata.get(topic)
}))
};
}
// Очистить все топики
clearAll() {
this.topics.clear();
this.topicMetadata.clear();
console.log('[TopicRegistry] Cleared all topics');
}
// Очистить топики по префиксу
clearByPrefix(prefix) {
const topicsToRemove = this.getTopicsByPrefix(prefix);
topicsToRemove.forEach(topic => {
this.removeTopic(topic);
});
console.log(`[TopicRegistry] Cleared ${topicsToRemove.length} topics with prefix: ${prefix}`);
return topicsToRemove.length;
}
// Получить размер реестра
getSize() {
return this.topics.size;
}
// Проверить, пуст ли реестр
isEmpty() {
return this.topics.size === 0;
}
// Экспорт реестра в JSON
toJSON() {
return {
topics: Array.from(this.topics),
metadata: Object.fromEntries(this.topicMetadata),
statistics: this.getStatistics()
};
}
// Импорт реестра из JSON
fromJSON(data) {
if (data.topics) {
this.topics = new Set(data.topics);
}
if (data.metadata) {
this.topicMetadata = new Map(Object.entries(data.metadata));
}
}
}
// Экспортируем экземпляр для использования в других модулях
export const topicRegistry = new TopicRegistry();