traking kafka

This commit is contained in:
romantarkin 2025-08-18 16:15:54 +05:00
parent 3d1308a479
commit aeb0ffc0d2
7 changed files with 398 additions and 11 deletions

View File

@ -1,4 +1,5 @@
import { DeliveryLog } from '../models/index.js'; import { DeliveryLog } from '../models/index.js';
import { trackingService } from '../service/trackingService.js';
export default { export default {
async trackOpen(req, res) { async trackOpen(req, res) {
@ -12,12 +13,14 @@ export default {
return res.status(404).send('Not found'); return res.status(404).send('Not found');
} }
// Обновляем время открытия, если еще не было установлено // Отправляем событие в Kafka вместо прямого обновления
if (!deliveryLog.opened_at) { if (!deliveryLog.opened_at) {
await deliveryLog.update({ await trackingService.trackEmailOpen(
opened_at: new Date() deliveryLogId,
}); deliveryLog.campaign_id,
console.log(`[Tracking] Email opened: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}`); deliveryLog.subscriber_id
);
console.log(`[Tracking] Email open event sent to Kafka: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}`);
} }
// Возвращаем прозрачный 1x1 пиксель // Возвращаем прозрачный 1x1 пиксель
@ -53,12 +56,15 @@ export default {
return res.status(404).send('Not found'); return res.status(404).send('Not found');
} }
// Обновляем время клика, если еще не было установлено // Отправляем событие в Kafka вместо прямого обновления
if (!deliveryLog.clicked_at) { if (!deliveryLog.clicked_at) {
await deliveryLog.update({ await trackingService.trackEmailClick(
clicked_at: new Date() deliveryLogId,
}); deliveryLog.campaign_id,
console.log(`[Tracking] Email clicked: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}, url=${url}`); deliveryLog.subscriber_id,
url
);
console.log(`[Tracking] Email click event sent to Kafka: deliveryLogId=${deliveryLogId}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}, url=${url}`);
} }
// Перенаправляем на оригинальный URL // Перенаправляем на оригинальный URL

View File

@ -0,0 +1,92 @@
import { trackingConsumer } from '../service/trackingConsumer.js';
import { trackingService } from '../service/trackingService.js';
import { topicManager } from '../service/topicManager.js';
export default {
// Получить статус tracking consumer
async getConsumerStatus(req, res) {
try {
const status = {
isActive: trackingConsumer.isActive(),
topicName: trackingConsumer.topicName,
timestamp: new Date().toISOString()
};
res.json(status);
} catch (error) {
console.error('[TrackingManagement] Error getting consumer status:', error);
res.status(500).json({ error: 'Internal server error' });
}
},
// Перезапустить tracking consumer
async restartConsumer(req, res) {
try {
console.log('[TrackingManagement] Restarting tracking consumer...');
// Останавливаем consumer
await trackingConsumer.stop();
// Запускаем заново
await trackingConsumer.start();
res.json({
message: 'Tracking consumer restarted successfully',
timestamp: new Date().toISOString()
});
} catch (error) {
console.error('[TrackingManagement] Error restarting consumer:', error);
res.status(500).json({ error: 'Failed to restart consumer' });
}
},
// Получить статистику топика отслеживания
async getTopicStatistics(req, res) {
try {
const topicName = trackingService.topicName;
const metadata = topicManager.getRegistryStatistics();
const topicStats = metadata.topics.find(topic => topic.name === topicName);
res.json({
topicName,
statistics: topicStats || null,
overallStats: metadata,
timestamp: new Date().toISOString()
});
} catch (error) {
console.error('[TrackingManagement] Error getting topic statistics:', error);
res.status(500).json({ error: 'Internal server error' });
}
},
// Проверить здоровье tracking сервиса
async healthCheck(req, res) {
try {
const consumerActive = trackingConsumer.isActive();
const topicExists = await topicManager.topicExists(trackingService.topicName);
const health = {
status: consumerActive && topicExists ? 'healthy' : 'unhealthy',
consumer: {
active: consumerActive
},
topic: {
name: trackingService.topicName,
exists: topicExists
},
timestamp: new Date().toISOString()
};
const statusCode = health.status === 'healthy' ? 200 : 503;
res.status(statusCode).json(health);
} catch (error) {
console.error('[TrackingManagement] Error in health check:', error);
res.status(503).json({
status: 'unhealthy',
error: 'Health check failed',
timestamp: new Date().toISOString()
});
}
}
};

View File

@ -7,6 +7,8 @@ import { processScheduledCampaigns } from './service/queueFillerJob.js';
import { dynamicConsumer } from './service/dynamicConsumer.js'; import { dynamicConsumer } from './service/dynamicConsumer.js';
import { topicManager } from './service/topicManager.js'; import { topicManager } from './service/topicManager.js';
import { topicRegistry } from './service/topicRegistry.js'; import { topicRegistry } from './service/topicRegistry.js';
import { trackingService } from './service/trackingService.js';
import { trackingConsumer } from './service/trackingConsumer.js';
const app = express(); const app = express();
app.use(express.json()); app.use(express.json());
@ -74,6 +76,64 @@ setInterval(async () => {
} }
})(); })();
app.listen(PORT, () => { // Инициализируем и запускаем tracking сервис
(async () => {
try {
// Инициализируем топик для событий отслеживания
await trackingService.initializeTopic();
// Запускаем consumer для обработки событий отслеживания
await trackingConsumer.start();
console.log('[index] Tracking service initialized and consumer started');
} catch (err) {
console.error('Tracking service initialization error:', err);
}
})();
const server = app.listen(PORT, () => {
console.log(`Mail service running on port ${PORT}`); console.log(`Mail service running on port ${PORT}`);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
console.log('SIGTERM received, shutting down gracefully...');
try {
// Останавливаем tracking consumer
await trackingConsumer.stop();
// Останавливаем dynamic consumer
await dynamicConsumer.disconnect();
// Закрываем сервер
server.close(() => {
console.log('Server closed');
process.exit(0);
});
} catch (error) {
console.error('Error during graceful shutdown:', error);
process.exit(1);
}
});
process.on('SIGINT', async () => {
console.log('SIGINT received, shutting down gracefully...');
try {
// Останавливаем tracking consumer
await trackingConsumer.stop();
// Останавливаем dynamic consumer
await dynamicConsumer.disconnect();
// Закрываем сервер
server.close(() => {
console.log('Server closed');
process.exit(0);
});
} catch (error) {
console.error('Error during graceful shutdown:', error);
process.exit(1);
}
}); });

View File

@ -9,6 +9,7 @@ import deliveryLogRoutes from './deliveryLog.js';
import smtpServerRoutes from './smtpServer.js'; import smtpServerRoutes from './smtpServer.js';
import topicRoutes from './topic.js'; import topicRoutes from './topic.js';
import trackingRoutes from './tracking.js'; import trackingRoutes from './tracking.js';
import trackingManagementRoutes from './trackingManagement.js';
import authMiddleware from '../middleware/auth.js'; import authMiddleware from '../middleware/auth.js';
const router = Router(); const router = Router();
@ -23,5 +24,6 @@ router.use('/delivery-logs', authMiddleware, deliveryLogRoutes);
router.use('/smtp-servers', authMiddleware, smtpServerRoutes); router.use('/smtp-servers', authMiddleware, smtpServerRoutes);
router.use('/topics', authMiddleware, topicRoutes); router.use('/topics', authMiddleware, topicRoutes);
router.use('/track', trackingRoutes); router.use('/track', trackingRoutes);
router.use('/tracking-management', authMiddleware, trackingManagementRoutes);
export default router; export default router;

View File

@ -0,0 +1,18 @@
import express from 'express';
import trackingManagementController from '../controllers/trackingManagementController.js';
const router = express.Router();
// Получить статус tracking consumer
router.get('/status', trackingManagementController.getConsumerStatus);
// Перезапустить tracking consumer
router.post('/restart', trackingManagementController.restartConsumer);
// Получить статистику топика отслеживания
router.get('/statistics', trackingManagementController.getTopicStatistics);
// Проверить здоровье tracking сервиса
router.get('/health', trackingManagementController.healthCheck);
export default router;

View File

@ -0,0 +1,138 @@
import { Kafka } from 'kafkajs';
import { DeliveryLog } from '../models/index.js';
const kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_ID || 'tracking-consumer',
brokers: [process.env.KAFKA_BROKER || 'localhost:9092'],
});
export class TrackingConsumer {
constructor() {
this.consumer = kafka.consumer({ groupId: 'tracking-consumer-group' });
this.topicName = 'email-tracking-events';
this.isRunning = false;
}
async connect() {
try {
await this.consumer.connect();
console.log('[TrackingConsumer] Connected to Kafka');
} catch (error) {
console.error('[TrackingConsumer] Error connecting to Kafka:', error);
throw error;
}
}
async disconnect() {
try {
await this.consumer.disconnect();
this.isRunning = false;
console.log('[TrackingConsumer] Disconnected from Kafka');
} catch (error) {
console.error('[TrackingConsumer] Error disconnecting from Kafka:', error);
}
}
async start() {
if (this.isRunning) {
console.log('[TrackingConsumer] Already running');
return;
}
try {
await this.connect();
await this.consumer.subscribe({
topic: this.topicName,
fromBeginning: false
});
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const event = JSON.parse(message.value.toString());
await this.processTrackingEvent(event);
} catch (error) {
console.error('[TrackingConsumer] Error processing message:', error);
}
},
});
this.isRunning = true;
console.log(`[TrackingConsumer] Started consuming from topic: ${this.topicName}`);
} catch (error) {
console.error('[TrackingConsumer] Error starting consumer:', error);
throw error;
}
}
async processTrackingEvent(event) {
try {
const { type, deliveryLogId, campaignId, subscriberId, url, timestamp } = event;
// Находим запись в DeliveryLog
const deliveryLog = await DeliveryLog.findByPk(deliveryLogId);
if (!deliveryLog) {
console.warn(`[TrackingConsumer] DeliveryLog not found: ${deliveryLogId}`);
return;
}
switch (type) {
case 'email_open':
await this.processEmailOpen(deliveryLog, timestamp);
break;
case 'email_click':
await this.processEmailClick(deliveryLog, timestamp, url);
break;
default:
console.warn(`[TrackingConsumer] Unknown event type: ${type}`);
}
} catch (error) {
console.error('[TrackingConsumer] Error processing tracking event:', error);
}
}
async processEmailOpen(deliveryLog, timestamp) {
// Обновляем время открытия, если еще не было установлено
if (!deliveryLog.opened_at) {
await deliveryLog.update({
opened_at: new Date(timestamp)
});
console.log(`[TrackingConsumer] Email opened: deliveryLogId=${deliveryLog.id}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}`);
}
}
async processEmailClick(deliveryLog, timestamp, url) {
// Обновляем время клика, если еще не было установлено
if (!deliveryLog.clicked_at) {
await deliveryLog.update({
clicked_at: new Date(timestamp)
});
console.log(`[TrackingConsumer] Email clicked: deliveryLogId=${deliveryLog.id}, campaignId=${deliveryLog.campaign_id}, subscriberId=${deliveryLog.subscriber_id}, url=${url}`);
}
}
// Метод для остановки consumer
async stop() {
if (!this.isRunning) {
console.log('[TrackingConsumer] Not running');
return;
}
try {
await this.disconnect();
console.log('[TrackingConsumer] Stopped');
} catch (error) {
console.error('[TrackingConsumer] Error stopping consumer:', error);
}
}
// Проверка статуса consumer
isActive() {
return this.isRunning;
}
}
// Экспортируем экземпляр для использования в других модулях
export const trackingConsumer = new TrackingConsumer();

View File

@ -0,0 +1,71 @@
import { topicManager } from './topicManager.js';
export class TrackingService {
constructor() {
this.topicName = 'email-tracking-events';
}
// Отправка события открытия письма
async trackEmailOpen(deliveryLogId, campaignId, subscriberId) {
const event = {
type: 'email_open',
deliveryLogId,
campaignId,
subscriberId,
timestamp: new Date().toISOString(),
metadata: {
userAgent: null, // Можно добавить в будущем
ipAddress: null // Можно добавить в будущем
}
};
try {
await topicManager.sendMessage(this.topicName, event);
console.log(`[TrackingService] Email open event sent to Kafka: deliveryLogId=${deliveryLogId}`);
return true;
} catch (error) {
console.error('[TrackingService] Error sending email open event to Kafka:', error);
return false;
}
}
// Отправка события клика по ссылке
async trackEmailClick(deliveryLogId, campaignId, subscriberId, url) {
const event = {
type: 'email_click',
deliveryLogId,
campaignId,
subscriberId,
url,
timestamp: new Date().toISOString(),
metadata: {
userAgent: null, // Можно добавить в будущем
ipAddress: null // Можно добавить в будущем
}
};
try {
await topicManager.sendMessage(this.topicName, event);
console.log(`[TrackingService] Email click event sent to Kafka: deliveryLogId=${deliveryLogId}, url=${url}`);
return true;
} catch (error) {
console.error('[TrackingService] Error sending email click event to Kafka:', error);
return false;
}
}
// Инициализация топика для событий отслеживания
async initializeTopic() {
try {
await topicManager.createTopic(this.topicName, 3, 1); // 3 партиции, 1 реплика
console.log(`[TrackingService] Initialized topic: ${this.topicName}`);
return true;
} catch (error) {
console.error('[TrackingService] Error initializing tracking topic:', error);
return false;
}
}
}
// Экспортируем экземпляр для использования в других модулях
export const trackingService = new TrackingService();