9.5 KiB
// File: specs/200-fullstacks/234-rag-pipeline-enhancements/plan.md // Change Log: // - 2026-06-05: Initial implementation plan for RAG Pipeline Enhancements
Implementation Plan: RAG Pipeline Enhancements
Branch: 234-rag-pipeline-enhancements | Date: 2026-06-05 | Spec: spec.md
ADR Reference: ADR-035
Summary
เพิ่ม BGE-M3 embedding + BGE-Reranker-Large + Semantic Chunking เข้า OCR Sidecar, แปลง Qdrant collection lcbp3_vectors เป็น Hybrid (1024 dims), และ wire RAG Prep trigger ที่ syncStatus() เมื่อเอกสาร Correspondence ผ่าน DRAFT → SUBOWN
แนวทาง: เพิ่ม /embed + /rerank ใน app.py → refactor EmbeddingService + AiQdrantService → เพิ่ม rag-prepare case ใน AiBatchProcessor → hook trigger ใน CorrespondenceWorkflowService
Technical Context
Language/Version: Python 3.11 (Sidecar), TypeScript 5.x / NestJS 11 (Backend)
Primary Dependencies: FlagEmbedding>=1.2.0 (BGE-M3 + Reranker), @qdrant/js-client-rest, BullMQ 5, Ollama
Storage: Qdrant (vector DB), MariaDB 11.8 (metadata), Redis (job state)
Testing: Jest (backend unit + integration)
Target Platform: Docker on Desk-5439 (Windows 10, CPU RAM for BGE-M3)
Project Type: Web application (backend + sidecar)
Performance Goals: embed 50-page doc < 5 min; RAG query < 30s end-to-end
Constraints: BGE-M3 ~2.3GB + Reranker ~1.5GB on CPU RAM; BullMQ concurrency=1 (ai-batch)
Scale/Scope: Correspondence module only — embed เมื่อ status OUT_OF_DRAFT
Constitution Check
| Rule | Status | Note |
|---|---|---|
| ADR-019: ไม่มี parseInt บน UUID | ✅ Pass | ใช้ publicId string ตลอด |
| ADR-009: No TypeORM migrations | ✅ Pass | เพิ่ม rag_chunking prompt ผ่าน SQL delta |
| ADR-008: BullMQ สำหรับ background jobs | ✅ Pass | rag-prepare ผ่าน ai-batch queue |
| ADR-023A: AI boundary — ไม่ bypass queue | ✅ Pass | Controller → Queue → Processor → Sidecar |
ADR-023A: projectPublicId mandatory filter |
✅ Pass | enforce ใน AiQdrantService |
ADR-029: Prompt จาก ai_prompts DB |
✅ Pass | rag_chunking prompt type ใหม่ |
| ADR-007: Error handling layered | ✅ Pass | retry 3x ใน BullMQ + fallback chunking |
| ADR-016: CASL guard | ✅ Pass | ใช้ guard เดิมของ ai module |
Project Structure
Documentation (this feature)
specs/200-fullstacks/234-rag-pipeline-enhancements/
├── plan.md ← this file
├── spec.md
├── data-model.md ← Phase 1
├── contracts/ ← Phase 1
│ ├── POST-embed.md
│ └── POST-rerank.md
└── tasks.md ← Phase 2 (speckit-tasks)
Source Code
specs/04-Infrastructure-OPS/04-00-docker-compose/Desk-5439/ocr-sidecar/
├── app.py ← เพิ่ม /embed + /rerank + BGE-M3 init
└── requirements.txt ← เพิ่ม FlagEmbedding>=1.2.0
backend/src/modules/ai/
├── qdrant.service.ts ← Hybrid schema, vector size 1024, payload ครบ 10 fields
├── services/
│ └── embedding.service.ts ← semantic chunking + BGE-M3 via Sidecar
├── ai-rag.service.ts ← BGE-M3 embed + Reranker step
├── ai-queue.service.ts ← เพิ่ม enqueueRagPrepare()
└── processors/
└── ai-batch.processor.ts ← เพิ่ม case 'rag-prepare'
backend/src/modules/correspondence/
└── correspondence-workflow.service.ts ← trigger rag-prepare ใน syncStatus()
specs/03-Data-and-Storage/deltas/
└── 2026-06-05-add-rag-chunking-prompt.sql
Structure Decision: Web application — backend NestJS + Python sidecar; ไม่มี frontend changes ใน scope นี้
Phase 0: Research Findings
R1 — BGE-M3 Python API (FlagEmbedding)
from FlagEmbedding import BGEM3FlagModel
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False) # CPU mode
output = model.encode(['text'], return_dense=True, return_sparse=True)
# output['dense_vecs'] → list[float] ขนาด 1024
# output['lexical_weights'] → dict {token_id: float}
# แปลง sparse: indices = list(keys), values = list(values)
R2 — BGE-Reranker Python API
from FlagEmbedding import FlagReranker
reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=False)
scores = reranker.compute_score([['query', chunk] for chunk in chunks])
# คืน list[float] — sort descending เพื่อได้ top-N
R3 — Qdrant Hybrid Collection (JS Client)
// drop + recreate
await client.deleteCollection('lcbp3_vectors');
await client.createCollection('lcbp3_vectors', {
vectors: { bge_dense: { size: 1024, distance: 'Cosine' } },
sparse_vectors: { bge_sparse: {} }
});
// upsert hybrid point
await client.upsert('lcbp3_vectors', { points: [{
id: uuid,
vector: { bge_dense: denseArray, bge_sparse: { indices, values } },
payload: { doc_public_id, project_public_id, doc_number, doc_type,
status_code, revision_number, subject, document_date,
chunk_topic, chunk_index, chunk_text }
}]});
// hybrid search (RRF fusion)
await client.query('lcbp3_vectors', {
prefetch: [
{ query: { indices, values }, using: 'bge_sparse', limit: 20 },
{ query: denseArray, using: 'bge_dense', limit: 20 },
],
query: { fusion: 'rrf' },
limit: 15,
filter: { must: [{ key: 'project_public_id', match: { value: projectId } }] }
});
R4 — OCR Text Cache
correspondence_revisions ไม่มี field เก็บ OCR text โดยตรง — rag-prepare job รับ cachedOcrText?: string ใน payload; ถ้าไม่มีให้เรียก Sidecar /ocr ผ่าน attachment path
Phase 1: Implementation Design
API Contracts
Sidecar — POST /embed
Request: { "text": string }
Response: { "dense": number[1024], "sparse": { "indices": number[], "values": number[] } }
Auth: X-API-Key header (ค่าเดิมจาก app.py)
Sidecar — POST /rerank
Request: { "query": string, "chunks": string[] }
Response: { "scores": number[], "ranked_indices": number[] }
Auth: X-API-Key header
Backend internal type — RagPrepareJobPayload
interface RagPrepareJobPayload {
documentPublicId: string;
projectPublicId: string;
correspondenceNumber: string;
docType: string;
statusCode: string;
revisionNumber: number;
subject: string;
documentDate?: string;
cachedOcrText?: string;
attachmentPath?: string;
}
Implementation Phases
Phase A — OCR Sidecar (ไม่กระทบ endpoints เดิม)
requirements.txt— เพิ่มFlagEmbedding>=1.2.0app.py— โหลด BGE-M3 + Reranker ตอน startup (global singleton, CPU)app.py— เพิ่มPOST /embedendpointapp.py— เพิ่มPOST /rerankendpoint
Phase B — AiQdrantService: Hybrid Schema
AI_VECTOR_SIZE= 1024 (เดิม 768)ensureCollection()→ drop + recreate Hybrid collectionupsert()→ รับdenseVector+sparseVector+ payload ครบ 11 fields (รวมchunk_text)search()/searchByProject()→ Hybrid query (RRF fusion)deleteByDocumentPublicId()→ filter บนdoc_public_idpayload field
Phase C — EmbeddingService: Semantic Chunking + BGE-M3
semanticChunkText(ocrText)method — call typhoon2.5 ด้วย promptrag_chunkingจากai_promptsparseChunkTags(llmOutput)— parse<chunk topic="...">tags, fallback fixed-sizeembedChunk(text)— เรียก SidecarPOST /embedแทน Ollama nomic
Phase D — AiQueueService + AiBatchProcessor
ai-queue.service.ts→enqueueRagPrepare(payload: RagPrepareJobPayload)ai-batch.processor.ts→ กำหนด concurrency = 1 ใน@Processorและเพิ่มcase 'rag-prepare': processRagPrepare(data)processRagPrepare()— OCR (cached/fallback) → chunk → normalize → embed → delete old → upsert
Phase E — AiRagService: Reranker Integration
embed(text)→ เรียก Sidecar/embedแทน Ollama- เพิ่ม rerank step หลัง
searchByProject()→ Sidecar/rerank→ top 3-5 chunks
Phase F — Trigger Hook
CorrespondenceWorkflowService→ injectAiQueueServicesyncStatus()→ หลัง save ถ้าtargetCode !== 'DRAFT'→enqueueRagPrepare()
Phase G — SQL Delta + Tests
deltas/2026-06-05-add-rag-chunking-prompt.sql— INSERTai_prompts(rag_chunking)- Unit tests:
embedding.service.spec.ts(semantic chunking, fallback) - Unit tests:
ai-batch.processor.spec.ts(rag-prepare case)
Risks & Mitigations
| Risk | Likelihood | Mitigation |
|---|---|---|
| BGE-M3 ใช้ RAM > 4GB บน Desk-5439 | Medium | ทดสอบ RAM ก่อน deploy; ใช้ use_fp16=False CPU mode |
| Qdrant drop collection → Chat Q&A unavailable ชั่วคราว | Low | deploy off-hours; Flow 4 return empty ไม่ error |
Semantic chunking ไม่มี <chunk> tag |
Medium | fallback fixed-size chunking ป้องกัน job fail |
syncStatus() trigger ซ้ำซ้อน |
Low | delete + re-embed เป็น idempotent |