260314:1714 20260314:1700 Refactor Migration #2
Build and Deploy / deploy (push) Successful in 4m18s
Build and Deploy / deploy (push) Successful in 4m18s
This commit is contained in:
@@ -0,0 +1,23 @@
|
||||
import { IsArray, ValidateNested, IsString, IsNotEmpty } from 'class-validator';
|
||||
import { Type } from 'class-transformer';
|
||||
import { ImportCorrespondenceDto } from './import-correspondence.dto';
|
||||
|
||||
export class CommitBatchItemDto {
|
||||
@IsNotEmpty()
|
||||
queueId!: number;
|
||||
|
||||
@ValidateNested()
|
||||
@Type(() => ImportCorrespondenceDto)
|
||||
dto!: ImportCorrespondenceDto;
|
||||
}
|
||||
|
||||
export class CommitBatchDto {
|
||||
@IsArray()
|
||||
@ValidateNested({ each: true })
|
||||
@Type(() => CommitBatchItemDto)
|
||||
items!: CommitBatchItemDto[];
|
||||
|
||||
@IsString()
|
||||
@IsNotEmpty()
|
||||
batchId!: string;
|
||||
}
|
||||
@@ -2,6 +2,7 @@ import { Controller, Post, Body, Headers, UseGuards, Get, Param, Query, Res, Par
|
||||
import { MigrationService } from './migration.service';
|
||||
import { ImportCorrespondenceDto } from './dto/import-correspondence.dto';
|
||||
import { EnqueueMigrationDto } from './dto/enqueue-migration.dto';
|
||||
import { CommitBatchDto } from './dto/commit-batch.dto';
|
||||
import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard';
|
||||
import { CurrentUser } from '../../common/decorators/current-user.decorator';
|
||||
import { ApiTags, ApiOperation, ApiBearerAuth, ApiHeader, ApiQuery, ApiParam } from '@nestjs/swagger';
|
||||
@@ -31,6 +32,23 @@ export class MigrationController {
|
||||
return this.migrationService.importCorrespondence(dto, idempotencyKey, userId);
|
||||
}
|
||||
|
||||
@Post('commit_batch')
|
||||
@UseGuards(JwtAuthGuard)
|
||||
@ApiOperation({ summary: 'Batch approve and import migration review queue items' })
|
||||
@ApiHeader({
|
||||
name: 'Idempotency-Key',
|
||||
description: 'Unique key for the entire batch to prevent duplicate execution',
|
||||
required: true,
|
||||
})
|
||||
async commitBatch(
|
||||
@Body() dto: CommitBatchDto,
|
||||
@Headers('idempotency-key') idempotencyKey: string,
|
||||
@CurrentUser() user: any
|
||||
) {
|
||||
const userId = user?.id || user?.userId || 5;
|
||||
return this.migrationService.commitBatch(dto, idempotencyKey, userId);
|
||||
}
|
||||
|
||||
@Post('queue')
|
||||
@UseGuards(JwtAuthGuard)
|
||||
@ApiOperation({ summary: 'Enqueue a record into the staging migration review queue' })
|
||||
|
||||
@@ -9,6 +9,7 @@ import { InjectRepository } from '@nestjs/typeorm';
|
||||
import { Repository, DataSource } from 'typeorm';
|
||||
import { ImportCorrespondenceDto } from './dto/import-correspondence.dto';
|
||||
import { EnqueueMigrationDto } from './dto/enqueue-migration.dto';
|
||||
import { CommitBatchDto } from './dto/commit-batch.dto';
|
||||
import { ImportTransaction } from './entities/import-transaction.entity';
|
||||
import { Correspondence } from '../correspondence/entities/correspondence.entity';
|
||||
import { CorrespondenceRevision } from '../correspondence/entities/correspondence-revision.entity';
|
||||
@@ -484,11 +485,11 @@ export class MigrationService {
|
||||
async approveQueueItem(id: number, dto: ImportCorrespondenceDto, idempotencyKey: string, userId: number) {
|
||||
const queueItem = await this.reviewQueueRepo.findOne({ where: { id } });
|
||||
if (!queueItem) {
|
||||
throw new BadRequestException('Queue item not found');
|
||||
throw new BadRequestException(`Queue item ${id} not found`);
|
||||
}
|
||||
|
||||
if (queueItem.status !== MigrationReviewStatus.PENDING) {
|
||||
throw new BadRequestException(`Queue item is already ${queueItem.status}`);
|
||||
throw new BadRequestException(`Queue item ${id} is already ${queueItem.status}`);
|
||||
}
|
||||
|
||||
// Attempt the import
|
||||
@@ -503,6 +504,45 @@ export class MigrationService {
|
||||
return result;
|
||||
}
|
||||
|
||||
async commitBatch(dto: CommitBatchDto, idempotencyKey: string, userId: number) {
|
||||
if (!idempotencyKey) {
|
||||
throw new BadRequestException('Idempotency-Key header is required');
|
||||
}
|
||||
|
||||
const results = [];
|
||||
const errors = [];
|
||||
|
||||
// We let each import have its own transaction via approveQueueItem
|
||||
// to avoid one bad record failing the entire batch of valid ones.
|
||||
|
||||
for (const item of dto.items) {
|
||||
// Create a unique sub-key for each item to avoid idempotency conflicts
|
||||
// when using a batch idempotency key.
|
||||
const subKey = `${idempotencyKey}_${item.queueId}`;
|
||||
|
||||
// Force batchId on the item dto
|
||||
item.dto.batch_id = dto.batchId;
|
||||
|
||||
try {
|
||||
const result = await this.approveQueueItem(item.queueId, item.dto, subKey, userId);
|
||||
results.push({ queueId: item.queueId, result });
|
||||
} catch (err: unknown) {
|
||||
const errorMessage = err instanceof Error ? err.message : String(err);
|
||||
errors.push({ queueId: item.queueId, error: errorMessage });
|
||||
this.logger.error(`Batch commit failed for queue ID ${item.queueId}: ${errorMessage}`);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
message: 'Batch processing completed',
|
||||
batchId: dto.batchId,
|
||||
processed: results.length,
|
||||
failed: errors.length,
|
||||
results,
|
||||
errors
|
||||
};
|
||||
}
|
||||
|
||||
async rejectQueueItem(id: number, userId: number) {
|
||||
const queueItem = await this.reviewQueueRepo.findOne({ where: { id } });
|
||||
if (!queueItem) {
|
||||
|
||||
Reference in New Issue
Block a user