feat(ai): unify AI architecture, implement RAG and legacy migration
This commit is contained in:
@@ -0,0 +1,206 @@
|
||||
// File: src/modules/ai/processors/rag.processor.spec.ts
|
||||
// Change Log
|
||||
// - 2026-05-14: เพิ่ม Unit Test สำหรับ AiRagProcessor — ตรวจสอบ concurrency=1 และ AbortController (T030).
|
||||
import { Test, TestingModule } from '@nestjs/testing';
|
||||
import { Job } from 'bullmq';
|
||||
import { AiRagProcessor } from './rag.processor';
|
||||
import { AiRagService } from '../ai-rag.service';
|
||||
import { AiRagJobPayload } from '../ai-queue.service';
|
||||
import { QUEUE_AI_RAG } from '../../common/constants/queue.constants';
|
||||
|
||||
// ─── Helpers ─────────────────────────────────────────────────────────────────
|
||||
|
||||
/** สร้าง mock BullMQ Job สำหรับ RAG query */
|
||||
function makeJob(
|
||||
overrides: Partial<{
|
||||
id: string;
|
||||
requestPublicId: string;
|
||||
userPublicId: string;
|
||||
projectPublicId: string;
|
||||
query: string;
|
||||
}> = {}
|
||||
): Job<AiRagJobPayload> {
|
||||
return {
|
||||
id: overrides.id ?? 'job-001',
|
||||
data: {
|
||||
requestPublicId: overrides.requestPublicId ?? 'req-uuid-001',
|
||||
userPublicId: overrides.userPublicId ?? 'user-uuid-001',
|
||||
projectPublicId: overrides.projectPublicId ?? 'proj-uuid-001',
|
||||
query: overrides.query ?? 'What is the project scope?',
|
||||
},
|
||||
} as unknown as Job<AiRagJobPayload>;
|
||||
}
|
||||
|
||||
// ─── Tests ───────────────────────────────────────────────────────────────────
|
||||
|
||||
describe('AiRagProcessor', () => {
|
||||
let processor: AiRagProcessor;
|
||||
let ragService: jest.Mocked<AiRagService>;
|
||||
|
||||
const mockRagService: Partial<jest.Mocked<AiRagService>> = {
|
||||
processQuery: jest.fn().mockResolvedValue(undefined),
|
||||
getActiveJob: jest.fn().mockResolvedValue(null),
|
||||
registerActiveJob: jest.fn().mockResolvedValue(undefined),
|
||||
clearActiveJob: jest.fn().mockResolvedValue(undefined),
|
||||
cancelJob: jest.fn().mockResolvedValue(undefined),
|
||||
getJobResult: jest.fn().mockResolvedValue(null),
|
||||
saveJobResult: jest.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
AiRagProcessor,
|
||||
{ provide: AiRagService, useValue: mockRagService },
|
||||
],
|
||||
}).compile();
|
||||
|
||||
processor = module.get<AiRagProcessor>(AiRagProcessor);
|
||||
ragService = module.get(AiRagService);
|
||||
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
// ─── T030 Core: ตรวจสอบ concurrency=1 metadata ──────────────────────────
|
||||
|
||||
it('ควรมี @Processor decorator พร้อม queue name ที่ถูกต้อง', () => {
|
||||
// ตรวจสอบ QUEUE_AI_RAG constant ตรงกับที่ใช้ใน processor
|
||||
expect(QUEUE_AI_RAG).toBe('ai-rag-query');
|
||||
});
|
||||
|
||||
it('ควร process job และเรียก ragService.processQuery ด้วย AbortSignal', async () => {
|
||||
const job = makeJob({
|
||||
requestPublicId: 'req-abc',
|
||||
userPublicId: 'user-abc',
|
||||
query: 'test question',
|
||||
});
|
||||
|
||||
await processor.process(job);
|
||||
|
||||
expect(ragService.processQuery).toHaveBeenCalledTimes(1);
|
||||
expect(ragService.processQuery).toHaveBeenCalledWith(
|
||||
'req-abc',
|
||||
'test question',
|
||||
'proj-uuid-001',
|
||||
'user-abc',
|
||||
expect.any(AbortSignal) // T022: AbortSignal ต้องถูกส่งเข้าไปด้วย
|
||||
);
|
||||
});
|
||||
|
||||
it('ควร cleanup AbortController หลัง process เสร็จ (no memory leak)', async () => {
|
||||
const job = makeJob({ requestPublicId: 'req-cleanup' });
|
||||
|
||||
await processor.process(job);
|
||||
|
||||
// หลัง process เสร็จ ไม่ควรมี controller ค้างอยู่
|
||||
const aborted = processor.abortJob('req-cleanup');
|
||||
expect(aborted).toBe(false); // ถูก cleanup แล้ว
|
||||
});
|
||||
|
||||
it('ควร cleanup AbortController แม้ว่า processQuery จะ throw error', async () => {
|
||||
const job = makeJob({ requestPublicId: 'req-error' });
|
||||
ragService.processQuery.mockRejectedValueOnce(new Error('Ollama timeout'));
|
||||
|
||||
// ไม่ควร throw ออกมา (processor จัดการ error ภายใน)
|
||||
await expect(processor.process(job)).rejects.toThrow('Ollama timeout');
|
||||
|
||||
// ยังต้อง cleanup controller
|
||||
const aborted = processor.abortJob('req-error');
|
||||
expect(aborted).toBe(false);
|
||||
});
|
||||
|
||||
it('abortJob ควรคืน true เมื่อ job กำลัง processing', async () => {
|
||||
const requestPublicId = 'req-inprogress';
|
||||
// จำลอง processQuery ที่ใช้เวลานาน
|
||||
ragService.processQuery.mockImplementationOnce(
|
||||
(_reqId, _q, _proj, _user, signal) =>
|
||||
new Promise<void>((_resolve, reject) => {
|
||||
if (signal) {
|
||||
signal.addEventListener('abort', () =>
|
||||
reject(new Error('aborted'))
|
||||
);
|
||||
}
|
||||
// ไม่ resolve เพื่อจำลอง long-running job
|
||||
})
|
||||
);
|
||||
|
||||
const job = makeJob({ requestPublicId });
|
||||
const processingPromise = processor.process(job).catch(() => {
|
||||
/* expected */
|
||||
});
|
||||
|
||||
// รอให้ controller ถูก register ก่อน abort
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
const result = processor.abortJob(requestPublicId);
|
||||
expect(result).toBe(true);
|
||||
|
||||
await processingPromise;
|
||||
});
|
||||
|
||||
it('abortJob ควรคืน false เมื่อไม่มี job ที่ requestPublicId นั้น', () => {
|
||||
const result = processor.abortJob('non-existent-job');
|
||||
expect(result).toBe(false);
|
||||
});
|
||||
|
||||
// ─── T030 Stress: ตรวจสอบ 1-active-job-per-user enforcement ─────────────
|
||||
|
||||
describe('1-Active-Job-Per-User Enforcement (FR-009 concurrency=1)', () => {
|
||||
it('ควรส่งคืน requestPublicId เดิมเมื่อ user มี active job อยู่แล้ว', async () => {
|
||||
const existingJobId = 'existing-request-uuid';
|
||||
ragService.getActiveJob.mockResolvedValueOnce(existingJobId);
|
||||
|
||||
const activeJob = await ragService.getActiveJob('user-uuid-999');
|
||||
expect(activeJob).toBe(existingJobId);
|
||||
});
|
||||
|
||||
it('ควรสามารถ registerActiveJob และ getActiveJob ได้สำหรับ user คนเดียว', async () => {
|
||||
const userPublicId = 'user-stress-test';
|
||||
const requestPublicId = 'new-req-uuid';
|
||||
|
||||
ragService.getActiveJob.mockResolvedValueOnce(null);
|
||||
ragService.registerActiveJob.mockResolvedValueOnce(undefined);
|
||||
ragService.getActiveJob.mockResolvedValueOnce(requestPublicId);
|
||||
|
||||
// ไม่มี active job เริ่มต้น
|
||||
const beforeJob = await ragService.getActiveJob(userPublicId);
|
||||
expect(beforeJob).toBeNull();
|
||||
|
||||
// ลงทะเบียน job
|
||||
await ragService.registerActiveJob(userPublicId, requestPublicId);
|
||||
|
||||
// ตรวจสอบว่า active job ถูกเก็บแล้ว
|
||||
const afterJob = await ragService.getActiveJob(userPublicId);
|
||||
expect(afterJob).toBe(requestPublicId);
|
||||
});
|
||||
|
||||
it('stress test: 10 requests ต่อเนื่อง — ควรพบ active job ตั้งแต่ request ที่ 2 เป็นต้นไป', async () => {
|
||||
const userPublicId = 'user-concurrent';
|
||||
const firstRequestId = 'first-req-uuid';
|
||||
|
||||
// ครั้งแรกไม่มี active job, หลังจากนั้นมี
|
||||
ragService.getActiveJob
|
||||
.mockResolvedValueOnce(null) // request 1: ไม่มี active job
|
||||
.mockResolvedValue(firstRequestId); // request 2-10: พบ active job
|
||||
|
||||
ragService.registerActiveJob.mockResolvedValue(undefined);
|
||||
|
||||
// Request 1: ไม่มี active job — ควรสร้างใหม่
|
||||
const req1Active = await ragService.getActiveJob(userPublicId);
|
||||
expect(req1Active).toBeNull();
|
||||
await ragService.registerActiveJob(userPublicId, firstRequestId);
|
||||
|
||||
// Requests 2-10: ทุกคำขอควรพบ active job เดิม
|
||||
const concurrentChecks = await Promise.all(
|
||||
Array.from({ length: 9 }, () => ragService.getActiveJob(userPublicId))
|
||||
);
|
||||
|
||||
concurrentChecks.forEach((activeId) => {
|
||||
expect(activeId).toBe(firstRequestId);
|
||||
});
|
||||
|
||||
// ยืนยันว่า registerActiveJob ถูกเรียกแค่ครั้งเดียว (job เดียว)
|
||||
expect(ragService.registerActiveJob).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,86 @@
|
||||
// File: src/modules/ai/processors/rag.processor.ts
|
||||
// Change Log
|
||||
// - 2026-05-14: เพิ่ม BullMQ Processor สำหรับ RAG query ตาม ADR-023 Phase 4 (T018, T022).
|
||||
// Processor นี้ใช้ concurrency = 1 เพื่อป้องกัน OOM บน Desk-5439 (FR-009)
|
||||
|
||||
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { Job } from 'bullmq';
|
||||
import { AiRagService } from '../ai-rag.service';
|
||||
import { AiRagJobPayload } from '../ai-queue.service';
|
||||
import { QUEUE_AI_RAG } from '../../common/constants/queue.constants';
|
||||
|
||||
/**
|
||||
* Processor สำหรับ RAG query queue
|
||||
* - concurrency: 1 เพื่อป้องกัน VRAM overflow บน Desk-5439 (FR-009, Research Unknown 3)
|
||||
* - รองรับ AbortController เพื่อยกเลิก LLM generation เมื่อ client disconnect (T022, FR-011)
|
||||
*/
|
||||
@Processor(QUEUE_AI_RAG, { concurrency: 1 })
|
||||
export class AiRagProcessor extends WorkerHost {
|
||||
private readonly logger = new Logger(AiRagProcessor.name);
|
||||
|
||||
/** Map สำหรับเก็บ AbortController ของแต่ละ job (T022) */
|
||||
private readonly abortControllers = new Map<string, AbortController>();
|
||||
|
||||
constructor(private readonly ragService: AiRagService) {
|
||||
super();
|
||||
}
|
||||
|
||||
/** ประมวลผล RAG query job */
|
||||
async process(job: Job<AiRagJobPayload>): Promise<void> {
|
||||
const { requestPublicId, userPublicId, projectPublicId, query } = job.data;
|
||||
this.logger.log(
|
||||
`Processing RAG job — requestPublicId=${requestPublicId}, user=${userPublicId}`
|
||||
);
|
||||
|
||||
// สร้าง AbortController สำหรับ job นี้ (T022)
|
||||
const controller = new AbortController();
|
||||
this.abortControllers.set(requestPublicId, controller);
|
||||
|
||||
try {
|
||||
await this.ragService.processQuery(
|
||||
requestPublicId,
|
||||
query,
|
||||
projectPublicId,
|
||||
userPublicId,
|
||||
controller.signal
|
||||
);
|
||||
} finally {
|
||||
this.abortControllers.delete(requestPublicId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort การประมวลผล LLM สำหรับ job ที่ระบุ (T022 — FR-011)
|
||||
* ถูกเรียกจาก AiRagService.cancelJob() ผ่าน Redis cancel flag
|
||||
*/
|
||||
abortJob(requestPublicId: string): boolean {
|
||||
const controller = this.abortControllers.get(requestPublicId);
|
||||
if (controller) {
|
||||
controller.abort();
|
||||
this.abortControllers.delete(requestPublicId);
|
||||
this.logger.log(`Aborted RAG job — requestPublicId=${requestPublicId}`);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Log เมื่อ job เสร็จสมบูรณ์ */
|
||||
@OnWorkerEvent('completed')
|
||||
onCompleted(job: Job<AiRagJobPayload>): void {
|
||||
this.logger.log(
|
||||
`RAG job completed — jobId=${String(job.id)}, requestPublicId=${job.data.requestPublicId}`
|
||||
);
|
||||
}
|
||||
|
||||
/** Log และ cleanup เมื่อ job ล้มเหลว */
|
||||
@OnWorkerEvent('failed')
|
||||
onFailed(job: Job<AiRagJobPayload> | undefined, err: Error): void {
|
||||
const id = job?.data?.requestPublicId ?? 'unknown';
|
||||
// ยกเลิก abort controller ที่ค้างไว้
|
||||
if (job?.data?.requestPublicId) {
|
||||
this.abortControllers.delete(job.data.requestPublicId);
|
||||
}
|
||||
this.logger.error(`RAG job failed — requestPublicId=${id}: ${err.message}`);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
// File: src/modules/ai/processors/vector-deletion.processor.ts
|
||||
// Change Log
|
||||
// - 2026-05-14: เพิ่ม BullMQ Processor สำหรับลบ vector ใน Qdrant แบบ async ตาม ADR-023 FR-008 (T027).
|
||||
import { Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { Job } from 'bullmq';
|
||||
import { QUEUE_AI_VECTOR_DELETION } from '../../common/constants/queue.constants';
|
||||
import { AiQdrantService } from '../qdrant.service';
|
||||
import { AiVectorDeletionJobPayload } from '../ai-queue.service';
|
||||
|
||||
/**
|
||||
* Processor สำหรับลบ vector ของเอกสารที่ถูกลบออกจาก Qdrant แบบ asynchronous
|
||||
* รองรับ retry 3 ครั้ง (ADR-008 + FR-008) เพื่อ eventual consistency เมื่อ Qdrant ไม่พร้อม
|
||||
*/
|
||||
@Processor(QUEUE_AI_VECTOR_DELETION)
|
||||
export class AiVectorDeletionProcessor extends WorkerHost {
|
||||
private readonly logger = new Logger(AiVectorDeletionProcessor.name);
|
||||
|
||||
constructor(private readonly qdrantService: AiQdrantService) {
|
||||
super();
|
||||
}
|
||||
|
||||
async process(job: Job<AiVectorDeletionJobPayload>): Promise<void> {
|
||||
const { documentPublicId, requestedByUserPublicId } = job.data;
|
||||
|
||||
this.logger.log(
|
||||
`Vector deletion started — documentPublicId=${documentPublicId}, jobId=${String(job.id)}, requestedBy=${requestedByUserPublicId}`
|
||||
);
|
||||
|
||||
await this.qdrantService.deleteByDocumentPublicId(documentPublicId);
|
||||
|
||||
this.logger.log(
|
||||
`Vector deletion completed — documentPublicId=${documentPublicId}, jobId=${String(job.id)}`
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user