feat(rfa-ai): Complete RFA Approval Refactor and AI Model Revision
CI / CD Pipeline / build (push) Successful in 4m54s
CI / CD Pipeline / deploy (push) Failing after 12m9s

This commit is contained in:
2026-05-16 10:59:53 +07:00
parent 6cb3ae10ee
commit 1a162bf320
105 changed files with 5088 additions and 1083 deletions
@@ -0,0 +1,113 @@
// File: src/modules/ai/processors/ai-batch.processor.ts
// Change Log
// - 2026-05-15: เพิ่ม processor สำหรับ ai-batch queue ตาม ADR-023A.
// - 2026-05-15: เพิ่ม EmbeddingService สำหรับ embed-document logic (T022).
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Attachment } from '../../../common/file-storage/entities/attachment.entity';
import { QUEUE_AI_BATCH } from '../../common/constants/queue.constants';
import { EmbeddingService } from '../services/embedding.service';
export type AiBatchJobType = 'ocr' | 'extract-metadata' | 'embed-document';
export interface AiBatchJobData {
jobType: AiBatchJobType;
documentPublicId: string;
projectPublicId: string;
payload: Record<string, unknown>;
batchId?: string;
idempotencyKey: string;
}
/** Processor สำหรับงาน AI batch ที่รันทีละงานเพื่อคุม VRAM */
@Processor(QUEUE_AI_BATCH, { concurrency: 1 })
export class AiBatchProcessor extends WorkerHost {
private readonly logger = new Logger(AiBatchProcessor.name);
constructor(
@InjectRepository(Attachment)
private readonly attachmentRepo: Repository<Attachment>,
private readonly embeddingService: EmbeddingService
) {
super();
}
/** Dispatch งาน batch ตาม jobType */
async process(job: Job<AiBatchJobData>): Promise<void> {
await this.setAiProcessingStatus(job.data.documentPublicId, 'PROCESSING');
try {
switch (job.data.jobType) {
case 'ocr':
this.logger.log(`OCR batch job processing — jobId=${String(job.id)}`);
// OCR logic handled by OcrService in ai-realtime processor
await this.setAiProcessingStatus(job.data.documentPublicId, 'DONE');
return;
case 'extract-metadata':
this.logger.log(
`Metadata extraction job processing — jobId=${String(job.id)}`
);
// Metadata extraction handled in ai-realtime processor
await this.setAiProcessingStatus(job.data.documentPublicId, 'DONE');
return;
case 'embed-document':
this.logger.log(`Embedding job processing — jobId=${String(job.id)}`);
await this.processEmbedDocument(job.data);
await this.setAiProcessingStatus(job.data.documentPublicId, 'DONE');
return;
default: {
const unreachable: never = job.data.jobType;
throw new Error(
`Unsupported ai-batch jobType: ${String(unreachable)}`
);
}
}
} catch (err) {
this.logger.error(
`Batch job failed — jobType=${job.data.jobType}, documentPublicId=${job.data.documentPublicId}`,
err instanceof Error ? err.stack : String(err)
);
await this.setAiProcessingStatus(job.data.documentPublicId, 'FAILED');
throw err;
}
}
/** ประมวลผล embed-document job ด้วย EmbeddingService (T022) */
private async processEmbedDocument(data: AiBatchJobData): Promise<void> {
const { documentPublicId, projectPublicId, payload } = data;
const pdfPath = payload.pdfPath as string;
const extractedText = payload.extractedText as string | undefined;
if (!pdfPath) {
throw new Error('pdfPath is required for embed-document job');
}
const result = await this.embeddingService.embedDocument(
pdfPath,
documentPublicId,
projectPublicId,
extractedText
);
if (!result.success) {
throw new Error(`Embedding failed: ${result.error ?? 'Unknown error'}`);
}
this.logger.log(
`Embedding completed for document ${documentPublicId}${result.chunksEmbedded} chunks embedded`
);
}
private async setAiProcessingStatus(
documentPublicId: string,
status: 'PENDING' | 'PROCESSING' | 'DONE' | 'FAILED'
): Promise<void> {
await this.attachmentRepo.update(
{ publicId: documentPublicId },
{ aiProcessingStatus: status }
);
}
}
@@ -0,0 +1,228 @@
// File: src/modules/ai/processors/ai-realtime.processor.ts
// Change Log
// - 2026-05-15: เพิ่ม processor สำหรับ ai-realtime queue และ pause/resume ai-batch ตาม ADR-023A.
import {
Processor,
WorkerHost,
OnWorkerEvent,
InjectQueue,
} from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job, Queue } from 'bullmq';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import {
QUEUE_AI_BATCH,
QUEUE_AI_REALTIME,
} from '../../common/constants/queue.constants';
import { AiAuditLog, AiAuditStatus } from '../entities/ai-audit-log.entity';
import { Attachment } from '../../../common/file-storage/entities/attachment.entity';
import { OcrService } from '../services/ocr.service';
import { OllamaService } from '../services/ollama.service';
export type AiRealtimeJobType = 'ai-suggest' | 'rag-query';
export interface AiRealtimeJobData {
jobType: AiRealtimeJobType;
documentPublicId?: string;
projectPublicId: string;
userId?: number;
payload: Record<string, unknown>;
idempotencyKey: string;
}
/** Processor สำหรับงาน AI interactive ที่ต้องกัน batch job ระหว่างใช้ GPU */
@Processor(QUEUE_AI_REALTIME, { concurrency: 1 })
export class AiRealtimeProcessor extends WorkerHost {
private readonly logger = new Logger(AiRealtimeProcessor.name);
constructor(
@InjectQueue(QUEUE_AI_BATCH)
private readonly aiBatchQueue: Queue,
private readonly ocrService: OcrService,
private readonly ollamaService: OllamaService,
@InjectRepository(AiAuditLog)
private readonly aiAuditLogRepo: Repository<AiAuditLog>,
@InjectRepository(Attachment)
private readonly attachmentRepo: Repository<Attachment>
) {
super();
}
/** Dispatch งาน ai-realtime ตาม jobType */
async process(job: Job<AiRealtimeJobData>): Promise<unknown> {
switch (job.data.jobType) {
case 'ai-suggest':
return this.processSuggest(job);
case 'rag-query':
this.logger.log(`RAG query queued — jobId=${String(job.id)}`);
return;
default: {
const unreachable: never = job.data.jobType;
throw new Error(
`Unsupported ai-realtime jobType: ${String(unreachable)}`
);
}
}
}
private async processSuggest(
job: Job<AiRealtimeJobData>
): Promise<Record<string, unknown>> {
const startTime = Date.now();
try {
if (job.data.documentPublicId) {
await this.setAiProcessingStatus(
job.data.documentPublicId,
'PROCESSING'
);
}
const extractedText =
typeof job.data.payload['extractedText'] === 'string'
? job.data.payload['extractedText']
: '';
const pdfPath =
typeof job.data.payload['pdfPath'] === 'string'
? job.data.payload['pdfPath']
: undefined;
const extractedChars =
typeof job.data.payload['extractedChars'] === 'number'
? job.data.payload['extractedChars']
: extractedText.length;
const textResult = await this.ocrService.detectAndExtract({
extractedText,
extractedChars,
pdfPath,
});
const prompt = [
'Extract concise DMS metadata from this engineering document.',
'Return only JSON with fields: title, documentType, category, confidenceScore.',
textResult.text.slice(0, 6000),
].join('\n');
const rawOutput = await this.ollamaService.generate(prompt);
const suggestion = this.parseSuggestion(rawOutput);
const normalizedSuggestion = this.flagUnknownCategories(
suggestion,
job.data.payload['masterDataCategories']
);
await this.aiAuditLogRepo.save(
this.aiAuditLogRepo.create({
documentPublicId: job.data.documentPublicId,
aiModel: 'gemma4',
modelName: this.ollamaService.getMainModelName(),
aiSuggestionJson: normalizedSuggestion,
confidenceScore: this.extractConfidence(normalizedSuggestion),
processingTimeMs: Date.now() - startTime,
status: AiAuditStatus.SUCCESS,
})
);
if (job.data.documentPublicId) {
await this.setAiProcessingStatus(job.data.documentPublicId, 'DONE');
}
return {
suggestion: normalizedSuggestion,
ocrUsed: textResult.ocrUsed,
};
} catch (err) {
if (job.data.documentPublicId) {
await this.setAiProcessingStatus(job.data.documentPublicId, 'FAILED');
}
await this.aiAuditLogRepo.save(
this.aiAuditLogRepo.create({
documentPublicId: job.data.documentPublicId,
aiModel: 'gemma4',
modelName: this.ollamaService.getMainModelName(),
processingTimeMs: Date.now() - startTime,
status: AiAuditStatus.FAILED,
errorMessage: err instanceof Error ? err.message : String(err),
})
);
throw err;
}
}
private parseSuggestion(rawOutput: string): Record<string, unknown> {
try {
const parsed = JSON.parse(rawOutput) as unknown;
if (parsed && typeof parsed === 'object' && !Array.isArray(parsed)) {
return parsed as Record<string, unknown>;
}
} catch {
this.logger.warn('AI suggestion output was not valid JSON');
}
return {
title: rawOutput.slice(0, 250),
confidenceScore: 0,
is_unknown: true,
};
}
private flagUnknownCategories(
suggestion: Record<string, unknown>,
masterDataCategories: unknown
): Record<string, unknown> {
if (!Array.isArray(masterDataCategories)) return suggestion;
const knownValues = new Set(
masterDataCategories
.filter((value): value is string => typeof value === 'string')
.map((value) => value.toLowerCase())
);
const category = suggestion['category'];
if (
typeof category === 'string' &&
!knownValues.has(category.toLowerCase())
) {
return { ...suggestion, is_unknown: true };
}
return suggestion;
}
private extractConfidence(
suggestion: Record<string, unknown>
): number | undefined {
const confidence = suggestion['confidenceScore'];
return typeof confidence === 'number' ? confidence : undefined;
}
private async setAiProcessingStatus(
documentPublicId: string,
status: 'PENDING' | 'PROCESSING' | 'DONE' | 'FAILED'
): Promise<void> {
await this.attachmentRepo.update(
{ publicId: documentPublicId },
{ aiProcessingStatus: status }
);
}
/** เมื่อ interactive job เริ่ม ให้ pause batch queue เพื่อกัน GPU contention */
@OnWorkerEvent('active')
async onActive(job: Job<AiRealtimeJobData>): Promise<void> {
await this.aiBatchQueue.pause();
this.logger.warn(
`ai-batch paused while ai-realtime job is active — jobId=${String(job.id)}`
);
}
/** เมื่อ interactive job เสร็จ ให้ resume batch queue */
@OnWorkerEvent('completed')
async onCompleted(job: Job<AiRealtimeJobData>): Promise<void> {
await this.aiBatchQueue.resume();
this.logger.log(
`ai-batch resumed after ai-realtime completion — jobId=${String(job.id)}`
);
}
/** เมื่อ interactive job fail ให้ resume batch queue เช่นกัน */
@OnWorkerEvent('failed')
async onFailed(job: Job<AiRealtimeJobData> | undefined): Promise<void> {
await this.aiBatchQueue.resume();
this.logger.warn(
`ai-batch resumed after ai-realtime failure — jobId=${String(job?.id ?? 'unknown')}`
);
}
}