diff --git a/docker-compose.yml b/docker-compose.yml index 064d838..c8ccf28 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -99,8 +99,6 @@ services: container_name: kafka depends_on: - zookeeper - ports: - - '9092:9092' environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 @@ -113,6 +111,18 @@ services: volumes: - kafka_data:/var/lib/kafka/data + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + environment: + DYNAMIC_CONFIG_ENABLED: 'true' + volumes: + - ./kafka.yaml:/etc/kafkaui/dynamic_config.yaml + depends_on: + - kafka + volumes: mysql_data: kafka_data: diff --git a/kafka.yaml b/kafka.yaml new file mode 100644 index 0000000..8a07001 --- /dev/null +++ b/kafka.yaml @@ -0,0 +1,5 @@ +kafka: + clusters: + - name: local-kafka + bootstrapServers: kafka:9092 + zookeeper: zookeeper:2181 \ No newline at end of file diff --git a/mail-service/src/controllers/deliveryLogController.js b/mail-service/src/controllers/deliveryLogController.js index efd19ba..a94a98e 100644 --- a/mail-service/src/controllers/deliveryLogController.js +++ b/mail-service/src/controllers/deliveryLogController.js @@ -1,6 +1,6 @@ import { DeliveryLog, Campaign, Subscriber } from '../models/index.js'; import { Op } from 'sequelize'; -import { Kafka } from 'kafkajs'; +import { topicManager } from '../service/topicManager.js'; export default { async create(req, res) { @@ -11,6 +11,7 @@ export default { res.status(400).json({ error: err.message }); } }, + async getAll(req, res) { try { const limit = parseInt(req.query.limit) || 20; @@ -26,6 +27,7 @@ export default { res.status(500).json({ error: err.message }); } }, + async getById(req, res) { try { const log = await DeliveryLog.findByPk(req.params.id, { include: [Campaign, Subscriber] }); @@ -35,6 +37,7 @@ export default { res.status(500).json({ error: err.message }); } }, + async update(req, res) { try { const log = await DeliveryLog.findByPk(req.params.id); @@ -45,6 +48,7 @@ export default { res.status(400).json({ error: err.message }); } }, + async delete(req, res) { try { const log = await DeliveryLog.findByPk(req.params.id); @@ -55,35 +59,42 @@ export default { res.status(500).json({ error: err.message }); } }, + async getPendingCount(req, res) { try { - // Kafka config - const kafka = new Kafka({ - clientId: process.env.KAFKA_CLIENT_ID || 'pending-api', - brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], - }); - const admin = kafka.admin(); - await admin.connect(); - const topics = await admin.listTopics(); - const mailTopics = topics.filter(t => t.startsWith('mail-send-')); + // Получаем все топики через TopicManager + const mailTopics = await topicManager.getAllTopics(); let totalLag = 0; + + // Для каждого топика получаем информацию о lag for (const topic of mailTopics) { - const partitions = await admin.fetchTopicOffsets(topic); - // Получаем consumer group id (тот же, что у mailSender) - const groupId = process.env.KAFKA_GROUP_ID || 'mail-sender-group'; - const consumerOffsets = await admin.fetchOffsets({ groupId, topic }); - for (const p of partitions) { - const partition = p.partition; - const latest = parseInt(p.high); - const committed = parseInt( - (consumerOffsets.find(c => c.partition === partition) || {}).offset || '0' - ); - // Если consumer ещё не читал этот partition, offset может быть -1 - const lag = latest - (committed > 0 ? committed : 0); - totalLag += lag > 0 ? lag : 0; + try { + const kafka = new (await import('kafkajs')).Kafka({ + clientId: process.env.KAFKA_CLIENT_ID || 'pending-api', + brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], + }); + const admin = kafka.admin(); + await admin.connect(); + + const partitions = await admin.fetchTopicOffsets(topic); + const groupId = process.env.KAFKA_GROUP_ID || 'dynamic-consumer-group'; + const consumerOffsets = await admin.fetchOffsets({ groupId, topic }); + + for (const p of partitions) { + const partition = p.partition; + const latest = parseInt(p.high); + const committed = parseInt( + (consumerOffsets.find(c => c.partition === partition) || {}).offset || '0' + ); + const lag = latest - (committed > 0 ? committed : 0); + totalLag += lag > 0 ? lag : 0; + } + + await admin.disconnect(); + } catch (error) { + console.error(`Error getting lag for topic ${topic}:`, error); } } - await admin.disconnect(); const sentCount = await DeliveryLog.count({ where: { @@ -95,5 +106,5 @@ export default { } catch (err) { res.status(500).json({ error: err.message }); } - }, + } }; \ No newline at end of file diff --git a/mail-service/src/controllers/topicController.js b/mail-service/src/controllers/topicController.js new file mode 100644 index 0000000..3e7d43b --- /dev/null +++ b/mail-service/src/controllers/topicController.js @@ -0,0 +1,275 @@ +import { topicManager } from '../service/topicManager.js'; +import { dynamicConsumer } from '../service/dynamicConsumer.js'; +import { topicRegistry } from '../service/topicRegistry.js'; + +export default { + // Получить список всех топиков + async getAllTopics(req, res) { + try { + const topics = await topicManager.getAllTopics(); + const subscribedTopics = dynamicConsumer.getSubscribedTopics(); + + const topicsWithStatus = topics.map(topic => ({ + name: topic, + subscribed: subscribedTopics.includes(topic), + active: topicRegistry.hasTopic(topic), + metadata: topicRegistry.getTopicMetadata(topic) + })); + + res.json({ + topics: topicsWithStatus, + total: topics.length, + subscribed: subscribedTopics.length, + active: topicRegistry.getSize(), + registry: topicRegistry.getStatistics() + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Создать новый топик + async createTopic(req, res) { + try { + const { topicName, partitions = 1, replicationFactor = 1 } = req.body; + + if (!topicName) { + return res.status(400).json({ error: 'Topic name is required' }); + } + + const success = await topicManager.createTopic(topicName, partitions, replicationFactor); + + if (success) { + res.status(201).json({ + message: `Topic ${topicName} created successfully`, + topic: topicName, + metadata: topicRegistry.getTopicMetadata(topicName) + }); + } else { + res.status(500).json({ error: 'Failed to create topic' }); + } + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Удалить топик + async deleteTopic(req, res) { + try { + const { topicName } = req.params; + + if (!topicName) { + return res.status(400).json({ error: 'Topic name is required' }); + } + + const success = await topicManager.deleteTopic(topicName); + + if (success) { + res.json({ + message: `Topic ${topicName} deleted successfully`, + topic: topicName + }); + } else { + res.status(500).json({ error: 'Failed to delete topic' }); + } + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Подписаться на топик + async subscribeToTopic(req, res) { + try { + const { topicName } = req.body; + + if (!topicName) { + return res.status(400).json({ error: 'Topic name is required' }); + } + + const success = await dynamicConsumer.subscribeToTopic(topicName); + + if (success) { + res.json({ + message: `Subscribed to topic ${topicName}`, + topic: topicName + }); + } else { + res.status(500).json({ error: 'Failed to subscribe to topic' }); + } + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Отписаться от топика + async unsubscribeFromTopic(req, res) { + try { + const { topicName } = req.params; + + if (!topicName) { + return res.status(400).json({ error: 'Topic name is required' }); + } + + const success = await dynamicConsumer.unsubscribeFromTopic(topicName); + + if (success) { + res.json({ + message: `Unsubscribed from topic ${topicName}`, + topic: topicName + }); + } else { + res.status(500).json({ error: 'Failed to unsubscribe from topic' }); + } + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Получить статус consumer'а + async getConsumerStatus(req, res) { + try { + const subscribedTopics = dynamicConsumer.getSubscribedTopics(); + const activeTopics = topicRegistry.getAllTopics(); + + res.json({ + isRunning: dynamicConsumer.isRunning, + subscribedTopics, + activeTopics, + subscribedCount: subscribedTopics.length, + activeCount: activeTopics.length, + registry: topicRegistry.getStatistics() + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Очистить все топики + async clearAllTopics(req, res) { + try { + const deletedCount = await topicManager.clearTopics('mail-send-'); + + res.json({ + message: `Cleared ${deletedCount} topics`, + deletedCount + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Отправить тестовое сообщение в топик + async sendTestMessage(req, res) { + try { + const { topicName, message } = req.body; + + if (!topicName || !message) { + return res.status(400).json({ error: 'Topic name and message are required' }); + } + + const success = await topicManager.sendMessage(topicName, message); + + if (success) { + res.json({ + message: `Test message sent to topic ${topicName}`, + topic: topicName, + sentMessage: message, + metadata: topicRegistry.getTopicMetadata(topicName) + }); + } else { + res.status(500).json({ error: 'Failed to send test message' }); + } + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Получить статистику реестра + async getRegistryStatistics(req, res) { + try { + const statistics = topicRegistry.getStatistics(); + res.json(statistics); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Синхронизировать реестр с реальными топиками + async syncRegistry(req, res) { + try { + await topicManager.syncRegistry(); + const statistics = topicRegistry.getStatistics(); + + res.json({ + message: 'Registry synchronized successfully', + statistics + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Получить топики по префиксу + async getTopicsByPrefix(req, res) { + try { + const { prefix } = req.params; + const topics = topicRegistry.getTopicsByPrefix(prefix); + + const topicsWithMetadata = topics.map(topic => ({ + name: topic, + metadata: topicRegistry.getTopicMetadata(topic) + })); + + res.json({ + prefix, + topics: topicsWithMetadata, + count: topics.length + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Очистить реестр + async clearRegistry(req, res) { + try { + topicRegistry.clearAll(); + res.json({ + message: 'Registry cleared successfully', + statistics: topicRegistry.getStatistics() + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Экспорт реестра + async exportRegistry(req, res) { + try { + const exportData = topicRegistry.toJSON(); + res.json(exportData); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Импорт реестра + async importRegistry(req, res) { + try { + const { data } = req.body; + + if (!data) { + return res.status(400).json({ error: 'Import data is required' }); + } + + topicRegistry.fromJSON(data); + + res.json({ + message: 'Registry imported successfully', + statistics: topicRegistry.getStatistics() + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + } +}; \ No newline at end of file diff --git a/mail-service/src/index.js b/mail-service/src/index.js index a76d98a..554735e 100644 --- a/mail-service/src/index.js +++ b/mail-service/src/index.js @@ -4,17 +4,21 @@ import express from 'express'; import { sequelize } from './models/index.js'; import routes from './routes/index.js'; import { processScheduledCampaigns } from './service/queueFillerJob.js'; -import { startMailSender } from './service/mailSender.js'; +import { dynamicConsumer } from './service/dynamicConsumer.js'; +import { topicManager } from './service/topicManager.js'; +import { topicRegistry } from './service/topicRegistry.js'; import authMiddleware from './middleware/auth.js'; const app = express(); app.use(express.json()); -app.get('/', (req, res) => { - res.send('Mail Service is running'); -}); +// Middleware +app.use('/api/mail', authMiddleware); -app.use('/api/mail', authMiddleware, routes); +// Routes +app.use('/api/mail', routes); + +const PORT = process.env.PORT || 3000; (async () => { try { @@ -40,11 +44,35 @@ setInterval(async () => { } finally { isQueueFilling = false; } -}, 1000); // раз в минуту +}, 1000); -startMailSender(); +// Запускаем динамический consumer и инициализируем реестр +(async () => { + try { + await dynamicConsumer.connect(); + + // Синхронизируем реестр с существующими топиками + await topicManager.syncRegistry(); + + // Подписываемся на существующие топики + const existingTopics = await topicManager.getAllTopics(); + if (existingTopics.length > 0) { + console.log(`[index] Found ${existingTopics.length} existing topics:`, existingTopics); + await dynamicConsumer.subscribeToTopics(existingTopics); + } + + // Периодически проверяем новые топики и синхронизируем реестр + setInterval(async () => { + await dynamicConsumer.autoSubscribeToNewTopics(); + await topicManager.syncRegistry(); + }, 5000); // Проверяем каждые 5 секунд + + console.log('[index] Dynamic consumer started with registry synchronization'); + } catch (err) { + console.error('Dynamic consumer error:', err); + } +})(); -const PORT = process.env.PORT || 3000; app.listen(PORT, () => { - console.log(`Mail Service listening on port ${PORT}`); + console.log(`Mail service running on port ${PORT}`); }); \ No newline at end of file diff --git a/mail-service/src/routes/index.js b/mail-service/src/routes/index.js index 8dcb244..b95d241 100644 --- a/mail-service/src/routes/index.js +++ b/mail-service/src/routes/index.js @@ -7,6 +7,7 @@ import emailTemplateVersionRoutes from './emailTemplateVersion.js'; import campaignRoutes from './campaign.js'; import deliveryLogRoutes from './deliveryLog.js'; import smtpServerRoutes from './smtpServer.js'; +import topicRoutes from './topic.js'; const router = Router(); @@ -18,5 +19,6 @@ router.use('/email-template-versions', emailTemplateVersionRoutes); router.use('/campaigns', campaignRoutes); router.use('/delivery-logs', deliveryLogRoutes); router.use('/smtp-servers', smtpServerRoutes); +router.use('/topics', topicRoutes); export default router; \ No newline at end of file diff --git a/mail-service/src/routes/topic.js b/mail-service/src/routes/topic.js new file mode 100644 index 0000000..01b1e5a --- /dev/null +++ b/mail-service/src/routes/topic.js @@ -0,0 +1,48 @@ +import express from 'express'; +import topicController from '../controllers/topicController.js'; + +const router = express.Router(); + +// GET /api/mail/topics - Получить список всех топиков +router.get('/', topicController.getAllTopics); + +// POST /api/mail/topics - Создать новый топик +router.post('/', topicController.createTopic); + +// DELETE /api/mail/topics/:topicName - Удалить топик +router.delete('/:topicName', topicController.deleteTopic); + +// POST /api/mail/topics/subscribe - Подписаться на топик +router.post('/subscribe', topicController.subscribeToTopic); + +// DELETE /api/mail/topics/unsubscribe/:topicName - Отписаться от топика +router.delete('/unsubscribe/:topicName', topicController.unsubscribeFromTopic); + +// GET /api/mail/topics/status - Получить статус consumer'а +router.get('/status', topicController.getConsumerStatus); + +// DELETE /api/mail/topics/clear - Очистить все топики +router.delete('/clear', topicController.clearAllTopics); + +// POST /api/mail/topics/test-message - Отправить тестовое сообщение +router.post('/test-message', topicController.sendTestMessage); + +// GET /api/mail/topics/statistics - Получить статистику реестра +router.get('/statistics', topicController.getRegistryStatistics); + +// POST /api/mail/topics/sync - Синхронизировать реестр +router.post('/sync', topicController.syncRegistry); + +// GET /api/mail/topics/prefix/:prefix - Получить топики по префиксу +router.get('/prefix/:prefix', topicController.getTopicsByPrefix); + +// DELETE /api/mail/topics/registry/clear - Очистить реестр +router.delete('/registry/clear', topicController.clearRegistry); + +// GET /api/mail/topics/registry/export - Экспорт реестра +router.get('/registry/export', topicController.exportRegistry); + +// POST /api/mail/topics/registry/import - Импорт реестра +router.post('/registry/import', topicController.importRegistry); + +export default router; \ No newline at end of file diff --git a/mail-service/src/service/dynamicConsumer.js b/mail-service/src/service/dynamicConsumer.js new file mode 100644 index 0000000..bbbb431 --- /dev/null +++ b/mail-service/src/service/dynamicConsumer.js @@ -0,0 +1,214 @@ +import { Kafka } from 'kafkajs'; +import nodemailer from 'nodemailer'; +import { SmtpServer } from '../models/index.js'; +import { topicManager } from './topicManager.js'; + +const kafka = new Kafka({ + clientId: process.env.KAFKA_CLIENT_ID || 'dynamic-consumer', + brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], +}); + +export class DynamicConsumer { + constructor() { + this.consumer = kafka.consumer({ + groupId: process.env.KAFKA_GROUP_ID || 'dynamic-consumer-group' + }); + this.subscribedTopics = new Set(); + this.isRunning = false; + this.messageHandler = null; + } + + async connect() { + try { + await this.consumer.connect(); + console.log('[DynamicConsumer] Connected to Kafka'); + return true; + } catch (error) { + console.error('[DynamicConsumer] Connection error:', error); + return false; + } + } + + async disconnect() { + try { + await this.consumer.disconnect(); + this.subscribedTopics.clear(); + this.isRunning = false; + console.log('[DynamicConsumer] Disconnected from Kafka'); + } catch (error) { + console.error('[DynamicConsumer] Disconnect error:', error); + } + } + + // Установка обработчика сообщений + setMessageHandler(handler) { + this.messageHandler = handler; + } + + // Подписка на конкретный топик + async subscribeToTopic(topicName) { + if (this.subscribedTopics.has(topicName)) { + console.log(`[DynamicConsumer] Already subscribed to topic: ${topicName}`); + return true; + } + + try { + await this.consumer.subscribe({ + topic: topicName, + fromBeginning: false + }); + this.subscribedTopics.add(topicName); + console.log(`[DynamicConsumer] Subscribed to topic: ${topicName}`); + + // Если consumer еще не запущен, запускаем его + if (!this.isRunning) { + await this.startConsuming(); + } + + return true; + } catch (error) { + console.error(`[DynamicConsumer] Error subscribing to ${topicName}:`, error); + return false; + } + } + + // Отписка от топика + async unsubscribeFromTopic(topicName) { + if (!this.subscribedTopics.has(topicName)) { + console.log(`[DynamicConsumer] Not subscribed to topic: ${topicName}`); + return true; + } + + try { + await this.consumer.stop(); + this.subscribedTopics.delete(topicName); + + // Переподписываемся на оставшиеся топики + if (this.subscribedTopics.size > 0) { + await this.consumer.subscribe( + Array.from(this.subscribedTopics).map(topic => ({ + topic, + fromBeginning: false + })) + ); + await this.startConsuming(); + } + + console.log(`[DynamicConsumer] Unsubscribed from topic: ${topicName}`); + return true; + } catch (error) { + console.error(`[DynamicConsumer] Error unsubscribing from ${topicName}:`, error); + return false; + } + } + + // Подписка на несколько топиков + async subscribeToTopics(topicNames) { + const results = []; + for (const topicName of topicNames) { + const result = await this.subscribeToTopic(topicName); + results.push({ topic: topicName, success: result }); + } + return results; + } + + // Получение списка подписанных топиков + getSubscribedTopics() { + return Array.from(this.subscribedTopics); + } + + // Запуск обработки сообщений + async startConsuming() { + if (this.isRunning) { + console.log('[DynamicConsumer] Already consuming messages'); + return; + } + + try { + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + console.log(`[DynamicConsumer] Received message: topic=${topic}, partition=${partition}, offset=${message.offset}`); + + if (this.messageHandler) { + try { + const task = JSON.parse(message.value.toString()); + await this.messageHandler(task, topic); + } catch (error) { + console.error(`[DynamicConsumer] Error processing message from ${topic}:`, error); + } + } + }, + }); + + this.isRunning = true; + console.log('[DynamicConsumer] Started consuming messages'); + } catch (error) { + console.error('[DynamicConsumer] Error starting consumer:', error); + } + } + + // Остановка обработки сообщений + async stopConsuming() { + try { + await this.consumer.stop(); + this.isRunning = false; + console.log('[DynamicConsumer] Stopped consuming messages'); + } catch (error) { + console.error('[DynamicConsumer] Error stopping consumer:', error); + } + } + + // Автоматическое обнаружение и подписка на новые топики + async autoSubscribeToNewTopics() { + try { + const allTopics = await topicManager.getAllTopics(); + const newTopics = allTopics.filter(topic => !this.subscribedTopics.has(topic)); + + if (newTopics.length > 0) { + console.log(`[DynamicConsumer] Found ${newTopics.length} new topics:`, newTopics); + await this.subscribeToTopics(newTopics); + } + } catch (error) { + console.error('[DynamicConsumer] Error in auto-subscribe:', error); + } + } +} + +// Обработчик сообщений для отправки email +async function processEmailTask(task, topic) { + try { + // Получаем SMTP-сервер из БД + const smtp = await SmtpServer.findByPk(task.smtpServerId); + if (!smtp) { + console.error('SMTP server not found for id', task.smtpServerId); + return; + } + + const transporter = nodemailer.createTransport({ + host: smtp.host, + port: smtp.port, + secure: smtp.secure, + auth: { + user: smtp.username, + pass: smtp.password, + }, + }); + + const mailOptions = { + from: smtp.from_email, + to: task.email, + subject: 'Test email', + text: 'This is a test email from DynamicConsumer', + html: 'This is a test email from DynamicConsumer', + }; + + const info = await transporter.sendMail(mailOptions); + console.log('Email sent:', info.messageId, 'to', task.email); + } catch (err) { + console.error('Error sending email:', err, 'task:', task); + } +} + +// Экспортируем экземпляр с установленным обработчиком +export const dynamicConsumer = new DynamicConsumer(); +dynamicConsumer.setMessageHandler(processEmailTask); \ No newline at end of file diff --git a/mail-service/src/service/queueFiller.js b/mail-service/src/service/queueFiller.js index d66f04e..55e1e6b 100644 --- a/mail-service/src/service/queueFiller.js +++ b/mail-service/src/service/queueFiller.js @@ -1,11 +1,5 @@ import dns from 'dns/promises'; -import { Kafka } from 'kafkajs'; - -const kafka = new Kafka({ - clientId: process.env.KAFKA_CLIENT_ID || 'queue-filler', - brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], -}); -const producer = kafka.producer(); +import { topicManager } from './topicManager.js'; async function getMxDomain(email) { const domain = email.split('@')[1]; @@ -20,30 +14,43 @@ async function getMxDomain(email) { } export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { - await producer.connect(); - // Группируем подписчиков по домену (а не по MX) + // Группируем подписчиков по домену const domainMap = {}; for (const sub of subscribers) { const domain = sub.email.split('@')[1]; if (!domainMap[domain]) domainMap[domain] = []; domainMap[domain].push(sub); } - // Берём только первый домен и первый smtp - const domainEntry = Object.entries(domainMap)[0]; - const smtp = smtpServers[0]; - if (domainEntry && smtp) { - const [domain, subs] = domainEntry; - const topic = `mail-send-${domain}-${smtp.id}`; - const messages = subs.map(sub => ({ - value: JSON.stringify({ + + // Обрабатываем каждый домен и SMTP сервер + for (const [domain, subs] of Object.entries(domainMap)) { + for (const smtp of smtpServers) { + const topicName = `mail-send-${domain}-${smtp.id}`; + + // Создаем топик если его нет + const topicCreated = await topicManager.createTopic(topicName); + if (!topicCreated) { + console.error(`[queueFiller] Failed to create topic: ${topicName}`); + continue; + } + + // Отправляем сообщения в топик + const messages = subs.map(sub => ({ campaignId: campaign.id, mx: domain, // для обратной совместимости subscriberId: sub.id, email: sub.email, smtpServerId: smtp.id, - }), - })); - await producer.send({ topic, messages }); + })); + + for (const message of messages) { + const sent = await topicManager.sendMessage(topicName, message); + if (!sent) { + console.error(`[queueFiller] Failed to send message to topic: ${topicName}`); + } + } + + console.log(`[queueFiller] Sent ${messages.length} messages to topic: ${topicName}`); + } } - await producer.disconnect(); } \ No newline at end of file diff --git a/mail-service/src/service/queueFillerJob.js b/mail-service/src/service/queueFillerJob.js index f4d9a92..bff3f15 100644 --- a/mail-service/src/service/queueFillerJob.js +++ b/mail-service/src/service/queueFillerJob.js @@ -1,38 +1,26 @@ import { Campaign, MailingGroup, GroupSubscriber, Subscriber, SmtpServer } from '../models/index.js'; import { fillQueueForCampaign } from './queueFiller.js'; import { Op } from 'sequelize'; -import { Kafka } from 'kafkajs'; +import { topicManager } from './topicManager.js'; const BATCH_SIZE = 10000; async function clearKafkaTopics(prefix = 'mail-send-') { - const kafka = new Kafka({ - clientId: process.env.KAFKA_CLIENT_ID || 'queue-filler', - brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], - }); - const admin = kafka.admin(); - await admin.connect(); - const topics = await admin.listTopics(); - const toDelete = topics.filter(t => t.startsWith(prefix)); - if (toDelete.length > 0) { - await admin.deleteTopics({ topics: toDelete }); - // Пересоздавать топики не нужно — Kafka создаст их автоматически при отправке сообщений - } - await admin.disconnect(); + const deletedCount = await topicManager.clearTopics(prefix); + console.log(`[queueFillerJob] Cleared ${deletedCount} topics`); } export async function processScheduledCampaigns() { - // Очищаем все mail-send-* топики перед построением - await clearKafkaTopics('mail-send-'); - // 1. Найти все кампании в статусе scheduled + const campaigns = await Campaign.findAll({ where: { status: 'scheduled' }, include: [MailingGroup, SmtpServer], }); + for (const campaign of campaigns) { - // 2. Получить id всех подписчиков группы батчами let offset = 0; let allSubscriberIds = []; + while (true) { const groupSubs = await GroupSubscriber.findAll({ where: { group_id: campaign.group_id }, @@ -41,12 +29,13 @@ export async function processScheduledCampaigns() { limit: BATCH_SIZE, raw: true, }); + if (groupSubs.length === 0) break; allSubscriberIds.push(...groupSubs.map(gs => gs.subscriber_id)); if (groupSubs.length < BATCH_SIZE) break; offset += BATCH_SIZE; } - // 3. Получить подписчиков батчами и сразу отправлять в очередь (не держим всех в памяти) + for (let i = 0; i < allSubscriberIds.length; i += BATCH_SIZE) { const batchIds = allSubscriberIds.slice(i, i + BATCH_SIZE); const subscribers = await Subscriber.findAll({ @@ -54,10 +43,11 @@ export async function processScheduledCampaigns() { attributes: ['id', 'email'], raw: true, }); + const smtpServers = await campaign.getSmtpServers(); await fillQueueForCampaign(campaign, subscribers, smtpServers); } - // 4. Обновить статус кампании на sending + await campaign.update({ status: 'sending' }); } } \ No newline at end of file diff --git a/mail-service/src/service/topicManager.js b/mail-service/src/service/topicManager.js new file mode 100644 index 0000000..ae8e093 --- /dev/null +++ b/mail-service/src/service/topicManager.js @@ -0,0 +1,208 @@ +import { Kafka } from 'kafkajs'; +import { topicRegistry } from './topicRegistry.js'; + +const kafka = new Kafka({ + clientId: process.env.KAFKA_CLIENT_ID || 'topic-manager', + brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], +}); + +const admin = kafka.admin(); +const producer = kafka.producer(); + +export class TopicManager { + constructor() { + this.isConnected = false; + } + + async connect() { + if (!this.isConnected) { + await admin.connect(); + await producer.connect(); + this.isConnected = true; + console.log('[TopicManager] Connected to Kafka'); + } + } + + async disconnect() { + if (this.isConnected) { + await admin.disconnect(); + await producer.disconnect(); + this.isConnected = false; + console.log('[TopicManager] Disconnected from Kafka'); + } + } + + // Создание топика + async createTopic(topicName, partitions = 1, replicationFactor = 1) { + await this.connect(); + + try { + await admin.createTopics({ + topics: [{ + topic: topicName, + numPartitions: partitions, + replicationFactor: replicationFactor, + }], + }); + + // Добавляем топик в реестр + topicRegistry.addTopic(topicName, { + partitions, + replicationFactor, + createdBy: 'TopicManager' + }); + + console.log(`[TopicManager] Created topic: ${topicName}`); + return true; + } catch (error) { + if (error.message.includes('already exists')) { + console.log(`[TopicManager] Topic ${topicName} already exists`); + // Добавляем существующий топик в реестр + if (!topicRegistry.hasTopic(topicName)) { + topicRegistry.addTopic(topicName, { + partitions, + replicationFactor, + createdBy: 'TopicManager', + existing: true + }); + } + return true; + } + console.error(`[TopicManager] Error creating topic ${topicName}:`, error); + return false; + } + } + + // Отправка сообщения в топик + async sendMessage(topicName, message) { + await this.connect(); + + try { + await producer.send({ + topic: topicName, + messages: [{ value: JSON.stringify(message) }], + }); + + // Увеличиваем счетчик сообщений в реестре + topicRegistry.incrementMessageCount(topicName); + + console.log(`[TopicManager] Sent message to topic: ${topicName}`); + return true; + } catch (error) { + console.error(`[TopicManager] Error sending message to ${topicName}:`, error); + return false; + } + } + + // Получение списка всех топиков + async getAllTopics() { + await this.connect(); + + try { + const topics = await admin.listTopics(); + const mailTopics = topics.filter(topic => topic.startsWith('mail-send-')); + + // Синхронизируем реестр с реальными топиками + mailTopics.forEach(topic => { + if (!topicRegistry.hasTopic(topic)) { + topicRegistry.addTopic(topic, { synced: true }); + } + }); + + return mailTopics; + } catch (error) { + console.error('[TopicManager] Error getting topics:', error); + return []; + } + } + + // Удаление топика + async deleteTopic(topicName) { + await this.connect(); + + try { + await admin.deleteTopics({ topics: [topicName] }); + + // Удаляем топик из реестра + topicRegistry.removeTopic(topicName); + + console.log(`[TopicManager] Deleted topic: ${topicName}`); + return true; + } catch (error) { + console.error(`[TopicManager] Error deleting topic ${topicName}:`, error); + return false; + } + } + + // Очистка всех топиков с префиксом + async clearTopics(prefix = 'mail-send-') { + await this.connect(); + + try { + const topics = await this.getAllTopics(); + const topicsToDelete = topics.filter(topic => topic.startsWith(prefix)); + + if (topicsToDelete.length > 0) { + await admin.deleteTopics({ topics: topicsToDelete }); + + // Очищаем топики из реестра + topicsToDelete.forEach(topic => topicRegistry.removeTopic(topic)); + + console.log(`[TopicManager] Deleted ${topicsToDelete.length} topics`); + } + + return topicsToDelete.length; + } catch (error) { + console.error('[TopicManager] Error clearing topics:', error); + return 0; + } + } + + // Получение активных топиков из реестра + getActiveTopics() { + return topicRegistry.getAllTopics(); + } + + // Проверка существования топика + async topicExists(topicName) { + await this.connect(); + + try { + const topics = await admin.listTopics(); + return topics.includes(topicName); + } catch (error) { + console.error('[TopicManager] Error checking topic existence:', error); + return false; + } + } + + // Получение статистики из реестра + getRegistryStatistics() { + return topicRegistry.getStatistics(); + } + + // Синхронизация реестра с реальными топиками + async syncRegistry() { + const realTopics = await this.getAllTopics(); + const registryTopics = topicRegistry.getAllTopics(); + + // Добавляем новые топики в реестр + realTopics.forEach(topic => { + if (!topicRegistry.hasTopic(topic)) { + topicRegistry.addTopic(topic, { synced: true }); + } + }); + + // Удаляем несуществующие топики из реестра + registryTopics.forEach(topic => { + if (!realTopics.includes(topic)) { + topicRegistry.removeTopic(topic); + } + }); + + console.log(`[TopicManager] Registry synced. Real topics: ${realTopics.length}, Registry topics: ${topicRegistry.getSize()}`); + } +} + +// Экспортируем экземпляр для использования в других модулях +export const topicManager = new TopicManager(); \ No newline at end of file diff --git a/mail-service/src/service/topicRegistry.js b/mail-service/src/service/topicRegistry.js new file mode 100644 index 0000000..210f7c2 --- /dev/null +++ b/mail-service/src/service/topicRegistry.js @@ -0,0 +1,140 @@ +// Реестр топиков - хранилище списка активных топиков +class TopicRegistry { + constructor() { + this.topics = new Set(); + this.topicMetadata = new Map(); // Дополнительная информация о топиках + } + + // Добавить топик в реестр + addTopic(topicName, metadata = {}) { + this.topics.add(topicName); + this.topicMetadata.set(topicName, { + createdAt: new Date(), + messageCount: 0, + lastMessageAt: null, + ...metadata + }); + console.log(`[TopicRegistry] Added topic: ${topicName}`); + } + + // Удалить топик из реестра + removeTopic(topicName) { + this.topics.delete(topicName); + this.topicMetadata.delete(topicName); + console.log(`[TopicRegistry] Removed topic: ${topicName}`); + } + + // Проверить, существует ли топик в реестре + hasTopic(topicName) { + return this.topics.has(topicName); + } + + // Получить все топики + getAllTopics() { + return Array.from(this.topics); + } + + // Получить топики с фильтром + getTopicsByFilter(filter) { + return Array.from(this.topics).filter(filter); + } + + // Получить топики по префиксу + getTopicsByPrefix(prefix) { + return this.getTopicsByFilter(topic => topic.startsWith(prefix)); + } + + // Получить метаданные топика + getTopicMetadata(topicName) { + return this.topicMetadata.get(topicName) || null; + } + + // Обновить метаданные топика + updateTopicMetadata(topicName, updates) { + const current = this.topicMetadata.get(topicName); + if (current) { + this.topicMetadata.set(topicName, { ...current, ...updates }); + } + } + + // Увеличить счетчик сообщений для топика + incrementMessageCount(topicName) { + const metadata = this.topicMetadata.get(topicName); + if (metadata) { + metadata.messageCount++; + metadata.lastMessageAt = new Date(); + } + } + + // Получить статистику по топикам + getStatistics() { + const topics = Array.from(this.topics); + const totalMessages = Array.from(this.topicMetadata.values()) + .reduce((sum, meta) => sum + meta.messageCount, 0); + + const activeTopics = topics.filter(topic => { + const metadata = this.topicMetadata.get(topic); + return metadata && metadata.lastMessageAt && + (new Date() - metadata.lastMessageAt) < 24 * 60 * 60 * 1000; // Последние 24 часа + }); + + return { + totalTopics: topics.length, + totalMessages, + activeTopics: activeTopics.length, + topics: topics.map(topic => ({ + name: topic, + metadata: this.topicMetadata.get(topic) + })) + }; + } + + // Очистить все топики + clearAll() { + this.topics.clear(); + this.topicMetadata.clear(); + console.log('[TopicRegistry] Cleared all topics'); + } + + // Очистить топики по префиксу + clearByPrefix(prefix) { + const topicsToRemove = this.getTopicsByPrefix(prefix); + topicsToRemove.forEach(topic => { + this.removeTopic(topic); + }); + console.log(`[TopicRegistry] Cleared ${topicsToRemove.length} topics with prefix: ${prefix}`); + return topicsToRemove.length; + } + + // Получить размер реестра + getSize() { + return this.topics.size; + } + + // Проверить, пуст ли реестр + isEmpty() { + return this.topics.size === 0; + } + + // Экспорт реестра в JSON + toJSON() { + return { + topics: Array.from(this.topics), + metadata: Object.fromEntries(this.topicMetadata), + statistics: this.getStatistics() + }; + } + + // Импорт реестра из JSON + fromJSON(data) { + if (data.topics) { + this.topics = new Set(data.topics); + } + if (data.metadata) { + this.topicMetadata = new Map(Object.entries(data.metadata)); + } + } +} + +// Экспортируем экземпляр для использования в других модулях +export const topicRegistry = new TopicRegistry(); \ No newline at end of file