251125:0000 Phase 6 wait start dev Check
This commit is contained in:
@@ -5,6 +5,7 @@ import {
|
||||
CreateDateColumn,
|
||||
ManyToOne,
|
||||
JoinColumn,
|
||||
PrimaryColumn, // ✅ [Fix] เพิ่ม Import นี้
|
||||
} from 'typeorm';
|
||||
import { User } from '../../user/entities/user.entity';
|
||||
|
||||
@@ -44,7 +45,9 @@ export class Notification {
|
||||
@Column({ name: 'entity_id', nullable: true })
|
||||
entityId?: number;
|
||||
|
||||
// ✅ [Fix] รวม Decorator ไว้ที่นี่ที่เดียว (เป็นทั้ง CreateDate และ PrimaryColumn สำหรับ Partition)
|
||||
@CreateDateColumn({ name: 'created_at' })
|
||||
@PrimaryColumn()
|
||||
createdAt!: Date;
|
||||
|
||||
// --- Relations ---
|
||||
|
||||
@@ -1,26 +1,43 @@
|
||||
import { Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { Job } from 'bullmq';
|
||||
// File: src/modules/notification/notification.processor.ts
|
||||
|
||||
import { Processor, WorkerHost, InjectQueue } from '@nestjs/bullmq';
|
||||
import { Job, Queue } from 'bullmq';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { InjectRedis } from '@nestjs-modules/ioredis';
|
||||
import Redis from 'ioredis';
|
||||
import * as nodemailer from 'nodemailer';
|
||||
import axios from 'axios';
|
||||
|
||||
import { UserService } from '../user/user.service';
|
||||
|
||||
interface NotificationPayload {
|
||||
userId: number;
|
||||
title: string;
|
||||
message: string;
|
||||
link: string;
|
||||
type: 'EMAIL' | 'LINE' | 'SYSTEM';
|
||||
}
|
||||
|
||||
@Processor('notifications')
|
||||
export class NotificationProcessor extends WorkerHost {
|
||||
private readonly logger = new Logger(NotificationProcessor.name);
|
||||
private mailerTransport: nodemailer.Transporter;
|
||||
|
||||
// ค่าคงที่สำหรับ Digest (เช่น รอ 5 นาที)
|
||||
private readonly DIGEST_DELAY = 5 * 60 * 1000;
|
||||
|
||||
constructor(
|
||||
private configService: ConfigService,
|
||||
private userService: UserService,
|
||||
@InjectQueue('notifications') private notificationQueue: Queue,
|
||||
@InjectRedis() private readonly redis: Redis,
|
||||
) {
|
||||
super();
|
||||
// Setup Nodemailer
|
||||
this.mailerTransport = nodemailer.createTransport({
|
||||
host: this.configService.get('SMTP_HOST'),
|
||||
port: this.configService.get('SMTP_PORT'),
|
||||
port: Number(this.configService.get('SMTP_PORT')),
|
||||
secure: this.configService.get('SMTP_SECURE') === 'true',
|
||||
auth: {
|
||||
user: this.configService.get('SMTP_USER'),
|
||||
@@ -30,59 +47,196 @@ export class NotificationProcessor extends WorkerHost {
|
||||
}
|
||||
|
||||
async process(job: Job<any, any, string>): Promise<any> {
|
||||
this.logger.debug(`Processing job ${job.name} for user ${job.data.userId}`);
|
||||
this.logger.debug(`Processing job ${job.name} (ID: ${job.id})`);
|
||||
|
||||
switch (job.name) {
|
||||
case 'send-email':
|
||||
return this.handleSendEmail(job.data);
|
||||
case 'send-line':
|
||||
return this.handleSendLine(job.data);
|
||||
default:
|
||||
throw new Error(`Unknown job name: ${job.name}`);
|
||||
try {
|
||||
switch (job.name) {
|
||||
case 'dispatch-notification':
|
||||
// Job หลัก: ตัดสินใจว่าจะส่งเลย หรือจะเข้า Digest Queue
|
||||
return this.handleDispatch(job.data);
|
||||
|
||||
case 'process-digest':
|
||||
// Job รอง: ทำงานเมื่อครบเวลา Delay เพื่อส่งแบบรวม
|
||||
return this.handleProcessDigest(job.data.userId, job.data.type);
|
||||
|
||||
default:
|
||||
throw new Error(`Unknown job name: ${job.name}`);
|
||||
}
|
||||
} catch (error) {
|
||||
// ✅ แก้ไขตรงนี้: Type Casting (error as Error)
|
||||
this.logger.error(
|
||||
`Failed to process job ${job.name}: ${(error as Error).message}`,
|
||||
(error as Error).stack,
|
||||
);
|
||||
throw error; // ให้ BullMQ จัดการ Retry
|
||||
}
|
||||
}
|
||||
|
||||
private async handleSendEmail(data: any) {
|
||||
const user = await this.userService.findOne(data.userId);
|
||||
if (!user || !user.email) {
|
||||
this.logger.warn(`User ${data.userId} has no email`);
|
||||
/**
|
||||
* ฟังก์ชันตัดสินใจ (Dispatcher)
|
||||
* ตรวจสอบ User Preferences และ Digest Mode
|
||||
*/
|
||||
private async handleDispatch(data: NotificationPayload) {
|
||||
// 1. ดึง User พร้อม Preferences
|
||||
const user: any = await this.userService.findOne(data.userId);
|
||||
|
||||
if (!user) {
|
||||
this.logger.warn(`User ${data.userId} not found, skipping notification.`);
|
||||
return;
|
||||
}
|
||||
|
||||
const prefs = user.preferences || {
|
||||
notify_email: true,
|
||||
notify_line: true,
|
||||
digest_mode: false,
|
||||
};
|
||||
|
||||
// 2. ตรวจสอบว่า User ปิดรับการแจ้งเตือนหรือไม่
|
||||
if (data.type === 'EMAIL' && !prefs.notify_email) return;
|
||||
if (data.type === 'LINE' && !prefs.notify_line) return;
|
||||
|
||||
// 3. ตรวจสอบ Digest Mode
|
||||
if (prefs.digest_mode) {
|
||||
await this.addToDigest(data);
|
||||
} else {
|
||||
// ส่งทันที (Real-time)
|
||||
if (data.type === 'EMAIL') await this.sendEmailImmediate(user, data);
|
||||
if (data.type === 'LINE') await this.sendLineImmediate(user, data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* เพิ่มข้อความลงใน Redis List และตั้งเวลาส่ง (Delayed Job)
|
||||
*/
|
||||
private async addToDigest(data: NotificationPayload) {
|
||||
const key = `digest:${data.type}:${data.userId}`;
|
||||
|
||||
// 1. Push ข้อมูลลง Redis List
|
||||
await this.redis.rpush(key, JSON.stringify(data));
|
||||
|
||||
// 2. ตรวจสอบว่ามี "ตัวนับเวลาถอยหลัง" (Delayed Job) อยู่หรือยัง?
|
||||
const lockKey = `digest:lock:${data.type}:${data.userId}`;
|
||||
const isLocked = await this.redis.get(lockKey);
|
||||
|
||||
if (!isLocked) {
|
||||
// ถ้ายังไม่มี Job รออยู่ ให้สร้างใหม่
|
||||
await this.notificationQueue.add(
|
||||
'process-digest',
|
||||
{ userId: data.userId, type: data.type },
|
||||
{
|
||||
delay: this.DIGEST_DELAY,
|
||||
jobId: `digest-${data.type}-${data.userId}-${Date.now()}`,
|
||||
},
|
||||
);
|
||||
|
||||
// Set Lock ไว้ตามเวลา Delay เพื่อไม่ให้สร้าง Job ซ้ำ
|
||||
await this.redis.set(lockKey, '1', 'PX', this.DIGEST_DELAY);
|
||||
this.logger.log(
|
||||
`Scheduled digest for User ${data.userId} (${data.type}) in ${this.DIGEST_DELAY}ms`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ประมวลผล Digest (ส่งแบบรวม)
|
||||
*/
|
||||
private async handleProcessDigest(userId: number, type: 'EMAIL' | 'LINE') {
|
||||
const key = `digest:${type}:${userId}`;
|
||||
const lockKey = `digest:lock:${type}:${userId}`;
|
||||
|
||||
// 1. ดึงข้อความทั้งหมดจาก Redis และลบออกทันที
|
||||
const messagesRaw = await this.redis.lrange(key, 0, -1);
|
||||
await this.redis.del(key);
|
||||
await this.redis.del(lockKey); // Clear lock
|
||||
|
||||
if (!messagesRaw || messagesRaw.length === 0) return;
|
||||
|
||||
const messages: NotificationPayload[] = messagesRaw.map((m) =>
|
||||
JSON.parse(m),
|
||||
);
|
||||
const user = await this.userService.findOne(userId);
|
||||
|
||||
if (type === 'EMAIL') {
|
||||
await this.sendEmailDigest(user, messages);
|
||||
} else if (type === 'LINE') {
|
||||
await this.sendLineDigest(user, messages);
|
||||
}
|
||||
}
|
||||
|
||||
// =====================================================
|
||||
// SENDERS (Immediate & Digest)
|
||||
// =====================================================
|
||||
|
||||
private async sendEmailImmediate(user: any, data: NotificationPayload) {
|
||||
if (!user.email) return;
|
||||
await this.mailerTransport.sendMail({
|
||||
from: '"LCBP3 DMS" <no-reply@np-dms.work>',
|
||||
to: user.email,
|
||||
subject: `[DMS] ${data.title}`,
|
||||
html: `
|
||||
<h3>${data.title}</h3>
|
||||
<p>${data.message}</p>
|
||||
<br/>
|
||||
<a href="${data.link}">คลิกเพื่อดูรายละเอียด</a>
|
||||
`,
|
||||
html: `<h3>${data.title}</h3><p>${data.message}</p><br/><a href="${data.link}">คลิกเพื่อดูรายละเอียด</a>`,
|
||||
});
|
||||
this.logger.log(`Email sent to ${user.email}`);
|
||||
}
|
||||
|
||||
private async handleSendLine(data: any) {
|
||||
const user = await this.userService.findOne(data.userId);
|
||||
// ตรวจสอบว่า User มี Line ID หรือไม่ (หรือใช้ Group Token ถ้าเป็นระบบรวม)
|
||||
// ในที่นี้สมมติว่าเรายิงเข้า n8n webhook เพื่อจัดการต่อ
|
||||
const n8nWebhookUrl = this.configService.get('N8N_LINE_WEBHOOK_URL');
|
||||
private async sendEmailDigest(user: any, messages: NotificationPayload[]) {
|
||||
if (!user.email) return;
|
||||
|
||||
if (!n8nWebhookUrl) {
|
||||
this.logger.warn('N8N_LINE_WEBHOOK_URL not configured');
|
||||
return;
|
||||
}
|
||||
// สร้าง HTML List
|
||||
const listItems = messages
|
||||
.map(
|
||||
(msg) =>
|
||||
`<li><strong>${msg.title}</strong>: ${msg.message} <a href="${msg.link}">[View]</a></li>`,
|
||||
)
|
||||
.join('');
|
||||
|
||||
await this.mailerTransport.sendMail({
|
||||
from: '"LCBP3 DMS" <no-reply@np-dms.work>',
|
||||
to: user.email,
|
||||
subject: `[DMS Summary] คุณมีการแจ้งเตือนใหม่ ${messages.length} รายการ`,
|
||||
html: `
|
||||
<h3>สรุปรายการแจ้งเตือน (Digest)</h3>
|
||||
<ul>${listItems}</ul>
|
||||
<p>คุณได้รับอีเมลนี้เพราะเปิดใช้งานโหมดสรุปรายการ</p>
|
||||
`,
|
||||
});
|
||||
this.logger.log(
|
||||
`Digest Email sent to ${user.email} (${messages.length} items)`,
|
||||
);
|
||||
}
|
||||
|
||||
private async sendLineImmediate(user: any, data: NotificationPayload) {
|
||||
const n8nWebhookUrl = this.configService.get('N8N_LINE_WEBHOOK_URL');
|
||||
if (!n8nWebhookUrl) return;
|
||||
|
||||
try {
|
||||
await axios.post(n8nWebhookUrl, {
|
||||
userId: user.user_id, // หรือ user.lineId ถ้ามี
|
||||
userId: user.user_id,
|
||||
message: `${data.title}\n${data.message}`,
|
||||
link: data.link,
|
||||
isDigest: false,
|
||||
});
|
||||
this.logger.log(`Line notification sent via n8n for user ${data.userId}`);
|
||||
} catch (error: any) {
|
||||
throw new Error(`Failed to send Line notification: ${error.message}`);
|
||||
} catch (error) {
|
||||
// ✅ แก้ไขตรงนี้ด้วย: Type Casting (error as Error)
|
||||
this.logger.error(`Line Error: ${(error as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async sendLineDigest(user: any, messages: NotificationPayload[]) {
|
||||
const n8nWebhookUrl = this.configService.get('N8N_LINE_WEBHOOK_URL');
|
||||
if (!n8nWebhookUrl) return;
|
||||
|
||||
const summary = messages.map((m, i) => `${i + 1}. ${m.title}`).join('\n');
|
||||
|
||||
try {
|
||||
await axios.post(n8nWebhookUrl, {
|
||||
userId: user.user_id,
|
||||
message: `สรุป ${messages.length} รายการใหม่:\n${summary}`,
|
||||
link: 'https://lcbp3.np-dms.work/notifications',
|
||||
isDigest: true,
|
||||
});
|
||||
} catch (error) {
|
||||
// ✅ แก้ไขตรงนี้ด้วย: Type Casting (error as Error)
|
||||
this.logger.error(`Line Digest Error: ${(error as Error).message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
// File: src/modules/notification/notification.service.ts
|
||||
|
||||
import { Injectable, Logger, NotFoundException } from '@nestjs/common';
|
||||
import { InjectQueue } from '@nestjs/bullmq';
|
||||
import { Queue } from 'bullmq';
|
||||
@@ -22,9 +23,9 @@ export interface NotificationJobData {
|
||||
title: string;
|
||||
message: string;
|
||||
type: 'EMAIL' | 'LINE' | 'SYSTEM'; // ช่องทางหลักที่ต้องการส่ง (Trigger Type)
|
||||
entityType?: string; // e.g., 'rfa', 'correspondence'
|
||||
entityId?: number; // e.g., rfa_id
|
||||
link?: string; // Deep link to frontend page
|
||||
entityType?: string;
|
||||
entityId?: number;
|
||||
link?: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
@@ -37,109 +38,57 @@ export class NotificationService {
|
||||
private notificationRepo: Repository<Notification>,
|
||||
@InjectRepository(User)
|
||||
private userRepo: Repository<User>,
|
||||
@InjectRepository(UserPreference)
|
||||
private userPrefRepo: Repository<UserPreference>,
|
||||
// ไม่ต้อง Inject UserPrefRepo แล้ว เพราะ Processor จะจัดการเอง
|
||||
private notificationGateway: NotificationGateway,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* ส่งการแจ้งเตือน (Centralized Notification Sender)
|
||||
* 1. บันทึก DB (System Log)
|
||||
* 2. ส่ง Real-time (WebSocket)
|
||||
* 3. ส่ง External (Email/Line) ผ่าน Queue ตาม User Preference
|
||||
*/
|
||||
async send(data: NotificationJobData): Promise<void> {
|
||||
try {
|
||||
// ---------------------------------------------------------
|
||||
// 1. สร้าง Entity และบันทึกลง DB (เพื่อให้มี History ในระบบ)
|
||||
// 1. สร้าง Entity และบันทึกลง DB (System Log)
|
||||
// ---------------------------------------------------------
|
||||
const notification = this.notificationRepo.create({
|
||||
userId: data.userId,
|
||||
title: data.title,
|
||||
message: data.message,
|
||||
notificationType: NotificationType.SYSTEM, // ใน DB เก็บเป็น SYSTEM เสมอเพื่อแสดงใน App
|
||||
notificationType: NotificationType.SYSTEM,
|
||||
entityType: data.entityType,
|
||||
entityId: data.entityId,
|
||||
isRead: false,
|
||||
// link: data.link // ถ้า Entity มี field link ให้ใส่ด้วย
|
||||
});
|
||||
|
||||
const savedNotification = await this.notificationRepo.save(notification);
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// 2. Real-time Push (WebSocket) -> ส่งให้ User ทันทีถ้า Online
|
||||
// 2. Real-time Push (WebSocket)
|
||||
// ---------------------------------------------------------
|
||||
this.notificationGateway.sendToUser(data.userId, savedNotification);
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// 3. ตรวจสอบ User Preferences เพื่อส่งช่องทางอื่น (Email/Line)
|
||||
// 3. Push Job ลง Redis BullMQ (Dispatch Logic)
|
||||
// เปลี่ยนชื่อ Job เป็น 'dispatch-notification' ตาม Processor
|
||||
// ---------------------------------------------------------
|
||||
const userPref = await this.userPrefRepo.findOne({
|
||||
where: { userId: data.userId },
|
||||
});
|
||||
|
||||
// ใช้ Nullish Coalescing Operator (??)
|
||||
// ถ้าไม่มีค่า (undefined/null) ให้ Default เป็น true
|
||||
const shouldSendEmail = userPref?.notifyEmail ?? true;
|
||||
const shouldSendLine = userPref?.notifyLine ?? true;
|
||||
|
||||
const jobs = [];
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// 4. เตรียม Job สำหรับ Email Queue
|
||||
// เงื่อนไข: User เปิดรับ Email และ Noti นี้ไม่ได้บังคับส่งแค่ LINE
|
||||
// ---------------------------------------------------------
|
||||
if (shouldSendEmail && data.type !== 'LINE') {
|
||||
jobs.push({
|
||||
name: 'send-email',
|
||||
data: {
|
||||
...data,
|
||||
notificationId: savedNotification.id,
|
||||
target: 'EMAIL',
|
||||
await this.notificationQueue.add(
|
||||
'dispatch-notification',
|
||||
{
|
||||
...data,
|
||||
notificationId: savedNotification.id, // ส่ง ID ไปด้วยเผื่อใช้ Tracking
|
||||
},
|
||||
{
|
||||
attempts: 3,
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 5000,
|
||||
},
|
||||
opts: {
|
||||
attempts: 3, // ลองใหม่ 3 ครั้งถ้าล่ม (Resilience)
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 5000, // รอ 5s, 10s, 20s...
|
||||
},
|
||||
removeOnComplete: true, // ลบ Job เมื่อเสร็จ (ประหยัด Redis Memory)
|
||||
},
|
||||
});
|
||||
}
|
||||
removeOnComplete: true,
|
||||
},
|
||||
);
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// 5. เตรียม Job สำหรับ Line Queue
|
||||
// เงื่อนไข: User เปิดรับ Line และ Noti นี้ไม่ได้บังคับส่งแค่ EMAIL
|
||||
// ---------------------------------------------------------
|
||||
if (shouldSendLine && data.type !== 'EMAIL') {
|
||||
jobs.push({
|
||||
name: 'send-line',
|
||||
data: {
|
||||
...data,
|
||||
notificationId: savedNotification.id,
|
||||
target: 'LINE',
|
||||
},
|
||||
opts: {
|
||||
attempts: 3,
|
||||
backoff: { type: 'fixed', delay: 3000 },
|
||||
removeOnComplete: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------
|
||||
// 6. Push Jobs ลง Redis BullMQ
|
||||
// ---------------------------------------------------------
|
||||
if (jobs.length > 0) {
|
||||
await this.notificationQueue.addBulk(jobs);
|
||||
this.logger.debug(
|
||||
`Queued ${jobs.length} external notifications for user ${data.userId}`,
|
||||
);
|
||||
}
|
||||
this.logger.debug(`Dispatched notification job for user ${data.userId}`);
|
||||
} catch (error) {
|
||||
// Error Handling: ไม่ Throw เพื่อไม่ให้ Flow หลัก (เช่น การสร้างเอกสาร) พัง
|
||||
// แต่บันทึก Error ไว้ตรวจสอบ
|
||||
this.logger.error(
|
||||
`Failed to process notification for user ${data.userId}`,
|
||||
(error as Error).stack,
|
||||
@@ -147,9 +96,8 @@ export class NotificationService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* ดึงรายการแจ้งเตือนของ User (สำหรับ Controller)
|
||||
*/
|
||||
// ... (ส่วน findAll, markAsRead, cleanupOldNotifications เหมือนเดิม ไม่ต้องแก้) ...
|
||||
|
||||
async findAll(userId: number, searchDto: SearchNotificationDto) {
|
||||
const { page = 1, limit = 20, isRead } = searchDto;
|
||||
const skip = (page - 1) * limit;
|
||||
@@ -161,14 +109,11 @@ export class NotificationService {
|
||||
.take(limit)
|
||||
.skip(skip);
|
||||
|
||||
// Filter by Read Status (ถ้ามีการส่งมา)
|
||||
if (isRead !== undefined) {
|
||||
queryBuilder.andWhere('notification.isRead = :isRead', { isRead });
|
||||
}
|
||||
|
||||
const [items, total] = await queryBuilder.getManyAndCount();
|
||||
|
||||
// นับจำนวนที่ยังไม่ได้อ่านทั้งหมด (เพื่อแสดง Badge ที่กระดิ่ง)
|
||||
const unreadCount = await this.notificationRepo.count({
|
||||
where: { userId, isRead: false },
|
||||
});
|
||||
@@ -185,9 +130,6 @@ export class NotificationService {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* อ่านแจ้งเตือน (Mark as Read)
|
||||
*/
|
||||
async markAsRead(id: number, userId: number): Promise<void> {
|
||||
const notification = await this.notificationRepo.findOne({
|
||||
where: { id, userId },
|
||||
@@ -200,15 +142,9 @@ export class NotificationService {
|
||||
if (!notification.isRead) {
|
||||
notification.isRead = true;
|
||||
await this.notificationRepo.save(notification);
|
||||
|
||||
// Update Unread Count via WebSocket (Optional)
|
||||
// this.notificationGateway.sendUnreadCount(userId, ...);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* อ่านทั้งหมด (Mark All as Read)
|
||||
*/
|
||||
async markAllAsRead(userId: number): Promise<void> {
|
||||
await this.notificationRepo.update(
|
||||
{ userId, isRead: false },
|
||||
@@ -216,10 +152,6 @@ export class NotificationService {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* ลบการแจ้งเตือนที่เก่าเกินกำหนด (ใช้กับ Cron Job Cleanup)
|
||||
* เก็บไว้ 90 วัน
|
||||
*/
|
||||
async cleanupOldNotifications(days: number = 90): Promise<number> {
|
||||
const dateLimit = new Date();
|
||||
dateLimit.setDate(dateLimit.getDate() - days);
|
||||
|
||||
Reference in New Issue
Block a user