diff --git a/mail-service/src/controllers/trackingController.js b/mail-service/src/controllers/trackingController.js index fcc175c..b55c04a 100644 --- a/mail-service/src/controllers/trackingController.js +++ b/mail-service/src/controllers/trackingController.js @@ -1,4 +1,5 @@ import { DeliveryLog } from '../models/index.js'; +import { trackingService } from '../service/trackingService.js'; export default { async trackOpen(req, res) { @@ -12,12 +13,14 @@ export default { return res.status(404).send('Not found'); } - // Обновляем время открытия, если еще не было установлено + // Отправляем событие в Kafka вместо прямого обновления if (!deliveryLog.opened_at) { - await deliveryLog.update({ - opened_at: new Date() - }); - console.log(`[Tracking] Email opened: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}`); + await trackingService.trackEmailOpen( + deliveryLogId, + deliveryLog.campaign_id, + deliveryLog.subscriber_id + ); + console.log(`[Tracking] Email open event sent to Kafka: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}`); } // Возвращаем прозрачный 1x1 пиксель @@ -53,12 +56,15 @@ export default { return res.status(404).send('Not found'); } - // Обновляем время клика, если еще не было установлено + // Отправляем событие в Kafka вместо прямого обновления if (!deliveryLog.clicked_at) { - await deliveryLog.update({ - clicked_at: new Date() - }); - console.log(`[Tracking] Email clicked: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}, url=${url}`); + await trackingService.trackEmailClick( + deliveryLogId, + deliveryLog.campaign_id, + deliveryLog.subscriber_id, + url + ); + console.log(`[Tracking] Email click event sent to Kafka: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}, url=${url}`); } // Перенаправляем на оригинальный URL diff --git a/mail-service/src/controllers/trackingManagementController.js b/mail-service/src/controllers/trackingManagementController.js new file mode 100644 index 0000000..157524e --- /dev/null +++ b/mail-service/src/controllers/trackingManagementController.js @@ -0,0 +1,92 @@ +import { trackingConsumer } from '../service/trackingConsumer.js'; +import { trackingService } from '../service/trackingService.js'; +import { topicManager } from '../service/topicManager.js'; + +export default { + // Получить статус tracking consumer + async getConsumerStatus(req, res) { + try { + const status = { + isActive: trackingConsumer.isActive(), + topicName: trackingConsumer.topicName, + timestamp: new Date().toISOString() + }; + + res.json(status); + } catch (error) { + console.error('[TrackingManagement] Error getting consumer status:', error); + res.status(500).json({ error: 'Internal server error' }); + } + }, + + // Перезапустить tracking consumer + async restartConsumer(req, res) { + try { + console.log('[TrackingManagement] Restarting tracking consumer...'); + + // Останавливаем consumer + await trackingConsumer.stop(); + + // Запускаем заново + await trackingConsumer.start(); + + res.json({ + message: 'Tracking consumer restarted successfully', + timestamp: new Date().toISOString() + }); + } catch (error) { + console.error('[TrackingManagement] Error restarting consumer:', error); + res.status(500).json({ error: 'Failed to restart consumer' }); + } + }, + + // Получить статистику топика отслеживания + async getTopicStatistics(req, res) { + try { + const topicName = trackingService.topicName; + const metadata = topicManager.getRegistryStatistics(); + + const topicStats = metadata.topics.find(topic => topic.name === topicName); + + res.json({ + topicName, + statistics: topicStats || null, + overallStats: metadata, + timestamp: new Date().toISOString() + }); + } catch (error) { + console.error('[TrackingManagement] Error getting topic statistics:', error); + res.status(500).json({ error: 'Internal server error' }); + } + }, + + // Проверить здоровье tracking сервиса + async healthCheck(req, res) { + try { + const consumerActive = trackingConsumer.isActive(); + const topicExists = await topicManager.topicExists(trackingService.topicName); + + const health = { + status: consumerActive && topicExists ? 'healthy' : 'unhealthy', + consumer: { + active: consumerActive + }, + topic: { + name: trackingService.topicName, + exists: topicExists + }, + timestamp: new Date().toISOString() + }; + + const statusCode = health.status === 'healthy' ? 200 : 503; + res.status(statusCode).json(health); + } catch (error) { + console.error('[TrackingManagement] Error in health check:', error); + res.status(503).json({ + status: 'unhealthy', + error: 'Health check failed', + timestamp: new Date().toISOString() + }); + } + } +}; \ No newline at end of file diff --git a/mail-service/src/index.js b/mail-service/src/index.js index 92baa17..f34018a 100644 --- a/mail-service/src/index.js +++ b/mail-service/src/index.js @@ -7,6 +7,8 @@ import { processScheduledCampaigns } from './service/queueFillerJob.js'; import { dynamicConsumer } from './service/dynamicConsumer.js'; import { topicManager } from './service/topicManager.js'; import { topicRegistry } from './service/topicRegistry.js'; +import { trackingService } from './service/trackingService.js'; +import { trackingConsumer } from './service/trackingConsumer.js'; const app = express(); app.use(express.json()); @@ -74,6 +76,64 @@ setInterval(async () => { } })(); -app.listen(PORT, () => { +// Инициализируем и запускаем tracking сервис +(async () => { + try { + // Инициализируем топик для событий отслеживания + await trackingService.initializeTopic(); + + // Запускаем consumer для обработки событий отслеживания + await trackingConsumer.start(); + + console.log('[index] Tracking service initialized and consumer started'); + } catch (err) { + console.error('Tracking service initialization error:', err); + } +})(); + +const server = app.listen(PORT, () => { console.log(`Mail service running on port ${PORT}`); +}); + +// Graceful shutdown +process.on('SIGTERM', async () => { + console.log('SIGTERM received, shutting down gracefully...'); + + try { + // Останавливаем tracking consumer + await trackingConsumer.stop(); + + // Останавливаем dynamic consumer + await dynamicConsumer.disconnect(); + + // Закрываем сервер + server.close(() => { + console.log('Server closed'); + process.exit(0); + }); + } catch (error) { + console.error('Error during graceful shutdown:', error); + process.exit(1); + } +}); + +process.on('SIGINT', async () => { + console.log('SIGINT received, shutting down gracefully...'); + + try { + // Останавливаем tracking consumer + await trackingConsumer.stop(); + + // Останавливаем dynamic consumer + await dynamicConsumer.disconnect(); + + // Закрываем сервер + server.close(() => { + console.log('Server closed'); + process.exit(0); + }); + } catch (error) { + console.error('Error during graceful shutdown:', error); + process.exit(1); + } }); \ No newline at end of file diff --git a/mail-service/src/routes/index.js b/mail-service/src/routes/index.js index 9c26a4e..efc8b13 100644 --- a/mail-service/src/routes/index.js +++ b/mail-service/src/routes/index.js @@ -9,6 +9,7 @@ import deliveryLogRoutes from './deliveryLog.js'; import smtpServerRoutes from './smtpServer.js'; import topicRoutes from './topic.js'; import trackingRoutes from './tracking.js'; +import trackingManagementRoutes from './trackingManagement.js'; import authMiddleware from '../middleware/auth.js'; const router = Router(); @@ -23,5 +24,6 @@ router.use('/delivery-logs', authMiddleware, deliveryLogRoutes); router.use('/smtp-servers', authMiddleware, smtpServerRoutes); router.use('/topics', authMiddleware, topicRoutes); router.use('/track', trackingRoutes); +router.use('/tracking-management', authMiddleware, trackingManagementRoutes); export default router; \ No newline at end of file diff --git a/mail-service/src/routes/trackingManagement.js b/mail-service/src/routes/trackingManagement.js new file mode 100644 index 0000000..61dfa09 --- /dev/null +++ b/mail-service/src/routes/trackingManagement.js @@ -0,0 +1,18 @@ +import express from 'express'; +import trackingManagementController from '../controllers/trackingManagementController.js'; + +const router = express.Router(); + +// Получить статус tracking consumer +router.get('/status', trackingManagementController.getConsumerStatus); + +// Перезапустить tracking consumer +router.post('/restart', trackingManagementController.restartConsumer); + +// Получить статистику топика отслеживания +router.get('/statistics', trackingManagementController.getTopicStatistics); + +// Проверить здоровье tracking сервиса +router.get('/health', trackingManagementController.healthCheck); + +export default router; \ No newline at end of file diff --git a/mail-service/src/service/trackingConsumer.js b/mail-service/src/service/trackingConsumer.js new file mode 100644 index 0000000..009bfed --- /dev/null +++ b/mail-service/src/service/trackingConsumer.js @@ -0,0 +1,138 @@ +import { Kafka } from 'kafkajs'; +import { DeliveryLog } from '../models/index.js'; + +const kafka = new Kafka({ + clientId: process.env.KAFKA_CLIENT_ID || 'tracking-consumer', + brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], +}); + +export class TrackingConsumer { + constructor() { + this.consumer = kafka.consumer({ groupId: 'tracking-consumer-group' }); + this.topicName = 'email-tracking-events'; + this.isRunning = false; + } + + async connect() { + try { + await this.consumer.connect(); + console.log('[TrackingConsumer] Connected to Kafka'); + } catch (error) { + console.error('[TrackingConsumer] Error connecting to Kafka:', error); + throw error; + } + } + + async disconnect() { + try { + await this.consumer.disconnect(); + this.isRunning = false; + console.log('[TrackingConsumer] Disconnected from Kafka'); + } catch (error) { + console.error('[TrackingConsumer] Error disconnecting from Kafka:', error); + } + } + + async start() { + if (this.isRunning) { + console.log('[TrackingConsumer] Already running'); + return; + } + + try { + await this.connect(); + + await this.consumer.subscribe({ + topic: this.topicName, + fromBeginning: false + }); + + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + try { + const event = JSON.parse(message.value.toString()); + await this.processTrackingEvent(event); + } catch (error) { + console.error('[TrackingConsumer] Error processing message:', error); + } + }, + }); + + this.isRunning = true; + console.log(`[TrackingConsumer] Started consuming from topic: ${this.topicName}`); + } catch (error) { + console.error('[TrackingConsumer] Error starting consumer:', error); + throw error; + } + } + + async processTrackingEvent(event) { + try { + const { type, deliveryLogId, campaignId, subscriberId, url, timestamp } = event; + + // Находим запись в DeliveryLog + const deliveryLog = await DeliveryLog.findByPk(deliveryLogId); + + if (!deliveryLog) { + console.warn(`[TrackingConsumer] DeliveryLog not found: ${deliveryLogId}`); + return; + } + + switch (type) { + case 'email_open': + await this.processEmailOpen(deliveryLog, timestamp); + break; + case 'email_click': + await this.processEmailClick(deliveryLog, timestamp, url); + break; + default: + console.warn(`[TrackingConsumer] Unknown event type: ${type}`); + } + } catch (error) { + console.error('[TrackingConsumer] Error processing tracking event:', error); + } + } + + async processEmailOpen(deliveryLog, timestamp) { + // Обновляем время открытия, если еще не было установлено + if (!deliveryLog.opened_at) { + await deliveryLog.update({ + opened_at: new Date(timestamp) + }); + console.log(`[TrackingConsumer] Email opened: deliveryLogId=${deliveryLog.id}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}`); + } + } + + async processEmailClick(deliveryLog, timestamp, url) { + // Обновляем время клика, если еще не было установлено + if (!deliveryLog.clicked_at) { + await deliveryLog.update({ + clicked_at: new Date(timestamp) + }); + console.log(`[TrackingConsumer] Email clicked: deliveryLogId=${deliveryLog.id}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}, url=${url}`); + } + } + + // Метод для остановки consumer + async stop() { + if (!this.isRunning) { + console.log('[TrackingConsumer] Not running'); + return; + } + + try { + await this.disconnect(); + console.log('[TrackingConsumer] Stopped'); + } catch (error) { + console.error('[TrackingConsumer] Error stopping consumer:', error); + } + } + + // Проверка статуса consumer + isActive() { + return this.isRunning; + } +} + +// Экспортируем экземпляр для использования в других модулях +export const trackingConsumer = new TrackingConsumer(); \ No newline at end of file diff --git a/mail-service/src/service/trackingService.js b/mail-service/src/service/trackingService.js new file mode 100644 index 0000000..14b7c9d --- /dev/null +++ b/mail-service/src/service/trackingService.js @@ -0,0 +1,71 @@ +import { topicManager } from './topicManager.js'; + +export class TrackingService { + constructor() { + this.topicName = 'email-tracking-events'; + } + + // Отправка события открытия письма + async trackEmailOpen(deliveryLogId, campaignId, subscriberId) { + const event = { + type: 'email_open', + deliveryLogId, + campaignId, + subscriberId, + timestamp: new Date().toISOString(), + metadata: { + userAgent: null, // Можно добавить в будущем + ipAddress: null // Можно добавить в будущем + } + }; + + try { + await topicManager.sendMessage(this.topicName, event); + console.log(`[TrackingService] Email open event sent to Kafka: deliveryLogId=${deliveryLogId}`); + return true; + } catch (error) { + console.error('[TrackingService] Error sending email open event to Kafka:', error); + return false; + } + } + + // Отправка события клика по ссылке + async trackEmailClick(deliveryLogId, campaignId, subscriberId, url) { + const event = { + type: 'email_click', + deliveryLogId, + campaignId, + subscriberId, + url, + timestamp: new Date().toISOString(), + metadata: { + userAgent: null, // Можно добавить в будущем + ipAddress: null // Можно добавить в будущем + } + }; + + try { + await topicManager.sendMessage(this.topicName, event); + console.log(`[TrackingService] Email click event sent to Kafka: deliveryLogId=${deliveryLogId}, url=${url}`); + return true; + } catch (error) { + console.error('[TrackingService] Error sending email click event to Kafka:', error); + return false; + } + } + + // Инициализация топика для событий отслеживания + async initializeTopic() { + try { + await topicManager.createTopic(this.topicName, 3, 1); // 3 партиции, 1 реплика + console.log(`[TrackingService] Initialized topic: ${this.topicName}`); + return true; + } catch (error) { + console.error('[TrackingService] Error initializing tracking topic:', error); + return false; + } + } +} + +// Экспортируем экземпляр для использования в других модулях +export const trackingService = new TrackingService(); \ No newline at end of file