ads-marketing/mail-service/src/index.js
2025-08-17 18:34:30 +05:00

83 lines
2.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import dotenv from 'dotenv';
dotenv.config();
import express from 'express';
import { sequelize } from './models/index.js';
import routes from './routes/index.js';
import { processScheduledCampaigns } from './service/queueFillerJob.js';
import { dynamicConsumer } from './service/dynamicConsumer.js';
import { topicManager } from './service/topicManager.js';
import { topicRegistry } from './service/topicRegistry.js';
import authMiddleware from './middleware/auth.js';
const app = express();
app.use(express.json());
// Middleware
app.use('/api/mail', authMiddleware);
// Routes
app.use('/api/mail', routes);
const PORT = process.env.PORT || 3000;
(async () => {
try {
await sequelize.authenticate();
// Отключаем автоматическую синхронизацию, так как есть проблемы с ключами
await sequelize.sync({ alter: true });
console.log('Database connected');
} catch (err) {
console.error('Unable to connect to the database:', err);
process.exit(1);
}
})();
let isQueueFilling = false;
// Периодически заполняем очередь для scheduled кампаний
setInterval(async () => {
if (isQueueFilling) return;
isQueueFilling = true;
try {
await processScheduledCampaigns();
} catch (err) {
console.error('Queue fill error:', err);
} finally {
isQueueFilling = false;
}
}, 1000);
// Запускаем динамический consumer и инициализируем реестр
(async () => {
try {
await dynamicConsumer.connect();
// Синхронизируем реестр с существующими топиками
await topicManager.syncRegistry();
// Очищаем старые топики перед началом работы
console.log('[index] Clearing old topics...');
await topicManager.clearTopics('mail-send-');
// Подписываемся на существующие топики
const existingTopics = await topicManager.getAllTopics();
if (existingTopics.length > 0) {
console.log(`[index] Found ${existingTopics.length} existing topics:`, existingTopics);
await dynamicConsumer.subscribeToTopics(existingTopics);
}
// Периодически проверяем новые топики и синхронизируем реестр
setInterval(async () => {
await dynamicConsumer.autoSubscribeToNewTopics();
await topicManager.syncRegistry();
}, 5000); // Проверяем каждые 5 секунд
console.log('[index] Dynamic consumer started with registry synchronization');
} catch (err) {
console.error('Dynamic consumer error:', err);
}
})();
app.listen(PORT, () => {
console.log(`Mail service running on port ${PORT}`);
});