253 lines
6.1 KiB
Markdown
253 lines
6.1 KiB
Markdown
---
|
|
title: Use Message Queues for Background Jobs
|
|
impact: MEDIUM-HIGH
|
|
impactDescription: Queues enable reliable background processing
|
|
tags: microservices, queues, bullmq, background-jobs
|
|
---
|
|
|
|
## Use Message Queues for Background Jobs
|
|
|
|
Use `@nestjs/bullmq` for background job processing. Queues decouple long-running tasks from HTTP requests, enable retry logic, and distribute workload across workers. Use them for emails, file processing, notifications, and any task that shouldn't block user requests.
|
|
|
|
**Incorrect (long-running tasks in HTTP handlers):**
|
|
|
|
```typescript
|
|
// Long-running tasks in HTTP handlers
|
|
@Controller('reports')
|
|
export class ReportsController {
|
|
@Post()
|
|
async generate(@Body() dto: GenerateReportDto): Promise<Report> {
|
|
// This blocks the request for potentially minutes
|
|
const data = await this.fetchLargeDataset(dto);
|
|
const report = await this.processData(data); // Slow!
|
|
await this.sendEmail(dto.email, report); // Can fail!
|
|
return report; // Client times out
|
|
}
|
|
}
|
|
|
|
// Fire-and-forget without retry
|
|
@Injectable()
|
|
export class EmailService {
|
|
async sendWelcome(email: string): Promise<void> {
|
|
// If this fails, email is never sent
|
|
await this.mailer.send({ to: email, template: 'welcome' });
|
|
// No retry, no tracking, no visibility
|
|
}
|
|
}
|
|
|
|
// Use setInterval for scheduled tasks
|
|
setInterval(async () => {
|
|
await cleanupOldRecords();
|
|
}, 60000); // No error handling, memory leaks
|
|
```
|
|
|
|
**Correct (use BullMQ for background processing):**
|
|
|
|
```typescript
|
|
// Configure BullMQ
|
|
import { BullModule } from '@nestjs/bullmq';
|
|
|
|
@Module({
|
|
imports: [
|
|
BullModule.forRoot({
|
|
connection: {
|
|
host: 'localhost',
|
|
port: 6379,
|
|
},
|
|
defaultJobOptions: {
|
|
removeOnComplete: 1000,
|
|
removeOnFail: 5000,
|
|
attempts: 3,
|
|
backoff: {
|
|
type: 'exponential',
|
|
delay: 1000,
|
|
},
|
|
},
|
|
}),
|
|
BullModule.registerQueue(
|
|
{ name: 'email' },
|
|
{ name: 'reports' },
|
|
{ name: 'notifications' },
|
|
),
|
|
],
|
|
})
|
|
export class QueueModule {}
|
|
|
|
// Producer: Add jobs to queue
|
|
@Injectable()
|
|
export class ReportsService {
|
|
constructor(
|
|
@InjectQueue('reports') private reportsQueue: Queue,
|
|
) {}
|
|
|
|
async requestReport(dto: GenerateReportDto): Promise<{ jobId: string }> {
|
|
// Return immediately, process in background
|
|
const job = await this.reportsQueue.add('generate', dto, {
|
|
priority: dto.urgent ? 1 : 10,
|
|
delay: dto.scheduledFor ? Date.parse(dto.scheduledFor) - Date.now() : 0,
|
|
});
|
|
|
|
return { jobId: job.id };
|
|
}
|
|
|
|
async getJobStatus(jobId: string): Promise<JobStatus> {
|
|
const job = await this.reportsQueue.getJob(jobId);
|
|
return {
|
|
status: await job.getState(),
|
|
progress: job.progress,
|
|
result: job.returnvalue,
|
|
};
|
|
}
|
|
}
|
|
|
|
// Consumer: Process jobs
|
|
@Processor('reports')
|
|
export class ReportsProcessor {
|
|
private readonly logger = new Logger(ReportsProcessor.name);
|
|
|
|
@Process('generate')
|
|
async generateReport(job: Job<GenerateReportDto>): Promise<Report> {
|
|
this.logger.log(`Processing report job ${job.id}`);
|
|
|
|
// Update progress
|
|
await job.updateProgress(10);
|
|
|
|
const data = await this.fetchData(job.data);
|
|
await job.updateProgress(50);
|
|
|
|
const report = await this.processData(data);
|
|
await job.updateProgress(90);
|
|
|
|
await this.saveReport(report);
|
|
await job.updateProgress(100);
|
|
|
|
return report;
|
|
}
|
|
|
|
@OnQueueActive()
|
|
onActive(job: Job) {
|
|
this.logger.log(`Processing job ${job.id}`);
|
|
}
|
|
|
|
@OnQueueCompleted()
|
|
onCompleted(job: Job, result: any) {
|
|
this.logger.log(`Job ${job.id} completed`);
|
|
}
|
|
|
|
@OnQueueFailed()
|
|
onFailed(job: Job, error: Error) {
|
|
this.logger.error(`Job ${job.id} failed: ${error.message}`);
|
|
}
|
|
}
|
|
|
|
// Email queue with retry
|
|
@Processor('email')
|
|
export class EmailProcessor {
|
|
@Process('send')
|
|
async sendEmail(job: Job<SendEmailDto>): Promise<void> {
|
|
const { to, template, data } = job.data;
|
|
|
|
try {
|
|
await this.mailer.send({
|
|
to,
|
|
template,
|
|
context: data,
|
|
});
|
|
} catch (error) {
|
|
// BullMQ will retry based on job options
|
|
throw error;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Usage
|
|
@Injectable()
|
|
export class NotificationService {
|
|
constructor(@InjectQueue('email') private emailQueue: Queue) {}
|
|
|
|
async sendWelcome(user: User): Promise<void> {
|
|
await this.emailQueue.add(
|
|
'send',
|
|
{
|
|
to: user.email,
|
|
template: 'welcome',
|
|
data: { name: user.name },
|
|
},
|
|
{
|
|
attempts: 5,
|
|
backoff: { type: 'exponential', delay: 5000 },
|
|
},
|
|
);
|
|
}
|
|
}
|
|
|
|
// Scheduled jobs
|
|
@Injectable()
|
|
export class ScheduledJobsService implements OnModuleInit {
|
|
constructor(@InjectQueue('maintenance') private queue: Queue) {}
|
|
|
|
async onModuleInit(): Promise<void> {
|
|
// Clean up old reports daily at midnight
|
|
await this.queue.add(
|
|
'cleanup',
|
|
{},
|
|
{
|
|
repeat: { cron: '0 0 * * *' },
|
|
jobId: 'daily-cleanup', // Prevent duplicates
|
|
},
|
|
);
|
|
|
|
// Send digest every hour
|
|
await this.queue.add(
|
|
'digest',
|
|
{},
|
|
{
|
|
repeat: { every: 60 * 60 * 1000 },
|
|
jobId: 'hourly-digest',
|
|
},
|
|
);
|
|
}
|
|
}
|
|
|
|
@Processor('maintenance')
|
|
export class MaintenanceProcessor {
|
|
@Process('cleanup')
|
|
async cleanup(): Promise<void> {
|
|
await this.cleanupOldReports();
|
|
await this.cleanupExpiredSessions();
|
|
}
|
|
|
|
@Process('digest')
|
|
async sendDigest(): Promise<void> {
|
|
const users = await this.getUsersForDigest();
|
|
for (const user of users) {
|
|
await this.emailQueue.add('send', { to: user.email, template: 'digest' });
|
|
}
|
|
}
|
|
}
|
|
|
|
// Queue monitoring with Bull Board
|
|
import { BullBoardModule } from '@bull-board/nestjs';
|
|
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
|
|
|
|
@Module({
|
|
imports: [
|
|
BullBoardModule.forRoot({
|
|
route: '/admin/queues',
|
|
adapter: ExpressAdapter,
|
|
}),
|
|
BullBoardModule.forFeature({
|
|
name: 'email',
|
|
adapter: BullMQAdapter,
|
|
}),
|
|
BullBoardModule.forFeature({
|
|
name: 'reports',
|
|
adapter: BullMQAdapter,
|
|
}),
|
|
],
|
|
})
|
|
export class AdminModule {}
|
|
```
|
|
|
|
Reference: [NestJS Queues](https://docs.nestjs.com/techniques/queues)
|