Files
lcbp3/specs/200-fullstacks/234-rag-pipeline-enhancements/plan.md
T
admin 26cc71ce60
CI / CD Pipeline / build (push) Successful in 4m54s
CI / CD Pipeline / deploy (push) Successful in 6m19s
690605:2335 ADR-035-135 #1
2026-06-05 23:35:22 +07:00

9.5 KiB

// File: specs/200-fullstacks/234-rag-pipeline-enhancements/plan.md // Change Log: // - 2026-06-05: Initial implementation plan for RAG Pipeline Enhancements

Implementation Plan: RAG Pipeline Enhancements

Branch: 234-rag-pipeline-enhancements | Date: 2026-06-05 | Spec: spec.md ADR Reference: ADR-035


Summary

เพิ่ม BGE-M3 embedding + BGE-Reranker-Large + Semantic Chunking เข้า OCR Sidecar, แปลง Qdrant collection lcbp3_vectors เป็น Hybrid (1024 dims), และ wire RAG Prep trigger ที่ syncStatus() เมื่อเอกสาร Correspondence ผ่าน DRAFT → SUBOWN

แนวทาง: เพิ่ม /embed + /rerank ใน app.py → refactor EmbeddingService + AiQdrantService → เพิ่ม rag-prepare case ใน AiBatchProcessor → hook trigger ใน CorrespondenceWorkflowService


Technical Context

Language/Version: Python 3.11 (Sidecar), TypeScript 5.x / NestJS 11 (Backend) Primary Dependencies: FlagEmbedding>=1.2.0 (BGE-M3 + Reranker), @qdrant/js-client-rest, BullMQ 5, Ollama Storage: Qdrant (vector DB), MariaDB 11.8 (metadata), Redis (job state) Testing: Jest (backend unit + integration) Target Platform: Docker on Desk-5439 (Windows 10, CPU RAM for BGE-M3) Project Type: Web application (backend + sidecar) Performance Goals: embed 50-page doc < 5 min; RAG query < 30s end-to-end Constraints: BGE-M3 ~2.3GB + Reranker ~1.5GB on CPU RAM; BullMQ concurrency=1 (ai-batch) Scale/Scope: Correspondence module only — embed เมื่อ status OUT_OF_DRAFT


Constitution Check

Rule Status Note
ADR-019: ไม่มี parseInt บน UUID Pass ใช้ publicId string ตลอด
ADR-009: No TypeORM migrations Pass เพิ่ม rag_chunking prompt ผ่าน SQL delta
ADR-008: BullMQ สำหรับ background jobs Pass rag-prepare ผ่าน ai-batch queue
ADR-023A: AI boundary — ไม่ bypass queue Pass Controller → Queue → Processor → Sidecar
ADR-023A: projectPublicId mandatory filter Pass enforce ใน AiQdrantService
ADR-029: Prompt จาก ai_prompts DB Pass rag_chunking prompt type ใหม่
ADR-007: Error handling layered Pass retry 3x ใน BullMQ + fallback chunking
ADR-016: CASL guard Pass ใช้ guard เดิมของ ai module

Project Structure

Documentation (this feature)

specs/200-fullstacks/234-rag-pipeline-enhancements/
├── plan.md          ← this file
├── spec.md
├── data-model.md    ← Phase 1
├── contracts/       ← Phase 1
│   ├── POST-embed.md
│   └── POST-rerank.md
└── tasks.md         ← Phase 2 (speckit-tasks)

Source Code

specs/04-Infrastructure-OPS/04-00-docker-compose/Desk-5439/ocr-sidecar/
├── app.py                    ← เพิ่ม /embed + /rerank + BGE-M3 init
└── requirements.txt          ← เพิ่ม FlagEmbedding>=1.2.0

backend/src/modules/ai/
├── qdrant.service.ts         ← Hybrid schema, vector size 1024, payload ครบ 10 fields
├── services/
│   └── embedding.service.ts  ← semantic chunking + BGE-M3 via Sidecar
├── ai-rag.service.ts         ← BGE-M3 embed + Reranker step
├── ai-queue.service.ts       ← เพิ่ม enqueueRagPrepare()
└── processors/
    └── ai-batch.processor.ts ← เพิ่ม case 'rag-prepare'

backend/src/modules/correspondence/
└── correspondence-workflow.service.ts  ← trigger rag-prepare ใน syncStatus()

specs/03-Data-and-Storage/deltas/
└── 2026-06-05-add-rag-chunking-prompt.sql

Structure Decision: Web application — backend NestJS + Python sidecar; ไม่มี frontend changes ใน scope นี้


Phase 0: Research Findings

R1 — BGE-M3 Python API (FlagEmbedding)

from FlagEmbedding import BGEM3FlagModel
model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False)  # CPU mode
output = model.encode(['text'], return_dense=True, return_sparse=True)
# output['dense_vecs']      → list[float] ขนาด 1024
# output['lexical_weights'] → dict {token_id: float}
# แปลง sparse: indices = list(keys), values = list(values)

R2 — BGE-Reranker Python API

from FlagEmbedding import FlagReranker
reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=False)
scores = reranker.compute_score([['query', chunk] for chunk in chunks])
# คืน list[float] — sort descending เพื่อได้ top-N

R3 — Qdrant Hybrid Collection (JS Client)

// drop + recreate
await client.deleteCollection('lcbp3_vectors');
await client.createCollection('lcbp3_vectors', {
  vectors: { bge_dense: { size: 1024, distance: 'Cosine' } },
  sparse_vectors: { bge_sparse: {} }
});

// upsert hybrid point
await client.upsert('lcbp3_vectors', { points: [{
  id: uuid,
  vector: { bge_dense: denseArray, bge_sparse: { indices, values } },
  payload: { doc_public_id, project_public_id, doc_number, doc_type,
             status_code, revision_number, subject, document_date,
             chunk_topic, chunk_index, chunk_text }
}]});

// hybrid search (RRF fusion)
await client.query('lcbp3_vectors', {
  prefetch: [
    { query: { indices, values }, using: 'bge_sparse', limit: 20 },
    { query: denseArray,          using: 'bge_dense',  limit: 20 },
  ],
  query: { fusion: 'rrf' },
  limit: 15,
  filter: { must: [{ key: 'project_public_id', match: { value: projectId } }] }
});

R4 — OCR Text Cache

correspondence_revisions ไม่มี field เก็บ OCR text โดยตรง — rag-prepare job รับ cachedOcrText?: string ใน payload; ถ้าไม่มีให้เรียก Sidecar /ocr ผ่าน attachment path


Phase 1: Implementation Design

API Contracts

Sidecar — POST /embed

Request:  { "text": string }
Response: { "dense": number[1024], "sparse": { "indices": number[], "values": number[] } }
Auth:     X-API-Key header (ค่าเดิมจาก app.py)

Sidecar — POST /rerank

Request:  { "query": string, "chunks": string[] }
Response: { "scores": number[], "ranked_indices": number[] }
Auth:     X-API-Key header

Backend internal type — RagPrepareJobPayload

interface RagPrepareJobPayload {
  documentPublicId: string;
  projectPublicId: string;
  correspondenceNumber: string;
  docType: string;
  statusCode: string;
  revisionNumber: number;
  subject: string;
  documentDate?: string;
  cachedOcrText?: string;
  attachmentPath?: string;
}

Implementation Phases

Phase A — OCR Sidecar (ไม่กระทบ endpoints เดิม)

  1. requirements.txt — เพิ่ม FlagEmbedding>=1.2.0
  2. app.py — โหลด BGE-M3 + Reranker ตอน startup (global singleton, CPU)
  3. app.py — เพิ่ม POST /embed endpoint
  4. app.py — เพิ่ม POST /rerank endpoint

Phase B — AiQdrantService: Hybrid Schema

  1. AI_VECTOR_SIZE = 1024 (เดิม 768)
  2. ensureCollection() → drop + recreate Hybrid collection
  3. upsert() → รับ denseVector + sparseVector + payload ครบ 11 fields (รวม chunk_text)
  4. search() / searchByProject() → Hybrid query (RRF fusion)
  5. deleteByDocumentPublicId() → filter บน doc_public_id payload field

Phase C — EmbeddingService: Semantic Chunking + BGE-M3

  1. semanticChunkText(ocrText) method — call typhoon2.5 ด้วย prompt rag_chunking จาก ai_prompts
  2. parseChunkTags(llmOutput) — parse <chunk topic="..."> tags, fallback fixed-size
  3. embedChunk(text) — เรียก Sidecar POST /embed แทน Ollama nomic

Phase D — AiQueueService + AiBatchProcessor

  1. ai-queue.service.tsenqueueRagPrepare(payload: RagPrepareJobPayload)
  2. ai-batch.processor.ts → กำหนด concurrency = 1 ใน @Processor และเพิ่ม case 'rag-prepare': processRagPrepare(data)
  3. processRagPrepare() — OCR (cached/fallback) → chunk → normalize → embed → delete old → upsert

Phase E — AiRagService: Reranker Integration

  1. embed(text) → เรียก Sidecar /embed แทน Ollama
  2. เพิ่ม rerank step หลัง searchByProject() → Sidecar /rerank → top 3-5 chunks

Phase F — Trigger Hook

  1. CorrespondenceWorkflowService → inject AiQueueService
  2. syncStatus() → หลัง save ถ้า targetCode !== 'DRAFT'enqueueRagPrepare()

Phase G — SQL Delta + Tests

  1. deltas/2026-06-05-add-rag-chunking-prompt.sql — INSERT ai_prompts (rag_chunking)
  2. Unit tests: embedding.service.spec.ts (semantic chunking, fallback)
  3. Unit tests: ai-batch.processor.spec.ts (rag-prepare case)

Risks & Mitigations

Risk Likelihood Mitigation
BGE-M3 ใช้ RAM > 4GB บน Desk-5439 Medium ทดสอบ RAM ก่อน deploy; ใช้ use_fp16=False CPU mode
Qdrant drop collection → Chat Q&A unavailable ชั่วคราว Low deploy off-hours; Flow 4 return empty ไม่ error
Semantic chunking ไม่มี <chunk> tag Medium fallback fixed-size chunking ป้องกัน job fail
syncStatus() trigger ซ้ำซ้อน Low delete + re-embed เป็น idempotent