Files
lcbp3/specs/200-fullstacks/234-rag-pipeline-enhancements/plan.md
T
admin 26cc71ce60
CI / CD Pipeline / build (push) Successful in 4m54s
CI / CD Pipeline / deploy (push) Successful in 6m19s
690605:2335 ADR-035-135 #1
2026-06-05 23:35:22 +07:00

239 lines
9.5 KiB
Markdown

// File: specs/200-fullstacks/234-rag-pipeline-enhancements/plan.md
// Change Log:
// - 2026-06-05: Initial implementation plan for RAG Pipeline Enhancements
# Implementation Plan: RAG Pipeline Enhancements
**Branch**: `234-rag-pipeline-enhancements` | **Date**: 2026-06-05 | **Spec**: [spec.md](./spec.md)
**ADR Reference**: [ADR-035](../../06-Decision-Records/ADR-035-ai-pipeline-flow-architecture.md)
---
## Summary
เพิ่ม BGE-M3 embedding + BGE-Reranker-Large + Semantic Chunking เข้า OCR Sidecar, แปลง Qdrant collection `lcbp3_vectors` เป็น Hybrid (1024 dims), และ wire RAG Prep trigger ที่ `syncStatus()` เมื่อเอกสาร Correspondence ผ่าน DRAFT → SUBOWN
แนวทาง: เพิ่ม `/embed` + `/rerank` ใน `app.py` → refactor `EmbeddingService` + `AiQdrantService` → เพิ่ม `rag-prepare` case ใน `AiBatchProcessor` → hook trigger ใน `CorrespondenceWorkflowService`
---
## Technical Context
**Language/Version**: Python 3.11 (Sidecar), TypeScript 5.x / NestJS 11 (Backend)
**Primary Dependencies**: `FlagEmbedding>=1.2.0` (BGE-M3 + Reranker), `@qdrant/js-client-rest`, BullMQ 5, Ollama
**Storage**: Qdrant (vector DB), MariaDB 11.8 (metadata), Redis (job state)
**Testing**: Jest (backend unit + integration)
**Target Platform**: Docker on Desk-5439 (Windows 10, CPU RAM for BGE-M3)
**Project Type**: Web application (backend + sidecar)
**Performance Goals**: embed 50-page doc < 5 min; RAG query < 30s end-to-end
**Constraints**: BGE-M3 ~2.3GB + Reranker ~1.5GB on CPU RAM; BullMQ concurrency=1 (ai-batch)
**Scale/Scope**: Correspondence module only — embed เมื่อ status OUT_OF_DRAFT
---
## Constitution Check
| Rule | Status | Note |
|------|--------|------|
| ADR-019: ไม่มี parseInt บน UUID | ✅ Pass | ใช้ `publicId` string ตลอด |
| ADR-009: No TypeORM migrations | ✅ Pass | เพิ่ม `rag_chunking` prompt ผ่าน SQL delta |
| ADR-008: BullMQ สำหรับ background jobs | ✅ Pass | `rag-prepare` ผ่าน `ai-batch` queue |
| ADR-023A: AI boundary — ไม่ bypass queue | ✅ Pass | Controller → Queue → Processor → Sidecar |
| ADR-023A: `projectPublicId` mandatory filter | ✅ Pass | enforce ใน `AiQdrantService` |
| ADR-029: Prompt จาก `ai_prompts` DB | ✅ Pass | `rag_chunking` prompt type ใหม่ |
| ADR-007: Error handling layered | ✅ Pass | retry 3x ใน BullMQ + fallback chunking |
| ADR-016: CASL guard | ✅ Pass | ใช้ guard เดิมของ ai module |
---
## Project Structure
### Documentation (this feature)
```text
specs/200-fullstacks/234-rag-pipeline-enhancements/
├── plan.md ← this file
├── spec.md
├── data-model.md ← Phase 1
├── contracts/ ← Phase 1
│ ├── POST-embed.md
│ └── POST-rerank.md
└── tasks.md ← Phase 2 (speckit-tasks)
```
### Source Code
```text
specs/04-Infrastructure-OPS/04-00-docker-compose/Desk-5439/ocr-sidecar/
├── app.py ← เพิ่ม /embed + /rerank + BGE-M3 init
└── requirements.txt ← เพิ่ม FlagEmbedding>=1.2.0
backend/src/modules/ai/
├── qdrant.service.ts ← Hybrid schema, vector size 1024, payload ครบ 10 fields
├── services/
│ └── embedding.service.ts ← semantic chunking + BGE-M3 via Sidecar
├── ai-rag.service.ts ← BGE-M3 embed + Reranker step
├── ai-queue.service.ts ← เพิ่ม enqueueRagPrepare()
└── processors/
└── ai-batch.processor.ts ← เพิ่ม case 'rag-prepare'
backend/src/modules/correspondence/
└── correspondence-workflow.service.ts ← trigger rag-prepare ใน syncStatus()
specs/03-Data-and-Storage/deltas/
└── 2026-06-05-add-rag-chunking-prompt.sql
```
**Structure Decision**: Web application — backend NestJS + Python sidecar; ไม่มี frontend changes ใน scope นี้
---
## Phase 0: Research Findings
### R1 — BGE-M3 Python API (FlagEmbedding)
```python
from FlagEmbedding import BGEM3FlagModel
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False) # CPU mode
output = model.encode(['text'], return_dense=True, return_sparse=True)
# output['dense_vecs'] → list[float] ขนาด 1024
# output['lexical_weights'] → dict {token_id: float}
# แปลง sparse: indices = list(keys), values = list(values)
```
### R2 — BGE-Reranker Python API
```python
from FlagEmbedding import FlagReranker
reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=False)
scores = reranker.compute_score([['query', chunk] for chunk in chunks])
# คืน list[float] — sort descending เพื่อได้ top-N
```
### R3 — Qdrant Hybrid Collection (JS Client)
```typescript
// drop + recreate
await client.deleteCollection('lcbp3_vectors');
await client.createCollection('lcbp3_vectors', {
vectors: { bge_dense: { size: 1024, distance: 'Cosine' } },
sparse_vectors: { bge_sparse: {} }
});
// upsert hybrid point
await client.upsert('lcbp3_vectors', { points: [{
id: uuid,
vector: { bge_dense: denseArray, bge_sparse: { indices, values } },
payload: { doc_public_id, project_public_id, doc_number, doc_type,
status_code, revision_number, subject, document_date,
chunk_topic, chunk_index, chunk_text }
}]});
// hybrid search (RRF fusion)
await client.query('lcbp3_vectors', {
prefetch: [
{ query: { indices, values }, using: 'bge_sparse', limit: 20 },
{ query: denseArray, using: 'bge_dense', limit: 20 },
],
query: { fusion: 'rrf' },
limit: 15,
filter: { must: [{ key: 'project_public_id', match: { value: projectId } }] }
});
```
### R4 — OCR Text Cache
`correspondence_revisions` ไม่มี field เก็บ OCR text โดยตรง — `rag-prepare` job รับ `cachedOcrText?: string` ใน payload; ถ้าไม่มีให้เรียก Sidecar `/ocr` ผ่าน attachment path
---
## Phase 1: Implementation Design
### API Contracts
**Sidecar — POST /embed**
```
Request: { "text": string }
Response: { "dense": number[1024], "sparse": { "indices": number[], "values": number[] } }
Auth: X-API-Key header (ค่าเดิมจาก app.py)
```
**Sidecar — POST /rerank**
```
Request: { "query": string, "chunks": string[] }
Response: { "scores": number[], "ranked_indices": number[] }
Auth: X-API-Key header
```
**Backend internal type — RagPrepareJobPayload**
```typescript
interface RagPrepareJobPayload {
documentPublicId: string;
projectPublicId: string;
correspondenceNumber: string;
docType: string;
statusCode: string;
revisionNumber: number;
subject: string;
documentDate?: string;
cachedOcrText?: string;
attachmentPath?: string;
}
```
### Implementation Phases
#### Phase A — OCR Sidecar (ไม่กระทบ endpoints เดิม)
1. `requirements.txt` — เพิ่ม `FlagEmbedding>=1.2.0`
2. `app.py` — โหลด BGE-M3 + Reranker ตอน startup (global singleton, CPU)
3. `app.py` — เพิ่ม `POST /embed` endpoint
4. `app.py` — เพิ่ม `POST /rerank` endpoint
#### Phase B — AiQdrantService: Hybrid Schema
5. `AI_VECTOR_SIZE` = 1024 (เดิม 768)
6. `ensureCollection()` → drop + recreate Hybrid collection
7. `upsert()` → รับ `denseVector` + `sparseVector` + payload ครบ 11 fields (รวม `chunk_text`)
8. `search()` / `searchByProject()` → Hybrid query (RRF fusion)
9. `deleteByDocumentPublicId()` → filter บน `doc_public_id` payload field
#### Phase C — EmbeddingService: Semantic Chunking + BGE-M3
10. `semanticChunkText(ocrText)` method — call typhoon2.5 ด้วย prompt `rag_chunking` จาก `ai_prompts`
11. `parseChunkTags(llmOutput)` — parse `<chunk topic="...">` tags, fallback fixed-size
12. `embedChunk(text)` — เรียก Sidecar `POST /embed` แทน Ollama nomic
#### Phase D — AiQueueService + AiBatchProcessor
13. `ai-queue.service.ts``enqueueRagPrepare(payload: RagPrepareJobPayload)`
14. `ai-batch.processor.ts` → กำหนด concurrency = 1 ใน `@Processor` และเพิ่ม `case 'rag-prepare': processRagPrepare(data)`
15. `processRagPrepare()` — OCR (cached/fallback) → chunk → normalize → embed → delete old → upsert
#### Phase E — AiRagService: Reranker Integration
16. `embed(text)` → เรียก Sidecar `/embed` แทน Ollama
17. เพิ่ม rerank step หลัง `searchByProject()` → Sidecar `/rerank` → top 3-5 chunks
#### Phase F — Trigger Hook
18. `CorrespondenceWorkflowService` → inject `AiQueueService`
19. `syncStatus()` → หลัง save ถ้า `targetCode !== 'DRAFT'``enqueueRagPrepare()`
#### Phase G — SQL Delta + Tests
20. `deltas/2026-06-05-add-rag-chunking-prompt.sql` — INSERT `ai_prompts` (`rag_chunking`)
21. Unit tests: `embedding.service.spec.ts` (semantic chunking, fallback)
22. Unit tests: `ai-batch.processor.spec.ts` (rag-prepare case)
---
## Risks & Mitigations
| Risk | Likelihood | Mitigation |
|------|-----------|------------|
| BGE-M3 ใช้ RAM > 4GB บน Desk-5439 | Medium | ทดสอบ RAM ก่อน deploy; ใช้ `use_fp16=False` CPU mode |
| Qdrant drop collection → Chat Q&A unavailable ชั่วคราว | Low | deploy off-hours; Flow 4 return empty ไม่ error |
| Semantic chunking ไม่มี `<chunk>` tag | Medium | fallback fixed-size chunking ป้องกัน job fail |
| `syncStatus()` trigger ซ้ำซ้อน | Low | delete + re-embed เป็น idempotent |