Files
lcbp3/.agents/skills/nestjs-best-practices/rules/micro-use-queues.md
admin ef16817f38
All checks were successful
Build and Deploy / deploy (push) Successful in 4m44s
260223:1415 20260223 nextJS & nestJS Best practices
2026-02-23 14:15:06 +07:00

253 lines
6.1 KiB
Markdown

---
title: Use Message Queues for Background Jobs
impact: MEDIUM-HIGH
impactDescription: Queues enable reliable background processing
tags: microservices, queues, bullmq, background-jobs
---
## Use Message Queues for Background Jobs
Use `@nestjs/bullmq` for background job processing. Queues decouple long-running tasks from HTTP requests, enable retry logic, and distribute workload across workers. Use them for emails, file processing, notifications, and any task that shouldn't block user requests.
**Incorrect (long-running tasks in HTTP handlers):**
```typescript
// Anti-pattern: the whole slow pipeline runs inside the HTTP handler
@Controller('reports')
export class ReportsController {
  /**
   * Builds, emails and returns the report within a single request.
   * Shown as what NOT to do: every step is awaited before the response.
   */
  @Post()
  async generate(@Body() dto: GenerateReportDto): Promise<Report> {
    // The request stays blocked for the entire pipeline — potentially minutes
    const dataset = await this.fetchLargeDataset(dto);
    const generated = await this.processData(dataset); // Slow!
    await this.sendEmail(dto.email, generated); // Can fail!
    return generated; // Client times out
  }
}
// Anti-pattern: fire-and-forget delivery without retry
@Injectable()
export class EmailService {
  /** Sends the welcome template with a single, unretried attempt. */
  async sendWelcome(email: string): Promise<void> {
    const message = { to: email, template: 'welcome' };
    // One attempt only: if this call fails, the email is never sent
    await this.mailer.send(message);
    // No retry, no tracking, no visibility
  }
}
// Anti-pattern: a bare in-process timer used as a scheduler
setInterval(
  async () => {
    await cleanupOldRecords();
  },
  60 * 1000, // every 60000 ms — no error handling, memory leaks
);
```
**Correct (use BullMQ for background processing):**
```typescript
// Configure BullMQ
import {
  BullModule,
  InjectQueue,
  OnWorkerEvent,
  Processor,
  WorkerHost,
} from '@nestjs/bullmq';
/**
 * Central BullMQ configuration: one shared Redis connection, default job
 * options applied to every queue, and the list of queues used by the app.
 */
@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: 'localhost',
        port: 6379,
      },
      defaultJobOptions: {
        // Cap how many finished jobs are retained so Redis does not grow unbounded
        removeOnComplete: 1000,
        removeOnFail: 5000,
        // Retry failed jobs up to 3 times with exponential backoff starting at 1s
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 1000,
        },
      },
    }),
    BullModule.registerQueue(
      { name: 'email' },
      { name: 'reports' },
      { name: 'notifications' },
      // Needed by ScheduledJobsService and MaintenanceProcessor below;
      // @InjectQueue('maintenance') cannot resolve an unregistered queue.
      { name: 'maintenance' },
    ),
  ],
})
export class QueueModule {}
// Producer: Add jobs to queue
@Injectable()
export class ReportsService {
  constructor(
    @InjectQueue('reports') private readonly reportsQueue: Queue,
  ) {}

  /**
   * Enqueues a report generation job and returns immediately; the work
   * itself happens in the background processor for the `reports` queue.
   *
   * @param dto report request; `urgent` raises priority, `scheduledFor` delays the job
   * @returns the id of the enqueued job, for later status lookups
   * @throws Error if `scheduledFor` cannot be parsed or the queue assigns no job id
   */
  async requestReport(dto: GenerateReportDto): Promise<{ jobId: string }> {
    const job = await this.reportsQueue.add('generate', dto, {
      // BullMQ treats lower numbers as higher priority
      priority: dto.urgent ? 1 : 10,
      delay: ReportsService.delayUntil(dto.scheduledFor),
    });

    // `job.id` is optional in BullMQ's typings; fail loudly rather than
    // hand the caller an unusable id.
    if (job.id === undefined) {
      throw new Error('Queue did not assign an id to the report job');
    }
    return { jobId: job.id };
  }

  /**
   * Looks up a previously enqueued job and reports its state.
   *
   * @param jobId id returned by {@link requestReport}
   * @returns current state, progress and (once finished) the return value
   * @throws Error if no job with that id exists (e.g. unknown id or already removed)
   */
  async getJobStatus(jobId: string): Promise<JobStatus> {
    const job = await this.reportsQueue.getJob(jobId);
    if (!job) {
      throw new Error(`Report job ${jobId} not found`);
    }

    return {
      status: await job.getState(),
      progress: job.progress,
      result: job.returnvalue,
    };
  }

  /**
   * Converts an optional schedule timestamp into a non-negative delay in ms.
   * A missing or past timestamp means "run now"; an unparseable one is rejected.
   */
  private static delayUntil(scheduledFor?: string): number {
    if (!scheduledFor) {
      return 0;
    }
    const target = Date.parse(scheduledFor);
    if (Number.isNaN(target)) {
      throw new Error(`Invalid scheduledFor value: ${scheduledFor}`);
    }
    return Math.max(0, target - Date.now());
  }
}
// Consumer: Process jobs
// With @nestjs/bullmq a processor extends WorkerHost and implements process();
// the @Process / @OnQueue* decorators belong to the older @nestjs/bull package.
@Processor('reports')
export class ReportsProcessor extends WorkerHost {
  private readonly logger = new Logger(ReportsProcessor.name);

  /**
   * Single entry point for every job on the `reports` queue; dispatches on
   * the job name because BullMQ does not route named jobs to separate handlers.
   *
   * @throws Error for job names this processor does not know
   */
  async process(job: Job<GenerateReportDto>): Promise<Report> {
    switch (job.name) {
      case 'generate':
        return this.generateReport(job);
      default:
        throw new Error(`Unsupported job "${job.name}" on reports queue`);
    }
  }

  /**
   * Fetches, processes and saves a report, publishing progress along the way.
   *
   * @returns the generated report, stored by BullMQ as the job's return value
   */
  async generateReport(job: Job<GenerateReportDto>): Promise<Report> {
    this.logger.log(`Processing report job ${job.id}`);

    // Update progress
    await job.updateProgress(10);
    const data = await this.fetchData(job.data);

    await job.updateProgress(50);
    const report = await this.processData(data);

    await job.updateProgress(90);
    await this.saveReport(report);

    await job.updateProgress(100);
    return report;
  }

  @OnWorkerEvent('active')
  onActive(job: Job): void {
    this.logger.log(`Processing job ${job.id}`);
  }

  @OnWorkerEvent('completed')
  onCompleted(job: Job, result: unknown): void {
    this.logger.log(`Job ${job.id} completed`);
  }

  // The worker may emit 'failed' without a job (e.g. the job was already removed)
  @OnWorkerEvent('failed')
  onFailed(job: Job | undefined, error: Error): void {
    this.logger.error(`Job ${job?.id ?? 'unknown'} failed: ${error.message}`);
  }
}
// Email queue with retry
// With @nestjs/bullmq a processor extends WorkerHost and implements process().
@Processor('email')
export class EmailProcessor extends WorkerHost {
  /**
   * Entry point for every job on the `email` queue; dispatches on job name.
   *
   * @throws Error for job names this processor does not know
   */
  async process(job: Job<SendEmailDto>): Promise<void> {
    switch (job.name) {
      case 'send':
        return this.sendEmail(job);
      default:
        throw new Error(`Unsupported job "${job.name}" on email queue`);
    }
  }

  /** Renders and sends the templated email described by the job payload. */
  async sendEmail(job: Job<SendEmailDto>): Promise<void> {
    const { to, template, data } = job.data;

    // No try/catch on purpose: a thrown error marks the job as failed and
    // BullMQ will retry based on job options (attempts / backoff).
    await this.mailer.send({
      to,
      template,
      context: data,
    });
  }
}
// Usage
@Injectable()
export class NotificationService {
  constructor(@InjectQueue('email') private emailQueue: Queue) {}

  /**
   * Queues a welcome email for the given user instead of sending it inline.
   * Overrides the default retry policy with 5 attempts and a 5s exponential backoff.
   */
  async sendWelcome(user: User): Promise<void> {
    const payload = {
      to: user.email,
      template: 'welcome',
      data: { name: user.name },
    };
    const retryPolicy = {
      attempts: 5,
      backoff: { type: 'exponential', delay: 5000 },
    };

    await this.emailQueue.add('send', payload, retryPolicy);
  }
}
// Scheduled jobs
@Injectable()
export class ScheduledJobsService implements OnModuleInit {
  constructor(@InjectQueue('maintenance') private readonly queue: Queue) {}

  /**
   * Registers the repeatable maintenance jobs when the module starts.
   */
  async onModuleInit(): Promise<void> {
    // Clean up old reports daily at midnight
    await this.queue.add(
      'cleanup',
      {},
      {
        // BullMQ's repeat options take the cron expression as `pattern`
        // (`cron` is the legacy Bull option name).
        repeat: { pattern: '0 0 * * *' },
        jobId: 'daily-cleanup', // Prevent duplicates
      },
    );

    // Send digest every hour
    await this.queue.add(
      'digest',
      {},
      {
        repeat: { every: 60 * 60 * 1000 },
        jobId: 'hourly-digest',
      },
    );
  }
}
// With @nestjs/bullmq a processor extends WorkerHost and implements process().
@Processor('maintenance')
export class MaintenanceProcessor extends WorkerHost {
  // The digest job fans out to the email queue, so it must be injected here
  constructor(@InjectQueue('email') private readonly emailQueue: Queue) {
    super();
  }

  /**
   * Entry point for every job on the `maintenance` queue; dispatches on job name.
   *
   * @throws Error for job names this processor does not know
   */
  async process(job: Job): Promise<void> {
    switch (job.name) {
      case 'cleanup':
        return this.cleanup();
      case 'digest':
        return this.sendDigest();
      default:
        throw new Error(`Unsupported job "${job.name}" on maintenance queue`);
    }
  }

  /** Removes old reports, then expired sessions. */
  async cleanup(): Promise<void> {
    await this.cleanupOldReports();
    await this.cleanupExpiredSessions();
  }

  /** Enqueues one `send` job on the email queue per digest recipient. */
  async sendDigest(): Promise<void> {
    const users = await this.getUsersForDigest();
    for (const user of users) {
      await this.emailQueue.add('send', { to: user.email, template: 'digest' });
    }
  }
}
// Queue monitoring with Bull Board
import { BullBoardModule } from '@bull-board/nestjs';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
// ExpressAdapter is used in forRoot() below and ships in its own package
import { ExpressAdapter } from '@bull-board/express';

/**
 * Mounts the Bull Board dashboard at /admin/queues and exposes the
 * `email` and `reports` queues in it.
 */
@Module({
  imports: [
    BullBoardModule.forRoot({
      route: '/admin/queues',
      adapter: ExpressAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'email',
      adapter: BullMQAdapter,
    }),
    BullBoardModule.forFeature({
      name: 'reports',
      adapter: BullMQAdapter,
    }),
  ],
})
export class AdminModule {}
```
Reference: [NestJS Queues](https://docs.nestjs.com/techniques/queues)