--- title: Use Message Queues for Background Jobs impact: MEDIUM-HIGH impactDescription: Queues enable reliable background processing tags: microservices, queues, bullmq, background-jobs --- ## Use Message Queues for Background Jobs Use `@nestjs/bullmq` for background job processing. Queues decouple long-running tasks from HTTP requests, enable retry logic, and distribute workload across workers. Use them for emails, file processing, notifications, and any task that shouldn't block user requests. **Incorrect (long-running tasks in HTTP handlers):** ```typescript // Long-running tasks in HTTP handlers @Controller('reports') export class ReportsController { @Post() async generate(@Body() dto: GenerateReportDto): Promise { // This blocks the request for potentially minutes const data = await this.fetchLargeDataset(dto); const report = await this.processData(data); // Slow! await this.sendEmail(dto.email, report); // Can fail! return report; // Client times out } } // Fire-and-forget without retry @Injectable() export class EmailService { async sendWelcome(email: string): Promise { // If this fails, email is never sent await this.mailer.send({ to: email, template: 'welcome' }); // No retry, no tracking, no visibility } } // Use setInterval for scheduled tasks setInterval(async () => { await cleanupOldRecords(); }, 60000); // No error handling, memory leaks ``` **Correct (use BullMQ for background processing):** ```typescript // Configure BullMQ import { BullModule } from '@nestjs/bullmq'; @Module({ imports: [ BullModule.forRoot({ connection: { host: 'localhost', port: 6379, }, defaultJobOptions: { removeOnComplete: 1000, removeOnFail: 5000, attempts: 3, backoff: { type: 'exponential', delay: 1000, }, }, }), BullModule.registerQueue( { name: 'email' }, { name: 'reports' }, { name: 'notifications' }, ), ], }) export class QueueModule {} // Producer: Add jobs to queue @Injectable() export class ReportsService { constructor( @InjectQueue('reports') private reportsQueue: Queue, ) {} async requestReport(dto: GenerateReportDto): Promise<{ jobId: string }> { // Return immediately, process in background const job = await this.reportsQueue.add('generate', dto, { priority: dto.urgent ? 1 : 10, delay: dto.scheduledFor ? Date.parse(dto.scheduledFor) - Date.now() : 0, }); return { jobId: job.id }; } async getJobStatus(jobId: string): Promise { const job = await this.reportsQueue.getJob(jobId); return { status: await job.getState(), progress: job.progress, result: job.returnvalue, }; } } // Consumer: Process jobs @Processor('reports') export class ReportsProcessor { private readonly logger = new Logger(ReportsProcessor.name); @Process('generate') async generateReport(job: Job): Promise { this.logger.log(`Processing report job ${job.id}`); // Update progress await job.updateProgress(10); const data = await this.fetchData(job.data); await job.updateProgress(50); const report = await this.processData(data); await job.updateProgress(90); await this.saveReport(report); await job.updateProgress(100); return report; } @OnQueueActive() onActive(job: Job) { this.logger.log(`Processing job ${job.id}`); } @OnQueueCompleted() onCompleted(job: Job, result: any) { this.logger.log(`Job ${job.id} completed`); } @OnQueueFailed() onFailed(job: Job, error: Error) { this.logger.error(`Job ${job.id} failed: ${error.message}`); } } // Email queue with retry @Processor('email') export class EmailProcessor { @Process('send') async sendEmail(job: Job): Promise { const { to, template, data } = job.data; try { await this.mailer.send({ to, template, context: data, }); } catch (error) { // BullMQ will retry based on job options throw error; } } } // Usage @Injectable() export class NotificationService { constructor(@InjectQueue('email') private emailQueue: Queue) {} async sendWelcome(user: User): Promise { await this.emailQueue.add( 'send', { to: user.email, template: 'welcome', data: { name: user.name }, }, { attempts: 5, backoff: { type: 'exponential', delay: 5000 }, }, ); } } // Scheduled jobs @Injectable() export class ScheduledJobsService implements OnModuleInit { constructor(@InjectQueue('maintenance') private queue: Queue) {} async onModuleInit(): Promise { // Clean up old reports daily at midnight await this.queue.add( 'cleanup', {}, { repeat: { cron: '0 0 * * *' }, jobId: 'daily-cleanup', // Prevent duplicates }, ); // Send digest every hour await this.queue.add( 'digest', {}, { repeat: { every: 60 * 60 * 1000 }, jobId: 'hourly-digest', }, ); } } @Processor('maintenance') export class MaintenanceProcessor { @Process('cleanup') async cleanup(): Promise { await this.cleanupOldReports(); await this.cleanupExpiredSessions(); } @Process('digest') async sendDigest(): Promise { const users = await this.getUsersForDigest(); for (const user of users) { await this.emailQueue.add('send', { to: user.email, template: 'digest' }); } } } // Queue monitoring with Bull Board import { BullBoardModule } from '@bull-board/nestjs'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; @Module({ imports: [ BullBoardModule.forRoot({ route: '/admin/queues', adapter: ExpressAdapter, }), BullBoardModule.forFeature({ name: 'email', adapter: BullMQAdapter, }), BullBoardModule.forFeature({ name: 'reports', adapter: BullMQAdapter, }), ], }) export class AdminModule {} ``` Reference: [NestJS Queues](https://docs.nestjs.com/techniques/queues)