From 8a425b16e7aa4c4dd7be3fe67762ae3a883c6683 Mon Sep 17 00:00:00 2001 From: romantarkin Date: Tue, 29 Jul 2025 10:59:23 +0500 Subject: [PATCH] fix --- .../src/controllers/deliveryLogController.js | 112 +++++++++++++++++- mail-service/src/routes/deliveryLog.js | 2 + mail-service/src/service/dynamicConsumer.js | 48 +++++++- mail-service/src/service/queueFiller.js | 3 + 4 files changed, 159 insertions(+), 6 deletions(-) diff --git a/mail-service/src/controllers/deliveryLogController.js b/mail-service/src/controllers/deliveryLogController.js index a94a98e..a2414c2 100644 --- a/mail-service/src/controllers/deliveryLogController.js +++ b/mail-service/src/controllers/deliveryLogController.js @@ -1,4 +1,4 @@ -import { DeliveryLog, Campaign, Subscriber } from '../models/index.js'; +import { DeliveryLog, Campaign, Subscriber, sequelize } from '../models/index.js'; import { Op } from 'sequelize'; import { topicManager } from '../service/topicManager.js'; @@ -106,5 +106,115 @@ export default { } catch (err) { res.status(500).json({ error: err.message }); } + }, + + // Получить статистику по статусам доставки + async getDeliveryStatistics(req, res) { + try { + const { campaignId } = req.query; + + let whereClause = {}; + if (campaignId) { + whereClause.campaign_id = campaignId; + } + + // Получаем общую статистику + const totalCount = await DeliveryLog.count({ where: whereClause }); + const sentCount = await DeliveryLog.count({ + where: { ...whereClause, status: 'sent' } + }); + const failedCount = await DeliveryLog.count({ + where: { ...whereClause, status: 'failed' } + }); + const bouncedCount = await DeliveryLog.count({ + where: { ...whereClause, status: 'bounced' } + }); + + // Получаем статистику по дням (последние 7 дней) + const sevenDaysAgo = new Date(); + sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7); + + const dailyStats = await DeliveryLog.findAll({ + where: { + ...whereClause, + sent_at: { + [Op.gte]: sevenDaysAgo + } + }, + attributes: [ + [sequelize.fn('DATE', sequelize.col('sent_at')), 'date'], + [sequelize.fn('COUNT', sequelize.col('id')), 'count'], + 'status' + ], + group: ['DATE(sent_at)', 'status'], + order: [[sequelize.fn('DATE', sequelize.col('sent_at')), 'ASC']], + raw: true + }); + + // Получаем топ кампаний по количеству отправленных писем + const topCampaigns = await DeliveryLog.findAll({ + where: whereClause, + include: [{ + model: Campaign, + as: 'Campaign', + attributes: ['id', 'subject_override'] + }], + attributes: [ + 'campaign_id', + [sequelize.fn('COUNT', sequelize.col('id')), 'total_sent'], + [sequelize.fn('COUNT', sequelize.literal('CASE WHEN status = "sent" THEN 1 END')), 'successful_sent'], + [sequelize.fn('COUNT', sequelize.literal('CASE WHEN status = "failed" THEN 1 END')), 'failed_sent'] + ], + group: ['campaign_id'], + order: [[sequelize.fn('COUNT', sequelize.col('id')), 'DESC']], + limit: 10, + raw: true + }); + + res.json({ + summary: { + total: totalCount, + sent: sentCount, + failed: failedCount, + bounced: bouncedCount, + successRate: totalCount > 0 ? ((sentCount / totalCount) * 100).toFixed(2) : 0 + }, + dailyStats, + topCampaigns + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } + }, + + // Получить логи доставки по кампании + async getLogsByCampaign(req, res) { + try { + const { campaignId } = req.params; + const limit = parseInt(req.query.limit) || 50; + const offset = parseInt(req.query.offset) || 0; + const status = req.query.status; // опциональный фильтр по статусу + + let whereClause = { campaign_id: campaignId }; + if (status) { + whereClause.status = status; + } + + const result = await DeliveryLog.findAndCountAll({ + where: whereClause, + include: [Subscriber], + limit, + offset, + order: [['sent_at', 'DESC']] + }); + + res.json({ + campaignId, + count: result.count, + rows: result.rows + }); + } catch (err) { + res.status(500).json({ error: err.message }); + } } }; \ No newline at end of file diff --git a/mail-service/src/routes/deliveryLog.js b/mail-service/src/routes/deliveryLog.js index fa59e3b..5f1653b 100644 --- a/mail-service/src/routes/deliveryLog.js +++ b/mail-service/src/routes/deliveryLog.js @@ -6,6 +6,8 @@ const router = Router(); router.post('/', deliveryLogController.create); router.get('/', deliveryLogController.getAll); router.get('/pending-count', deliveryLogController.getPendingCount); +router.get('/statistics', deliveryLogController.getDeliveryStatistics); +router.get('/campaign/:campaignId', deliveryLogController.getLogsByCampaign); router.get('/:id', deliveryLogController.getById); router.put('/:id', deliveryLogController.update); router.delete('/:id', deliveryLogController.delete); diff --git a/mail-service/src/service/dynamicConsumer.js b/mail-service/src/service/dynamicConsumer.js index bbbb431..3738d06 100644 --- a/mail-service/src/service/dynamicConsumer.js +++ b/mail-service/src/service/dynamicConsumer.js @@ -1,6 +1,6 @@ import { Kafka } from 'kafkajs'; import nodemailer from 'nodemailer'; -import { SmtpServer } from '../models/index.js'; +import { SmtpServer, DeliveryLog } from '../models/index.js'; import { topicManager } from './topicManager.js'; const kafka = new Kafka({ @@ -176,11 +176,31 @@ export class DynamicConsumer { // Обработчик сообщений для отправки email async function processEmailTask(task, topic) { + let deliveryLog = null; + try { + await new Promise((resolve) => setTimeout(resolve, 60_000)); + + // Создаем запись в DeliveryLog со статусом "sending" + deliveryLog = await DeliveryLog.create({ + campaign_id: task.campaignId, + subscriber_id: task.subscriberId, + status: 'sent', + sent_at: new Date(), + error_message: null + }); + // Получаем SMTP-сервер из БД const smtp = await SmtpServer.findByPk(task.smtpServerId); if (!smtp) { - console.error('SMTP server not found for id', task.smtpServerId); + const errorMsg = `SMTP server not found for id ${task.smtpServerId}`; + console.error(errorMsg); + + // Обновляем запись в DeliveryLog с ошибкой + await deliveryLog.update({ + status: 'failed', + error_message: errorMsg + }); return; } @@ -197,15 +217,33 @@ async function processEmailTask(task, topic) { const mailOptions = { from: smtp.from_email, to: task.email, - subject: 'Test email', - text: 'This is a test email from DynamicConsumer', - html: 'This is a test email from DynamicConsumer', + subject: task.subject, + text: task.text, + html: task.html, }; const info = await transporter.sendMail(mailOptions); console.log('Email sent:', info.messageId, 'to', task.email); + + // Обновляем запись в DeliveryLog с успешным статусом + await deliveryLog.update({ + status: 'sent', + sent_at: new Date(), + error_message: null + }); + + console.log(`[DynamicConsumer] DeliveryLog updated for campaign ${task.campaignId}, subscriber ${task.subscriberId}`); + } catch (err) { console.error('Error sending email:', err, 'task:', task); + + // Обновляем запись в DeliveryLog с ошибкой + if (deliveryLog) { + await deliveryLog.update({ + status: 'failed', + error_message: err.message + }); + } } } diff --git a/mail-service/src/service/queueFiller.js b/mail-service/src/service/queueFiller.js index 55e1e6b..ff01cf0 100644 --- a/mail-service/src/service/queueFiller.js +++ b/mail-service/src/service/queueFiller.js @@ -37,6 +37,9 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) { // Отправляем сообщения в топик const messages = subs.map(sub => ({ campaignId: campaign.id, + subject: campaign.subject, + text: campaign.text, + html: campaign.html, mx: domain, // для обратной совместимости subscriberId: sub.id, email: sub.email,