diff --git a/mail-service/src/index.js b/mail-service/src/index.js index 8202b8a..4e81376 100644 --- a/mail-service/src/index.js +++ b/mail-service/src/index.js @@ -23,8 +23,9 @@ const PORT = process.env.PORT || 3000; (async () => { try { await sequelize.authenticate(); - await sequelize.sync({ alter: true }); - console.log('Database connected and models synced'); + // Отключаем автоматическую синхронизацию, так как есть проблемы с ключами + // await sequelize.sync({ alter: true }); + console.log('Database connected'); } catch (err) { console.error('Unable to connect to the database:', err); process.exit(1); diff --git a/mail-service/src/service/dynamicConsumer.js b/mail-service/src/service/dynamicConsumer.js index f52feef..eb73f4c 100644 --- a/mail-service/src/service/dynamicConsumer.js +++ b/mail-service/src/service/dynamicConsumer.js @@ -55,7 +55,7 @@ export class DynamicConsumer { try { await this.consumer.subscribe({ topic: topicName, - fromBeginning: false + fromBeginning: true }); this.subscribedTopics.add(topicName); console.log(`[DynamicConsumer] Subscribed to topic: ${topicName}`); @@ -88,7 +88,7 @@ export class DynamicConsumer { await this.consumer.subscribe( Array.from(this.subscribedTopics).map(topic => ({ topic, - fromBeginning: false + fromBeginning: true })) ); await this.startConsuming(); @@ -215,17 +215,19 @@ async function processEmailTask(task, topic) { return; } - try { - await new Promise((resolve) => setTimeout(resolve, 60_000)); - - // Создаем запись в DeliveryLog со статусом "sending" - deliveryLog = await DeliveryLog.create({ - campaign_id: task.campaignId, - subscriber_id: task.subscriberId, - status: 'sent', - sent_at: new Date(), - error_message: null - }); + // Создаем запись в DeliveryLog со статусом "sent" сразу (так как ENUM не поддерживает 'sending') + console.log(`[DynamicConsumer] Creating DeliveryLog record for campaign ${task.campaignId}, subscriber ${task.subscriberId}`); + deliveryLog = await DeliveryLog.create({ + campaign_id: task.campaignId, + subscriber_id: task.subscriberId, + status: 'sent', + sent_at: new Date(), + error_message: null + }); + console.log(`[DynamicConsumer] DeliveryLog record created with ID: ${deliveryLog.id}`); + + try { + console.log(`[DynamicConsumer] Getting SMTP server with ID: ${task.smtpServerId}`); // Получаем SMTP-сервер из БД const smtp = await SmtpServer.findByPk(task.smtpServerId); @@ -244,7 +246,9 @@ async function processEmailTask(task, topic) { return; } + console.log(`[DynamicConsumer] Creating transporter for ${smtp.host}:${smtp.port}, secure: ${smtp.secure}`); const transporter = nodemailer.createTransport({ + host: smtp.host, host: smtp.host, port: smtp.port, secure: smtp.secure, @@ -253,6 +257,7 @@ async function processEmailTask(task, topic) { pass: smtp.password, }, }); + console.log(`[DynamicConsumer] Transporter created successfully`); const mailOptions = { from: smtp.from_email, diff --git a/mail-service/src/service/mailSender.js b/mail-service/src/service/mailSender.js deleted file mode 100644 index dd7c1b0..0000000 --- a/mail-service/src/service/mailSender.js +++ /dev/null @@ -1,57 +0,0 @@ -import { Kafka } from 'kafkajs'; -import nodemailer from 'nodemailer'; -import { SmtpServer } from '../models/index.js'; - -const kafka = new Kafka({ - clientId: process.env.KAFKA_CLIENT_ID || 'mail-sender', - brokers: [process.env.KAFKA_BROKER || 'localhost:9092'], -}); -const consumer = kafka.consumer({ groupId: process.env.KAFKA_GROUP_ID || 'mail-sender-group' }); - -export async function startMailSender() { - await consumer.connect(); - console.log('[mailSender] Consumer connected'); - await consumer.subscribe({ topic: /^mail-send-.+$/, fromBeginning: false }); - console.log('[mailSender] Subscribed to topics: mail-send-*'); - await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { - console.log(`[mailSender] Received message: topic=${topic}, partition=${partition}, offset=${message.offset}, value=${message.value.toString()}`); - // Можно раскомментировать для реальной отправки: - // const task = JSON.parse(message.value.toString()); - // await processTask(task, topic); - }, - }); - console.log('[mailSender] Consumer is running and waiting for messages...'); -} - -// Реальная отправка через SMTP -async function processTask(task, topic) { - try { - // Получаем SMTP-сервер из БД - const smtp = await SmtpServer.findByPk(task.smtpServerId); - if (!smtp) { - console.error('SMTP server not found for id', task.smtpServerId); - return; - } - const transporter = nodemailer.createTransport({ - host: smtp.host, - port: smtp.port, - secure: smtp.secure, - auth: { - user: smtp.username, - pass: smtp.password, - }, - }); - const mailOptions = { - from: smtp.from_email, - to: task.email, - subject: 'Test email', - text: 'This is a test email from mailSender', - html: 'This is a test email from mailSender', - }; - const info = await transporter.sendMail(mailOptions); - console.log('Email sent:', info.messageId, 'to', task.email); - } catch (err) { - console.error('Error sending email:', err, 'task:', task); - } -} \ No newline at end of file diff --git a/mail-service/src/service/queueFillerJob.js b/mail-service/src/service/queueFillerJob.js index 53f08ab..8bf3d73 100644 --- a/mail-service/src/service/queueFillerJob.js +++ b/mail-service/src/service/queueFillerJob.js @@ -15,18 +15,12 @@ export async function processScheduledCampaigns() { const campaigns = await Campaign.findAll({ where: { status: 'scheduled', - // Исключаем кампании, которые уже обрабатываются или отправлены - id: { - [Op.notIn]: sequelize.literal(`( - SELECT DISTINCT campaign_id - FROM delivery_logs - WHERE campaign_id = Campaign.id - )`) + scheduled_at: { + [Op.lte]: new Date() // Только кампании, запланированные на текущее время или в прошлом } }, include: [ MailingGroup, - SmtpServer, { model: EmailTemplateVersion, as: 'EmailTemplateVersion' @@ -98,7 +92,8 @@ export async function processScheduledCampaigns() { }); if (campaignLogs.length > 0) { - console.log(`[queueFillerJob] Campaign ${campaign.id} already has delivery logs, skipping`); + console.log(`[queueFillerJob] Campaign ${campaign.id} already has delivery logs, updating status to 'sent'`); + await campaign.update({ status: 'sent' }); continue; }