This commit is contained in:
romantarkin 2025-07-29 10:59:23 +05:00
parent 4fabac38e4
commit 8a425b16e7
4 changed files with 159 additions and 6 deletions

View File

@ -1,4 +1,4 @@
import { DeliveryLog, Campaign, Subscriber } from '../models/index.js'; import { DeliveryLog, Campaign, Subscriber, sequelize } from '../models/index.js';
import { Op } from 'sequelize'; import { Op } from 'sequelize';
import { topicManager } from '../service/topicManager.js'; import { topicManager } from '../service/topicManager.js';
@ -106,5 +106,115 @@ export default {
} catch (err) { } catch (err) {
res.status(500).json({ error: err.message }); res.status(500).json({ error: err.message });
} }
},
// Получить статистику по статусам доставки
async getDeliveryStatistics(req, res) {
try {
const { campaignId } = req.query;
let whereClause = {};
if (campaignId) {
whereClause.campaign_id = campaignId;
}
// Получаем общую статистику
const totalCount = await DeliveryLog.count({ where: whereClause });
const sentCount = await DeliveryLog.count({
where: { ...whereClause, status: 'sent' }
});
const failedCount = await DeliveryLog.count({
where: { ...whereClause, status: 'failed' }
});
const bouncedCount = await DeliveryLog.count({
where: { ...whereClause, status: 'bounced' }
});
// Получаем статистику по дням (последние 7 дней)
const sevenDaysAgo = new Date();
sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
const dailyStats = await DeliveryLog.findAll({
where: {
...whereClause,
sent_at: {
[Op.gte]: sevenDaysAgo
}
},
attributes: [
[sequelize.fn('DATE', sequelize.col('sent_at')), 'date'],
[sequelize.fn('COUNT', sequelize.col('id')), 'count'],
'status'
],
group: ['DATE(sent_at)', 'status'],
order: [[sequelize.fn('DATE', sequelize.col('sent_at')), 'ASC']],
raw: true
});
// Получаем топ кампаний по количеству отправленных писем
const topCampaigns = await DeliveryLog.findAll({
where: whereClause,
include: [{
model: Campaign,
as: 'Campaign',
attributes: ['id', 'subject_override']
}],
attributes: [
'campaign_id',
[sequelize.fn('COUNT', sequelize.col('id')), 'total_sent'],
[sequelize.fn('COUNT', sequelize.literal('CASE WHEN status = "sent" THEN 1 END')), 'successful_sent'],
[sequelize.fn('COUNT', sequelize.literal('CASE WHEN status = "failed" THEN 1 END')), 'failed_sent']
],
group: ['campaign_id'],
order: [[sequelize.fn('COUNT', sequelize.col('id')), 'DESC']],
limit: 10,
raw: true
});
res.json({
summary: {
total: totalCount,
sent: sentCount,
failed: failedCount,
bounced: bouncedCount,
successRate: totalCount > 0 ? ((sentCount / totalCount) * 100).toFixed(2) : 0
},
dailyStats,
topCampaigns
});
} catch (err) {
res.status(500).json({ error: err.message });
}
},
// Получить логи доставки по кампании
async getLogsByCampaign(req, res) {
try {
const { campaignId } = req.params;
const limit = parseInt(req.query.limit) || 50;
const offset = parseInt(req.query.offset) || 0;
const status = req.query.status; // опциональный фильтр по статусу
let whereClause = { campaign_id: campaignId };
if (status) {
whereClause.status = status;
}
const result = await DeliveryLog.findAndCountAll({
where: whereClause,
include: [Subscriber],
limit,
offset,
order: [['sent_at', 'DESC']]
});
res.json({
campaignId,
count: result.count,
rows: result.rows
});
} catch (err) {
res.status(500).json({ error: err.message });
}
} }
}; };

View File

@ -6,6 +6,8 @@ const router = Router();
router.post('/', deliveryLogController.create); router.post('/', deliveryLogController.create);
router.get('/', deliveryLogController.getAll); router.get('/', deliveryLogController.getAll);
router.get('/pending-count', deliveryLogController.getPendingCount); router.get('/pending-count', deliveryLogController.getPendingCount);
router.get('/statistics', deliveryLogController.getDeliveryStatistics);
router.get('/campaign/:campaignId', deliveryLogController.getLogsByCampaign);
router.get('/:id', deliveryLogController.getById); router.get('/:id', deliveryLogController.getById);
router.put('/:id', deliveryLogController.update); router.put('/:id', deliveryLogController.update);
router.delete('/:id', deliveryLogController.delete); router.delete('/:id', deliveryLogController.delete);

View File

@ -1,6 +1,6 @@
import { Kafka } from 'kafkajs'; import { Kafka } from 'kafkajs';
import nodemailer from 'nodemailer'; import nodemailer from 'nodemailer';
import { SmtpServer } from '../models/index.js'; import { SmtpServer, DeliveryLog } from '../models/index.js';
import { topicManager } from './topicManager.js'; import { topicManager } from './topicManager.js';
const kafka = new Kafka({ const kafka = new Kafka({
@ -176,11 +176,31 @@ export class DynamicConsumer {
// Обработчик сообщений для отправки email // Обработчик сообщений для отправки email
async function processEmailTask(task, topic) { async function processEmailTask(task, topic) {
let deliveryLog = null;
try { try {
await new Promise((resolve) => setTimeout(resolve, 60_000));
// Создаем запись в DeliveryLog со статусом "sending"
deliveryLog = await DeliveryLog.create({
campaign_id: task.campaignId,
subscriber_id: task.subscriberId,
status: 'sent',
sent_at: new Date(),
error_message: null
});
// Получаем SMTP-сервер из БД // Получаем SMTP-сервер из БД
const smtp = await SmtpServer.findByPk(task.smtpServerId); const smtp = await SmtpServer.findByPk(task.smtpServerId);
if (!smtp) { if (!smtp) {
console.error('SMTP server not found for id', task.smtpServerId); const errorMsg = `SMTP server not found for id ${task.smtpServerId}`;
console.error(errorMsg);
// Обновляем запись в DeliveryLog с ошибкой
await deliveryLog.update({
status: 'failed',
error_message: errorMsg
});
return; return;
} }
@ -197,15 +217,33 @@ async function processEmailTask(task, topic) {
const mailOptions = { const mailOptions = {
from: smtp.from_email, from: smtp.from_email,
to: task.email, to: task.email,
subject: 'Test email', subject: task.subject,
text: 'This is a test email from DynamicConsumer', text: task.text,
html: '<b>This is a test email from DynamicConsumer</b>', html: task.html,
}; };
const info = await transporter.sendMail(mailOptions); const info = await transporter.sendMail(mailOptions);
console.log('Email sent:', info.messageId, 'to', task.email); console.log('Email sent:', info.messageId, 'to', task.email);
// Обновляем запись в DeliveryLog с успешным статусом
await deliveryLog.update({
status: 'sent',
sent_at: new Date(),
error_message: null
});
console.log(`[DynamicConsumer] DeliveryLog updated for campaign ${task.campaignId}, subscriber ${task.subscriberId}`);
} catch (err) { } catch (err) {
console.error('Error sending email:', err, 'task:', task); console.error('Error sending email:', err, 'task:', task);
// Обновляем запись в DeliveryLog с ошибкой
if (deliveryLog) {
await deliveryLog.update({
status: 'failed',
error_message: err.message
});
}
} }
} }

View File

@ -37,6 +37,9 @@ export async function fillQueueForCampaign(campaign, subscribers, smtpServers) {
// Отправляем сообщения в топик // Отправляем сообщения в топик
const messages = subs.map(sub => ({ const messages = subs.map(sub => ({
campaignId: campaign.id, campaignId: campaign.id,
subject: campaign.subject,
text: campaign.text,
html: campaign.html,
mx: domain, // для обратной совместимости mx: domain, // для обратной совместимости
subscriberId: sub.id, subscriberId: sub.id,
email: sub.email, email: sub.email,