feat(migration): ADR-028 migration architecture refactor

- เพิ่ม POST /api/ai/jobs + GET /api/ai/jobs/:jobId endpoints (FR-001, FR-002)
- เพิ่ม BullMQ Worker MigrateDocumentWorker + OCR auto-detect (FR-003, FR-004)
- เพิ่ม cleanup-temp-files + expire-pending-reviews workers (FR-005, FR-005a/b)
- สร้าง SQL deltas: tags, correspondence_tags, alter migration_review_queue (FR-006, ADR-009)
- เพิ่ม MigrationReviewService.commitRecord() + SELECT FOR UPDATE (FR-007, FR-007a)
- เพิ่ม CASL permission migration.commit + MigrationReviewController (FR-007)
- สร้าง TagsModule + TagsService + TagsController (US3)
- สร้าง Migration Review Queue frontend page + ReviewQueueTable (US2)
- อัปเดต n8n guide: deterministic Idempotency-Key + token pre-flight (FR-001a, FR-010a/b)
- สร้าง spec.md, plan.md, tasks.md, data-model.md, contracts/, quickstart.md
- สร้าง ADR-028 document + validation-report.md (PASS 32/32 tasks, 173/173 tests)
This commit is contained in:
2026-05-22 17:10:07 +07:00
parent 990d80e16d
commit a2973be208
55 changed files with 4256 additions and 107 deletions
+39
View File
@@ -53,8 +53,10 @@ import { AiRagQueryDto } from './dto/ai-rag-query.dto';
import { ExtractDocumentDto } from './dto/extract-document.dto';
import { AiCallbackDto } from './dto/ai-callback.dto';
import { CreateAiJobDto } from './dto/create-ai-job.dto';
import { SubmitAiJobDto } from './dto/submit-ai-job.dto';
import { MigrationUpdateDto } from './dto/migration-update.dto';
import { MigrationQueryDto } from './dto/migration-query.dto';
import { ValidationException } from '../../common/exceptions';
import {
ApproveLegacyMigrationDto,
LegacyMigrationIngestDto,
@@ -171,6 +173,43 @@ export class AiController {
return this.aiService.getAiJobStatus(jobId);
}
/**
 * POST jobs — accepts an AI migration job and forwards it to the service for queuing.
 *
 * Protected by JwtAuthGuard, AiEnabledGuard and RbacGuard and requires the
 * `ai.suggest` permission. Responds with HTTP 202 (Accepted).
 *
 * @param dto Job submission body.
 * @param idempotencyKey Value of the `Idempotency-Key` request header.
 * @returns Whatever `aiService.submitMigrationJob` resolves to.
 * @throws ValidationException when the `Idempotency-Key` header is missing or empty.
 */
@Post('jobs')
@UseGuards(JwtAuthGuard, AiEnabledGuard, RbacGuard)
@ApiBearerAuth()
@RequirePermission('ai.suggest')
@HttpCode(HttpStatus.ACCEPTED)
@ApiOperation({
summary: 'Submit AI migration job — ส่งงานย้ายเอกสารให้ AI ประมวลผล',
description:
'รับ tempAttachmentId/documentNumber แล้วส่งงานย้ายเอกสารเข้า BullMQ เพื่อรอการประมวลผล',
})
@ApiHeader({
name: 'Idempotency-Key',
description: 'Unique key เพื่อป้องกัน duplicate AI job',
required: true,
})
async submitMigrationJob(
@Body() dto: SubmitAiJobDto,
@Headers('idempotency-key') idempotencyKey: string
) {
// The header is declared via @ApiHeader above; this check is what actually rejects a request without it.
if (!idempotencyKey) {
throw new ValidationException('Idempotency-Key header is required');
}
return this.aiService.submitMigrationJob(dto, idempotencyKey);
}
/**
 * GET jobs/:jobId — polling endpoint that returns the status of an AI job.
 *
 * Protected by JwtAuthGuard and RbacGuard and requires the `ai.suggest` permission.
 * NOTE(review): unlike the POST jobs route, AiEnabledGuard is not applied here —
 * confirm that status polling is meant to remain available while AI is disabled.
 *
 * @param jobId Job id taken from the route parameter.
 * @returns Whatever `aiService.getAiJobStatus` resolves to for that id.
 */
@Get('jobs/:jobId')
@UseGuards(JwtAuthGuard, RbacGuard)
@ApiBearerAuth()
@RequirePermission('ai.suggest')
@ApiOperation({
summary: 'AI Job Status polling by jobId',
})
@ApiParam({ name: 'jobId', description: 'BullMQ job id' })
async getAiJobStatusById(@Param('jobId') jobId: string) {
return this.aiService.getAiJobStatus(jobId);
}
@Post('extract')
@UseGuards(JwtAuthGuard, AiEnabledGuard, RbacGuard)
@ApiBearerAuth()
+9
View File
@@ -5,6 +5,7 @@
// - 2026-05-19: เพิ่ม IntentClassifierModule (ADR-024 Intent Classification).
// - 2026-05-19: เพิ่ม AiToolModule (ADR-025 AI Tool Layer).
// - 2026-05-21: ลงทะเบียน SystemSetting, AiSettingsService และ AiEnabledGuard สำหรับ ADR-027.
// - 2026-05-22: นำเข้าและลงทะเบียน CleanupTempFilesWorker (T016) เพื่อลบไฟล์แนบชั่วคราวหมดอายุ
// Module สำหรับ AI Gateway — ลงทะเบียน Services และ Controllers (ADR-023)
import { Logger, Module, OnModuleInit } from '@nestjs/common';
@@ -36,7 +37,10 @@ import { SystemSetting } from './entities/system-setting.entity';
import { AiEnabledGuard } from './guards/ai-enabled.guard';
import { UserModule } from '../user/user.module';
import { MigrationModule } from '../migration/migration.module';
import { TagsModule } from '../tags/tags.module';
import { FileStorageModule } from '../../common/file-storage/file-storage.module';
import { ImportTransaction } from '../migration/entities/import-transaction.entity';
import { MigrationReviewQueue } from '../migration/entities/migration-review-queue.entity';
import { AuditLogModule } from '../audit-log/audit-log.module';
import { AuditLog } from '../../common/entities/audit-log.entity';
import { Attachment } from '../../common/file-storage/entities/attachment.entity';
@@ -46,6 +50,7 @@ import { CorrespondenceType } from '../correspondence/entities/correspondence-ty
import { RbacGuard } from '../../common/guards/rbac.guard';
import { IntentClassifierModule } from './intent-classifier/intent-classifier.module';
import { AiToolModule } from './tool/ai-tool.module';
import { CleanupTempFilesWorker } from './workers/cleanup-temp-files.worker';
import {
QUEUE_AI_BATCH,
QUEUE_AI_INGEST,
@@ -67,6 +72,8 @@ import {
Project,
Organization,
CorrespondenceType,
ImportTransaction,
MigrationReviewQueue,
]),
BullModule.registerQueue(
@@ -108,6 +115,7 @@ import {
// UserModule สำหรับ RbacGuard (ต้องการ UserService)
UserModule,
MigrationModule,
TagsModule,
FileStorageModule,
AuditLogModule,
@@ -137,6 +145,7 @@ import {
// RbacGuard ต้องการ UserService จาก UserModule
RbacGuard,
AiEnabledGuard,
CleanupTempFilesWorker,
],
exports: [
AiService,
+5 -4
View File
@@ -25,6 +25,7 @@ import {
} from '../common/constants/queue.constants';
import { OllamaService } from './services/ollama.service';
import { AiQdrantService } from './qdrant.service';
import { ImportTransaction } from '../migration/entities/import-transaction.entity';
const DEFAULT_REDIS_TOKEN = 'default_IORedisModuleConnectionToken';
@@ -117,8 +118,6 @@ describe('AiService', () => {
beforeEach(async () => {
jest.clearAllMocks();
// ตั้งค่า default return values
mockMigrationLogRepo.create.mockReturnValue({
publicId: '019505a1-7c3e-7000-8000-abc123def456',
sourceFile: 'test-file-uuid',
@@ -131,7 +130,6 @@ describe('AiService', () => {
mockAuditLogRepo.save.mockResolvedValue({});
mockMainAuditLogRepo.create.mockReturnValue({});
mockMainAuditLogRepo.save.mockResolvedValue({});
const module: TestingModule = await Test.createTestingModule({
providers: [
AiService,
@@ -144,6 +142,10 @@ describe('AiService', () => {
provide: getRepositoryToken(AuditLog),
useValue: mockMainAuditLogRepo,
},
{
provide: getRepositoryToken(ImportTransaction),
useValue: { findOne: jest.fn(), create: jest.fn(), save: jest.fn() },
},
{ provide: getQueueToken(QUEUE_AI_REALTIME), useValue: mockQueue },
{ provide: getQueueToken(QUEUE_AI_BATCH), useValue: mockQueue },
{ provide: ConfigService, useValue: mockConfigService },
@@ -154,7 +156,6 @@ describe('AiService', () => {
{ provide: DEFAULT_REDIS_TOKEN, useValue: mockRedis },
],
}).compile();
service = module.get<AiService>(AiService);
});
+71
View File
@@ -19,6 +19,7 @@ import {
ValidationException,
SystemException,
BusinessException,
ConflictException,
} from '../../common/exceptions';
import {
MigrationLog,
@@ -32,6 +33,9 @@ import { MigrationUpdateDto } from './dto/migration-update.dto';
import { MigrationQueryDto } from './dto/migration-query.dto';
import { AiValidationService } from './ai-validation.service';
import { CreateAiJobDto } from './dto/create-ai-job.dto';
import { SubmitAiJobDto } from './dto/submit-ai-job.dto';
import { ImportTransaction } from '../migration/entities/import-transaction.entity';
import { Project } from '../project/entities/project.entity';
import {
QUEUE_AI_BATCH,
QUEUE_AI_REALTIME,
@@ -159,6 +163,8 @@ export class AiService {
private readonly aiAuditLogRepo: Repository<AiAuditLog>,
@InjectRepository(AuditLog)
private readonly auditLogRepo: Repository<AuditLog>,
@InjectRepository(ImportTransaction)
private readonly importTransactionRepo: Repository<ImportTransaction>,
@Optional()
@InjectQueue(QUEUE_AI_REALTIME)
private readonly aiRealtimeQueue?: Queue<AiRealtimeJobData>,
@@ -254,6 +260,71 @@ export class AiService {
}
}
/**
 * Queues a `migrate-document` job on the AI batch queue.
 *
 * Order of operations:
 *  1. If the batch queue was not injected, log and return a failed result.
 *  2. Reject with a conflict when an import transaction already exists for the
 *     same document number + batch and its status code is not 500.
 *  3. If a job whose id equals the idempotency key already exists, return it.
 *  4. Otherwise add a new job, using the idempotency key as the job id.
 *
 * @param dto Job submission; only `dto.payload` is read.
 * @param idempotencyKey Key used both as the BullMQ job id and inside the job data.
 * @returns `{ success: true, jobId }` when queued or already queued,
 *          `{ success: false, error }` when the queue is missing or `add` fails.
 * @throws ConflictException when the document was already imported in this batch.
 */
async submitMigrationJob(
  dto: SubmitAiJobDto,
  idempotencyKey: string
): Promise<AiQueueResult> {
  const { payload } = dto;
  const queue = this.aiBatchQueue;
  if (!queue) {
    const error = new Error('AI batch queue is not registered');
    this.logger.error('AI job queue failed', {
      documentPublicId: payload.tempAttachmentId,
      error,
    });
    return { success: false, error };
  }
  const previousImport = await this.importTransactionRepo.findOne({
    where: {
      documentNumber: payload.documentNumber,
      batchId: payload.batchId,
    },
  });
  if (previousImport) {
    // A transaction recorded with status 500 does not block a new submission.
    if (previousImport.statusCode !== 500) {
      throw new ConflictException(
        'MIGRATION_DUPLICATE_TRANSACTION',
        `Document ${payload.documentNumber} already imported in batch ${payload.batchId}`,
        'เอกสารนี้ได้รับการนำเข้าในระบบ Staging/Production แล้ว'
      );
    }
  }
  const queuedJob = await queue.getJob(idempotencyKey);
  if (queuedJob) {
    return { success: true, jobId: String(queuedJob.id) };
  }
  // No project is supplied by the caller: the first project the repository
  // returns is used, falling back to the nil UUID when none exists.
  const fallbackProject = await this.importTransactionRepo.manager.findOne(
    Project,
    { where: {} }
  );
  const projectPublicId =
    fallbackProject?.publicId ?? '00000000-0000-0000-0000-000000000000';
  try {
    const jobData = {
      jobType: 'migrate-document',
      documentPublicId: payload.tempAttachmentId,
      projectPublicId,
      payload: {
        documentNumber: payload.documentNumber,
        title: payload.title,
        batchId: payload.batchId,
        existingTags: payload.existingTags,
        systemCategories: payload.systemCategories,
      },
      idempotencyKey,
    };
    const createdJob = await queue.add('migrate-document', jobData, {
      jobId: idempotencyKey,
    });
    return { success: true, jobId: String(createdJob.id) };
  } catch (err: unknown) {
    let error: Error;
    if (err instanceof Error) {
      error = err;
    } else {
      error = new Error(String(err));
    }
    this.logger.error('AI job queue failed', {
      documentPublicId: payload.tempAttachmentId,
      error,
    });
    return { success: false, error };
  }
}
/** อ่านสถานะ job จาก ai-realtime หรือ ai-batch เพื่อให้ frontend polling ได้ */
async getAiJobStatus(jobId: string): Promise<AiJobStatusResult> {
const realtimeJob = await this.aiRealtimeQueue?.getJob(jobId);
@@ -0,0 +1,89 @@
// File: src/modules/ai/dto/ai-job-result.dto.ts
// Change Log:
// - 2026-05-22: สร้าง AiJobResultDto สำหรับจัดรูปแบบและตรวจสอบผลลัพธ์ของงาน AI (ADR-028)
import { ApiProperty } from '@nestjs/swagger';
import {
IsBoolean,
IsNumber,
IsString,
IsArray,
ValidateNested,
IsOptional,
} from 'class-validator';
import { Type } from 'class-transformer';
/**
 * A tag suggested by the AI after analysing a document.
 */
export class SuggestedTagDto {
  @ApiProperty({ description: 'ชื่อแท็กที่แนะนำ' })
  @IsString()
  name!: string;
  // `description` is optional for validation (@IsOptional), so the OpenAPI
  // schema must not advertise it as required.
  @ApiProperty({ description: 'คำอธิบายเกี่ยวกับแท็ก', required: false })
  @IsString()
  @IsOptional()
  description?: string;
  @ApiProperty({ description: 'ระบุว่าเป็นแท็กใหม่ในระบบหรือไม่' })
  @IsBoolean()
  isNew!: boolean;
  // Range text fixed from the garbled "(0.01.0)" to match AiJobResultDto.confidence.
  @ApiProperty({ description: 'ระดับความมั่นใจของ AI ต่อแท็กนี้ (0.0–1.0)' })
  @IsNumber()
  confidence!: number;
}
/**
 * Result of the AI's document analysis for a migration job.
 *
 * NOTE(review): `confidence` is validated only as a number (no 0–1 bound) and
 * `ocrMethod` only as a string, although its type and Swagger enum allow just
 * 'fast-path' | 'slow-path' — confirm whether stricter validation is wanted.
 */
export class AiJobResultDto {
@ApiProperty({ description: 'เอกสารมีความถูกต้องและสมบูรณ์หรือไม่' })
@IsBoolean()
isValid!: boolean;
@ApiProperty({
description: 'ระดับความมั่นใจเฉลี่ยโดยรวมของเอกสาร (0.0–1.0)',
})
@IsNumber()
confidence!: number;
@ApiProperty({ description: 'หมวดหมู่ของเอกสารโต้ตอบที่แนะนำ' })
@IsString()
category!: string;
@ApiProperty({ description: 'บทสรุปโดยย่อของเอกสาร' })
@IsString()
summary!: string;
// Each element is validated as a nested SuggestedTagDto.
@ApiProperty({
type: [SuggestedTagDto],
description: 'รายการแท็กที่ AI แนะนำ',
})
@IsArray()
@ValidateNested({ each: true })
@Type(() => SuggestedTagDto)
suggestedTags!: SuggestedTagDto[];
@ApiProperty({
type: [String],
description: 'รายการจุดผิดพลาดหรือข้อควรระวังที่พบในเอกสาร',
})
@IsArray()
@IsString({ each: true })
detectedIssues!: string[];
@ApiProperty({
enum: ['fast-path', 'slow-path'],
description: 'วิธีการสกัดข้อความจากเอกสาร',
})
@IsString()
ocrMethod!: 'fast-path' | 'slow-path';
@ApiProperty({
description: 'ระยะเวลาที่ใช้ในการสกัดข้อมูลและวิเคราะห์ (ms)',
})
@IsNumber()
processingTimeMs!: number;
}
@@ -0,0 +1,96 @@
// File: src/modules/ai/dto/submit-ai-job.dto.ts
// Change Log:
// - 2026-05-22: สร้าง SubmitAiJobDto สำหรับรับงานประมวลผลการย้ายเอกสารของ AI (ADR-028)
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import {
IsUUID,
IsString,
IsNotEmpty,
IsOptional,
IsArray,
ValidateNested,
IsObject,
IsIn,
} from 'class-validator';
import { Type } from 'class-transformer';
/**
 * A tag option supplied alongside the document to assist the AI's analysis.
 * Only `tagName` is mandatory; `publicId` (a UUID) and `colorCode` are optional.
 */
export class TagOptionDto {
@ApiPropertyOptional({ description: 'UUID ของแท็กที่มีอยู่แล้วในโครงการ' })
@IsUUID()
@IsOptional()
publicId?: string;
@ApiProperty({ description: 'ชื่อแท็ก' })
@IsString()
@IsNotEmpty()
tagName!: string;
@ApiPropertyOptional({ description: 'รหัสสีของแท็ก' })
@IsString()
@IsOptional()
colorCode?: string;
}
/**
 * Payload describing one legacy document to be migrated.
 *
 * `tempAttachmentId`, `documentNumber`, `title` and `batchId` are mandatory;
 * `existingTags` and `systemCategories` are optional hints and are documented
 * as optional in the OpenAPI schema to match their validation rules.
 */
export class MigrateDocumentPayloadDto {
  @ApiProperty({ description: 'UUID ของ temp attachment ในระบบ' })
  @IsUUID()
  tempAttachmentId!: string;
  @ApiProperty({ description: 'เลขที่เอกสารเก่า' })
  @IsString()
  @IsNotEmpty()
  documentNumber!: string;
  @ApiProperty({ description: 'ชื่อเรื่องเอกสาร' })
  @IsString()
  @IsNotEmpty()
  title!: string;
  // Optional: each element is validated as a nested TagOptionDto when present.
  @ApiPropertyOptional({
    type: [TagOptionDto],
    description: 'รายการแท็กโครงการที่มีอยู่ก่อนแล้ว',
  })
  @IsArray()
  @IsOptional()
  @ValidateNested({ each: true })
  @Type(() => TagOptionDto)
  existingTags?: TagOptionDto[];
  // Optional: list of category names, each validated as a string.
  @ApiPropertyOptional({
    type: [String],
    description: 'หมวดหมู่เอกสารหลักที่มีในระบบ',
  })
  @IsArray()
  @IsString({ each: true })
  @IsOptional()
  systemCategories?: string[];
  @ApiProperty({ description: 'รหัสกลุ่มการนำเข้า (Batch ID)' })
  @IsString()
  @IsNotEmpty()
  batchId!: string;
}
/**
 * Request body for submitting an AI processing job.
 * `type` must currently be exactly 'migrate-document'; `payload` is validated
 * as a nested MigrateDocumentPayloadDto.
 */
export class SubmitAiJobDto {
@ApiProperty({
example: 'migrate-document',
description: 'ชนิดงานประมวลผล AI',
})
@IsString()
@IsNotEmpty()
@IsIn(['migrate-document'])
type!: string;
@ApiProperty({ type: MigrateDocumentPayloadDto })
@IsObject()
@ValidateNested()
@Type(() => MigrateDocumentPayloadDto)
payload!: MigrateDocumentPayloadDto;
}
@@ -3,6 +3,7 @@
// - 2026-05-21: สร้าง Unit Test สำหรับ AiBatchProcessor ครอบคลุม embed-document และ sandbox-rag (T032).
// - 2026-05-21: เพิ่มการทดสอบ sandbox-extract พร้อม mock OcrService, OllamaService และ Redis (T039).
// - 2026-05-21: แก้ไข ESLint unexpected any และ unsafe member access โดยกำหนด type ให้ redis เป็น Record<string, jest.Mock>
// - 2026-05-22: เพิ่ม Mock dependencies (ProjectRepository, AiAuditLogRepository, TagsService, MigrationService) เพื่อแก้ปัญหา Nest resolve dependency ใน unit test และปรับโครงสร้างฟังก์ชันไม่มีบรรทัดว่าง (Zero Blank Lines) ตามกฎเหล็ก
import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
@@ -14,6 +15,10 @@ import { AiRagService } from '../ai-rag.service';
import { Attachment } from '../../../common/file-storage/entities/attachment.entity';
import { OcrService } from '../services/ocr.service';
import { OllamaService } from '../services/ollama.service';
import { Project } from '../../project/entities/project.entity';
import { AiAuditLog } from '../entities/ai-audit-log.entity';
import { TagsService } from '../../tags/tags.service';
import { MigrationService } from '../../migration/migration.service';
describe('AiBatchProcessor', () => {
let processor: AiBatchProcessor;
@@ -38,13 +43,17 @@ describe('AiBatchProcessor', () => {
.mockResolvedValue({ text: 'OCR text LCBP3-CIV-001 Civil' }),
};
const mockOllamaService = {
getMainModelName: jest.fn().mockReturnValue('gemma4:e4b'),
generate: jest.fn().mockResolvedValue(
JSON.stringify({
documentNumber: 'LCBP3-CIV-001',
subject: 'Foundation Inspection Report',
discipline: 'Civil',
category: 'Correspondence',
date: '2026-05-20',
confidence: 0.95,
tags: ['foundation'],
summary: 'summary text',
})
),
};
@@ -52,8 +61,35 @@ describe('AiBatchProcessor', () => {
setex: jest.fn().mockResolvedValue('OK'),
};
// Attachment repository stub: findOne always resolves the same attachment row
// (numeric id 1, uploaded by user 10); update reports one affected row.
const mockAttachmentRepo = {
findOne: jest.fn().mockResolvedValue({
id: 1,
publicId: 'doc-uuid-123',
filePath: '/files/test.pdf',
uploadedByUserId: 10,
}),
update: jest.fn().mockResolvedValue({ affected: 1 }),
};
// Project repository stub: findOne always resolves a project with numeric id 2.
const mockProjectRepo = {
findOne: jest.fn().mockResolvedValue({
id: 2,
publicId: 'proj-uuid-456',
}),
};
// AI audit log repository stub: create/save succeed with empty objects.
const mockAiAuditLogRepo = {
create: jest.fn().mockReturnValue({}),
save: jest.fn().mockResolvedValue({}),
};
// TagsService stub: findOrCreateTags always resolves a single 'foundation' tag.
const mockTagsService = {
findOrCreateTags: jest
.fn()
.mockResolvedValue([
{ id: 5, publicId: 'tag-uuid-999', tagName: 'foundation' },
]),
};
// MigrationService stub: both calls resolve without a value.
const mockMigrationService = {
createError: jest.fn().mockResolvedValue(undefined),
enqueueRecord: jest.fn().mockResolvedValue(undefined),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
@@ -67,6 +103,16 @@ describe('AiBatchProcessor', () => {
provide: getRepositoryToken(Attachment),
useValue: mockAttachmentRepo,
},
{
provide: getRepositoryToken(Project),
useValue: mockProjectRepo,
},
{
provide: getRepositoryToken(AiAuditLog),
useValue: mockAiAuditLogRepo,
},
{ provide: TagsService, useValue: mockTagsService },
{ provide: MigrationService, useValue: mockMigrationService },
],
}).compile();
processor = module.get<AiBatchProcessor>(AiBatchProcessor);
@@ -148,4 +194,42 @@ describe('AiBatchProcessor', () => {
expect.stringContaining('completed')
);
});
// Happy path for a migrate-document job: attachment lookup, OCR, one LLM call,
// tag resolution, enqueue into the review queue, and one audit log row.
it('ควรประมวลผล migrate-document โดยจำลอง OCR, AI และเรียก migrationService.enqueueRecord', async () => {
// NOTE(review): this fixture puts batchId at the top level of job.data and
// senderOrgId/receiverOrgId inside payload — confirm this matches what the
// real job producer enqueues.
const job = {
id: 'job-migrate',
data: {
jobType: 'migrate-document',
documentPublicId: 'doc-uuid-123',
projectPublicId: 'proj-uuid-456',
payload: {
documentNumber: 'LEGACY-001',
title: 'Legacy Title',
senderOrgId: 1,
receiverOrgId: 2,
},
idempotencyKey: 'idem-migrate-123',
batchId: 'batch-999',
},
} as unknown as Job<AiBatchJobData>;
await processor.process(job);
expect(attachmentRepo.findOne).toHaveBeenCalledWith({
where: { publicId: 'doc-uuid-123' },
});
expect(ocrService.detectAndExtract).toHaveBeenCalledWith({
pdfPath: '/files/test.pdf',
});
expect(ollamaService.generate).toHaveBeenCalledTimes(1);
expect(mockTagsService.findOrCreateTags).toHaveBeenCalledTimes(1);
// The AI-extracted document number ('LCBP3-CIV-001') is expected to win over
// the payload's 'LEGACY-001'.
expect(mockMigrationService.enqueueRecord).toHaveBeenCalledWith(
expect.objectContaining({
documentNumber: 'LCBP3-CIV-001',
subject: 'Foundation Inspection Report',
category: 'Correspondence',
isValid: true,
confidence: 0.95,
})
);
expect(mockAiAuditLogRepo.create).toHaveBeenCalledTimes(1);
expect(mockAiAuditLogRepo.save).toHaveBeenCalledTimes(1);
});
});
@@ -5,6 +5,7 @@
// - 2026-05-21: เพิ่มการรองรับ sandbox-rag และ sandbox-extract สำหรับ Superadmin sandbox.
// - 2026-05-21: พัฒนาระบบประมวลผล sandbox-extract พร้อมเชื่อมต่อ OcrService, OllamaService และ Redis cache
// - 2026-05-21: แก้ไข ESLint unused variable สำหรับ parseError ใน catch block
// - 2026-05-22: แก้ไข type compilation error ใน processMigrateDocument และนำช่องว่างภายในฟังก์ชันออก
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
@@ -19,13 +20,29 @@ import { EmbeddingService } from '../services/embedding.service';
import { AiRagService } from '../ai-rag.service';
import { OcrService } from '../services/ocr.service';
import { OllamaService } from '../services/ollama.service';
import { Project } from '../../project/entities/project.entity';
import { AiAuditLog, AiAuditStatus } from '../entities/ai-audit-log.entity';
import { TagsService } from '../../tags/tags.service';
import { MigrationService } from '../../migration/migration.service';
import { MigrationErrorType } from '../../migration/entities/migration-error.entity';
/**
 * Metadata fields read from the LLM's JSON answer for a migrate-document job.
 * Every field is optional because the model may omit it or return a value of
 * the wrong type; extending Record<string, unknown> lets the object be stored
 * where a generic JSON record is expected.
 */
interface MigrateDocumentMetadata extends Record<string, unknown> {
// Document number/code found in the text.
documentNumber?: string;
// Subject or title found in the text.
subject?: string;
// Document category name.
category?: string;
// Document date as a string (the prompt asks for YYYY-MM-DD).
date?: string;
// Model-reported confidence (the prompt asks for 0.0–1.0).
confidence?: number;
// Keyword tags describing the document.
tags?: string[];
// Short summary of the document.
summary?: string;
}
export type AiBatchJobType =
| 'ocr'
| 'extract-metadata'
| 'embed-document'
| 'sandbox-rag'
| 'sandbox-extract';
| 'sandbox-extract'
| 'migrate-document';
export interface AiBatchJobData {
jobType: AiBatchJobType;
@@ -36,6 +53,41 @@ export interface AiBatchJobData {
idempotencyKey: string;
}
/**
 * Returns `value` unchanged when it is a string containing at least one
 * non-whitespace character; otherwise returns `undefined`.
 * The returned string is NOT trimmed.
 */
const readString = (value: unknown): string | undefined => {
  if (typeof value !== 'string') {
    return undefined;
  }
  return value.trim() === '' ? undefined : value;
};
/**
 * Reads a numeric id from an untyped value.
 *
 * Accepts a number, or a non-blank string that `Number()` can convert.
 * Returns `undefined` for anything else — including values that are or
 * convert to NaN / ±Infinity, so a malformed id is treated as "absent"
 * instead of being forwarded as NaN.
 */
const readNumberId = (value: unknown): number | undefined => {
  let candidate: number;
  if (typeof value === 'number') {
    candidate = value;
  } else if (typeof value === 'string' && value.trim().length > 0) {
    candidate = Number(value);
  } else {
    return undefined;
  }
  // Number('abc') is NaN; reject it rather than passing it on as an id.
  return Number.isFinite(candidate) ? candidate : undefined;
};
/**
 * Collects the string elements of `value` in their original order.
 * Returns an empty array when `value` is not an array; non-string elements
 * are dropped.
 */
const toStringList = (value: unknown): string[] => {
  if (!Array.isArray(value)) {
    return [];
  }
  const strings: string[] = [];
  for (const entry of value) {
    if (typeof entry === 'string') {
      strings.push(entry);
    }
  }
  return strings;
};
/**
 * Parses the (already fence-stripped) LLM answer into typed metadata.
 *
 * Values of the wrong type become `undefined` (or `[]` for `tags`); a JSON
 * value that is not an object yields `{}`.
 *
 * @param cleanedResponse JSON text returned by the model.
 * @throws SyntaxError (from JSON.parse) when the text is not valid JSON.
 */
const parseMigrateDocumentMetadata = (
  cleanedResponse: string
): MigrateDocumentMetadata => {
  const decoded: unknown = JSON.parse(cleanedResponse);
  if (decoded === null || typeof decoded !== 'object') {
    return {};
  }
  const fields = decoded as Record<string, unknown>;
  const rawConfidence = fields.confidence;
  const metadata: MigrateDocumentMetadata = {
    documentNumber: readString(fields.documentNumber),
    subject: readString(fields.subject),
    category: readString(fields.category),
    date: readString(fields.date),
    confidence: typeof rawConfidence === 'number' ? rawConfidence : undefined,
    tags: toStringList(fields.tags),
    summary: readString(fields.summary),
  };
  return metadata;
};
/** Processor สำหรับงาน AI batch ที่รันทีละงานเพื่อคุม VRAM */
@Processor(QUEUE_AI_BATCH, { concurrency: 1 })
export class AiBatchProcessor extends WorkerHost {
@@ -45,10 +97,16 @@ export class AiBatchProcessor extends WorkerHost {
constructor(
@InjectRepository(Attachment)
private readonly attachmentRepo: Repository<Attachment>,
@InjectRepository(Project)
private readonly projectRepo: Repository<Project>,
@InjectRepository(AiAuditLog)
private readonly aiAuditLogRepo: Repository<AiAuditLog>,
private readonly embeddingService: EmbeddingService,
private readonly ragService: AiRagService,
private readonly ocrService: OcrService,
private readonly ollamaService: OllamaService,
private readonly tagsService: TagsService,
private readonly migrationService: MigrationService,
@InjectRedis() private readonly redis: Redis
) {
super();
@@ -97,6 +155,15 @@ export class AiBatchProcessor extends WorkerHost {
);
await this.processSandboxExtract(job.data);
return;
case 'migrate-document':
this.logger.log(
`Migrate document job processing — jobId=${String(job.id)}`
);
await this.processMigrateDocument(job);
if (!isSandbox) {
await this.setAiProcessingStatus(job.data.documentPublicId, 'DONE');
}
return;
default: {
const unreachable: never = job.data.jobType;
throw new Error(
@@ -248,4 +315,193 @@ Return ONLY a valid JSON object matching this schema. Do NOT include markdown co
throw err;
}
}
/**
 * Runs the migrate-document pipeline for one job:
 * load attachment and project → OCR → LLM metadata extraction → JSON parsing →
 * tag resolution → enqueue into the migration review queue → audit log.
 *
 * Each failing stage (OCR, LLM call, JSON parsing) is recorded as a migration
 * error plus a FAILED audit log row before the error is rethrown so the job
 * is marked failed by the queue.
 *
 * @param job Batch job whose data carries the attachment/project public ids
 *            and the migration payload.
 * @throws Error when the attachment or project cannot be found, or when OCR,
 *         the LLM call, or JSON parsing fails.
 */
private async processMigrateDocument(
  job: Job<AiBatchJobData>
): Promise<void> {
  const startTime = Date.now();
  const { documentPublicId, projectPublicId, payload, batchId } = job.data;
  // AiService.submitMigrationJob nests batchId inside `payload`, so look there
  // too; otherwise every error for API-submitted jobs is filed under 'unknown'.
  const resolvedBatchId =
    batchId || readString(payload.batchId) || 'unknown';
  const docNumber = payload.documentNumber as string;
  const attachment = await this.attachmentRepo.findOne({
    where: { publicId: documentPublicId },
  });
  if (!attachment) {
    throw new Error(`ไม่พบ attachment สำหรับ publicId: ${documentPublicId}`);
  }
  const project = await this.projectRepo.findOne({
    where: { publicId: projectPublicId },
  });
  if (!project) {
    throw new Error(`ไม่พบโครงการสำหรับ publicId: ${projectPublicId}`);
  }
  const failureContext = {
    batchId: resolvedBatchId,
    documentNumber: docNumber,
    documentPublicId,
    startTime,
  };
  let ocrResult;
  try {
    ocrResult = await this.ocrService.detectAndExtract({
      pdfPath: attachment.filePath,
    });
  } catch (err: unknown) {
    const errMsg = err instanceof Error ? err.message : String(err);
    this.logger.error(`OCR สกัดข้อมูลล้มเหลว: ${errMsg}`);
    await this.recordMigrationFailure({
      ...failureContext,
      errorType: MigrationErrorType.FILE_ERROR,
      errorMessage: errMsg,
      aiModel: 'ocr-engine',
    });
    throw err;
  }
  const prompt = `You are a professional document intelligence engine.
Analyze the following OCR text extracted from a legacy project document and extract the metadata fields.
OCR TEXT:
${ocrResult.text}
Extract these fields:
1. documentNumber: The official document number or code. If not found, return null.
2. subject: The main subject, title, or topic of the document. If not found, return null.
3. discipline: Must be exactly one of: "Civil", "Mechanical", "Electrical", "Architectural", or null if not specified.
4. category: Must be exactly one of: "Correspondence", "Transmittal", "Circulation", "RFA", "Shop Drawing", "Contract Drawing", or null if not specified.
5. date: The issue/document date in YYYY-MM-DD format. If not found, return null.
6. confidence: A float between 0.0 and 1.0 indicating your confidence in this extraction.
7. tags: An array of tags/keywords (strings) that describe the document.
8. summary: A short 1-2 sentence summary of the document contents.
Return ONLY a valid JSON object matching this schema. Do NOT include markdown code blocks, HTML, or any conversational text. Example:
{
"documentNumber": "LCBP3-CIV-001",
"subject": "Foundation Inspection Report",
"discipline": "Civil",
"category": "Correspondence",
"date": "2026-05-20",
"confidence": 0.95,
"tags": ["foundation", "inspection", "concrete"],
"summary": "This document is a foundation inspection report for the LCBP3 project, confirming concrete strength."
}`;
  let aiResponse: string;
  try {
    aiResponse = await this.ollamaService.generate(prompt);
  } catch (err: unknown) {
    const errMsg = err instanceof Error ? err.message : String(err);
    this.logger.error(`การวิเคราะห์ของ AI ล้มเหลว: ${errMsg}`);
    await this.recordMigrationFailure({
      ...failureContext,
      errorType: MigrationErrorType.API_ERROR,
      errorMessage: errMsg,
      aiModel: this.ollamaService.getMainModelName(),
    });
    throw err;
  }
  // Models sometimes wrap the JSON in markdown fences despite the prompt.
  const cleanedResponse = aiResponse
    .replace(/```json/g, '')
    .replace(/```/g, '')
    .trim();
  let extractedMetadata: MigrateDocumentMetadata;
  try {
    extractedMetadata = parseMigrateDocumentMetadata(cleanedResponse);
  } catch (_err: unknown) {
    const errMsg = `ไม่สามารถแปลงผลลัพธ์ของ AI เป็น JSON ได้: ${cleanedResponse}`;
    this.logger.error(errMsg);
    await this.recordMigrationFailure({
      ...failureContext,
      errorType: MigrationErrorType.AI_PARSE_ERROR,
      errorMessage: errMsg,
      aiModel: this.ollamaService.getMainModelName(),
      rawAiResponse: aiResponse,
    });
    throw new Error(errMsg);
  }
  let mappedTags: Record<string, string>[] = [];
  if (extractedMetadata.tags && extractedMetadata.tags.length > 0) {
    const tags = await this.tagsService.findOrCreateTags(
      project.id,
      extractedMetadata.tags,
      attachment.uploadedByUserId
    );
    mappedTags = tags.map((t) => ({
      publicId: t.publicId,
      tagName: t.tagName,
    }));
  }
  // The prompt asks for 0.0–1.0. A missing or out-of-range value (e.g. a
  // percentage such as 95) is not trusted and falls back to 0.5, which is
  // below the 0.6 validity threshold.
  const reportedConfidence = extractedMetadata.confidence;
  const confidence =
    typeof reportedConfidence === 'number' &&
    reportedConfidence >= 0 &&
    reportedConfidence <= 1
      ? reportedConfidence
      : 0.5;
  const isValid = confidence >= 0.6 && !!extractedMetadata.documentNumber;
  const payloadTitle = readString(payload.title);
  await this.migrationService.enqueueRecord({
    // AI-extracted values take precedence; payload values are the fallback.
    documentNumber: extractedMetadata.documentNumber || docNumber,
    subject: extractedMetadata.subject || payloadTitle,
    originalSubject: payloadTitle,
    body: extractedMetadata.summary || '',
    category: extractedMetadata.category || 'Correspondence',
    aiSummary: extractedMetadata.summary || '',
    projectId: project.id,
    senderOrgId: readNumberId(payload.senderOrgId),
    receiverOrgId: readNumberId(payload.receiverOrgId),
    issuedDate: extractedMetadata.date || undefined,
    receivedDate: extractedMetadata.date || undefined,
    extractedTags: mappedTags,
    tempAttachmentId: attachment.id,
    isValid,
    confidence,
    aiJobId: String(job.id),
  });
  await this.saveAiAuditLog({
    documentPublicId,
    aiModel: this.ollamaService.getMainModelName(),
    status: AiAuditStatus.SUCCESS,
    aiSuggestionJson: extractedMetadata,
    confidenceScore: confidence,
    processingTimeMs: Date.now() - startTime,
  });
  this.logger.log(
    `ประมวลผลเอกสาร ${docNumber} สำเร็จและถูกส่งเข้า Staging Queue แล้ว`
  );
}
/**
 * Records a failed pipeline stage: one migration error row and one FAILED AI
 * audit log row. A failure to write the migration error is logged and does
 * not prevent the audit log write, so the caller can always rethrow the
 * original error.
 */
private async recordMigrationFailure(failure: {
  batchId: string;
  documentNumber: string;
  documentPublicId: string;
  startTime: number;
  errorType: MigrationErrorType;
  errorMessage: string;
  aiModel: string;
  rawAiResponse?: string;
}): Promise<void> {
  try {
    await this.migrationService.createError({
      batchId: failure.batchId,
      documentNumber: failure.documentNumber,
      errorType: failure.errorType,
      errorMessage: failure.errorMessage,
      // Only include the raw response when the stage actually produced one.
      ...(failure.rawAiResponse !== undefined
        ? { rawAiResponse: failure.rawAiResponse }
        : {}),
    });
  } catch (err: unknown) {
    this.logger.error(
      `บันทึก migration error ล้มเหลว: ${err instanceof Error ? err.message : String(err)}`
    );
  }
  await this.saveAiAuditLog({
    documentPublicId: failure.documentPublicId,
    aiModel: failure.aiModel,
    status: AiAuditStatus.FAILED,
    errorMessage: failure.errorMessage,
    processingTimeMs: Date.now() - failure.startTime,
  });
}
/**
 * Writes one row to the AI audit log. The model name is stored in both
 * `aiModel` and `modelName`. Persistence failures are logged and swallowed
 * so that auditing never fails the calling job.
 *
 * @param data Fields of the audit row to create.
 */
private async saveAiAuditLog(data: {
  documentPublicId: string;
  aiModel: string;
  status: AiAuditStatus;
  aiSuggestionJson?: Record<string, unknown>;
  confidenceScore?: number;
  processingTimeMs?: number;
  errorMessage?: string;
}): Promise<void> {
  const {
    documentPublicId,
    aiModel,
    status,
    aiSuggestionJson,
    confidenceScore,
    processingTimeMs,
    errorMessage,
  } = data;
  try {
    const entry = this.aiAuditLogRepo.create({
      documentPublicId,
      aiModel,
      modelName: aiModel,
      status,
      aiSuggestionJson,
      confidenceScore,
      processingTimeMs,
      errorMessage,
    });
    await this.aiAuditLogRepo.save(entry);
  } catch (err: unknown) {
    const reason = err instanceof Error ? err.message : String(err);
    this.logger.error(`บันทึก ai_audit_logs ล้มเหลว: ${reason}`);
  }
}
}
@@ -0,0 +1,88 @@
// File: src/modules/ai/workers/cleanup-temp-files.worker.ts
// Change Log:
// - 2026-05-22: อัปเดตและสร้างตัวล้างไฟล์ชั่วคราว (T016) เพื่อลบไฟล์ที่หมดอายุ 24 ชม.
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import * as fs from 'fs-extra';
import { Attachment } from '../../../common/file-storage/entities/attachment.entity';
import {
MigrationReviewQueue,
MigrationReviewStatus,
} from '../../migration/entities/migration-review-queue.entity';
/**
 * Scheduled worker that deletes expired temporary attachments (the file on
 * disk and the database row).
 */
@Injectable()
export class CleanupTempFilesWorker {
  /** Age after which a temporary attachment is considered expired (24 hours). */
  private static readonly TEMP_FILE_TTL_MS = 24 * 60 * 60 * 1000;
  private readonly logger = new Logger(CleanupTempFilesWorker.name);
  constructor(
    @InjectRepository(Attachment)
    private readonly attachmentRepository: Repository<Attachment>,
    @InjectRepository(MigrationReviewQueue)
    private readonly reviewQueueRepository: Repository<MigrationReviewQueue>
  ) {}
  /**
   * Runs every hour and removes temporary attachments created more than
   * 24 hours ago, except those referenced by a PENDING record in the
   * migration review queue.
   *
   * Per-file failures are logged and counted without aborting the run; any
   * other failure is logged and swallowed so the scheduler keeps running.
   */
  @Cron(CronExpression.EVERY_HOUR)
  async handleCleanup(): Promise<void> {
    this.logger.log('Starting temporary files cleanup worker...');
    try {
      // Subtract elapsed milliseconds rather than adjusting local-time hours,
      // so the cutoff is exactly 24h ago even across a DST transition.
      const oneDayAgo = new Date(
        Date.now() - CleanupTempFilesWorker.TEMP_FILE_TTL_MS
      );
      const pendingRecords = await this.reviewQueueRepository.find({
        select: ['tempAttachmentId'],
        where: { status: MigrationReviewStatus.PENDING },
      });
      const pendingAttachmentIds = pendingRecords
        .map((r) => r.tempAttachmentId)
        .filter((id): id is number => id !== undefined && id !== null);
      const query = this.attachmentRepository
        .createQueryBuilder('attachment')
        .where('attachment.isTemporary = :isTemporary', { isTemporary: true })
        .andWhere('attachment.createdAt < :oneDayAgo', { oneDayAgo });
      // An empty NOT IN () list is invalid SQL, so only add the clause when needed.
      if (pendingAttachmentIds.length > 0) {
        query.andWhere('attachment.id NOT IN (:...pendingAttachmentIds)', {
          pendingAttachmentIds,
        });
      }
      const expiredAttachments = await query.getMany();
      if (expiredAttachments.length === 0) {
        this.logger.log('No expired temporary files found.');
        return;
      }
      this.logger.log(
        `Found ${expiredAttachments.length} expired temporary files. Deleting...`
      );
      let deletedCount = 0;
      let failedCount = 0;
      for (const att of expiredAttachments) {
        try {
          // Delete the file first; the row is removed only after that succeeds,
          // so a failed file deletion is retried on the next run.
          if (await fs.pathExists(att.filePath)) {
            await fs.remove(att.filePath);
          }
          await this.attachmentRepository.remove(att);
          deletedCount++;
        } catch (error: unknown) {
          const errMessage =
            error instanceof Error ? error.message : String(error);
          this.logger.error(
            `Failed to delete temporary file ID ${att.id}: ${errMessage}`
          );
          failedCount++;
        }
      }
      this.logger.log(
        `Temporary files cleanup completed. Deleted: ${deletedCount}, Failed: ${failedCount}`
      );
    } catch (err: unknown) {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(
        `Error occurred during temporary files cleanup: ${errMsg}`
      );
    }
  }
}