refactor(ai): OCR sidecar canonical naming cleanup — typhoon→np-dms, remove hardcoded keys, asyncio.to_thread, ADR-040/041
CI / CD Pipeline / build (push) Successful in 7m37s
CI / CD Pipeline / deploy (push) Failing after 20m15s

This commit is contained in:
2026-06-20 16:37:04 +07:00
parent d418d791a4
commit a80ebef285
70 changed files with 5762 additions and 452 deletions
@@ -27,56 +27,77 @@
# - 2026-06-17: ลบชื่อ Typhoon ออกจากทุกส่วน: process_with_typhoon_ocr → process_ocr, FastAPI title, comments, ตัวแปรต่างๆ
# - 2026-06-17: เพิ่ม systemPrompt parameter ใน /ocr-upload, _process_pdf_doc, process_ocr เพื่อรองรับ dynamic OCR system prompt injection (T026-T028)
# - 2026-06-18: เพิ่ม MAX_SYSTEM_PROMPT_LENGTH environment variable สำหรับ configurable validation (fix-3)
# - 2026-06-20: ADR-040 Phase 1-4 — ลบ default API key, เพิ่ม path whitelist, และ wire adaptive OCR residency
# - 2026-06-20: ADR-040 Phase 6 — async I/O refactor: async process_ocr, AsyncClient via lifespan, asyncio.to_thread model loading
# - 2026-06-20: ADR-040 Phase 8 — ลบ /normalize endpoint (ไม่มี consumers) และ pythainlp imports
import os
import logging
import re
import base64
import json
import tempfile
import fitz # PyMuPDF (ใช้สำหรับ page count + fast-path text extraction)
import httpx
import asyncio
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
from PIL import Image
import io
from typhoon_ocr import prepare_ocr_messages # External library from SCB10X (PyPI) — provides OCR message preparation for np-dms-ocr
from services.vram_monitor import get_vram_headroom
from services.residency_policy import calculate_ocr_residency
from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Depends, Security, status
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from pythainlp.tokenize import word_tokenize
from pythainlp.util import normalize as thai_normalize
from FlagEmbedding import BGEM3FlagModel, FlagReranker
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ocr-sidecar")
app = FastAPI(title="OCR Sidecar", version="2.0.0")
# Initialize BGE-M3 and Reranker singletons
bge_model = None
reranker = None
# Shared AsyncClient สำหรับ Ollama API (T043: สร้างใน lifespan context manager)
ollama_client: httpx.AsyncClient | None = None
@app.on_event("startup")
def load_bge_models():
global bge_model, reranker
def _load_bge_models() -> tuple:
"""โหลด BGE-M3 และ Reranker models บน CPU RAM (T046: เรียกผ่าน asyncio.to_thread)"""
logger.info("Loading BGE-M3 and Reranker models on CPU RAM...")
try:
# BGE-M3: BAAI/bge-m3, use_fp16=False for CPU
bge_model = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False)
# Reranker: BAAI/bge-reranker-large, use_fp16=False for CPU
reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=False)
bge = BGEM3FlagModel('BAAI/bge-m3', use_fp16=False)
rerank = FlagReranker('BAAI/bge-reranker-large', use_fp16=False)
logger.info("BGE-M3 and Reranker models loaded successfully.")
return bge, rerank
except Exception as e:
logger.error(f"Failed to load BGE models: {e}")
return None, None
@asynccontextmanager
async def lifespan(app_instance: FastAPI):
"""T043/T045: Lifespan context manager แทน @app.on_event('startup') — จัดการ AsyncClient และ model loading"""
global bge_model, reranker, ollama_client
# T043: สร้าง shared AsyncClient สำหรับ Ollama API
ollama_client = httpx.AsyncClient(timeout=OCR_TIMEOUT)
logger.info(f"Shared AsyncClient created (timeout={OCR_TIMEOUT}s)")
# T046: โหลด models ผ่าน asyncio.to_thread เพื่อไม่ block startup
bge_model, reranker = await asyncio.to_thread(_load_bge_models)
yield
# Cleanup: ปิด AsyncClient
if ollama_client:
await ollama_client.aclose()
logger.info("Shared AsyncClient closed.")
app = FastAPI(title="OCR Sidecar", version="2.0.0", lifespan=lifespan)
# กำหนดค่าโทเค็นความปลอดภัยของ Sidecar ตามข้อเสนอแนะในการรักษาความมั่นคงปลอดภัย
OCR_SIDECAR_API_KEY = os.getenv("OCR_SIDECAR_API_KEY", "lcbp3-dms-ocr-sidecar-secure-token-2026")
OCR_SIDECAR_API_KEY = os.getenv("OCR_SIDECAR_API_KEY")
if not OCR_SIDECAR_API_KEY:
raise RuntimeError("OCR_SIDECAR_API_KEY is required for OCR sidecar startup")
# กำหนดค่าความยาวสูงสุดของ systemPrompt (fix-3: configurable validation)
MAX_SYSTEM_PROMPT_LENGTH = int(os.getenv("MAX_SYSTEM_PROMPT_LENGTH", "10000"))
@@ -94,6 +115,8 @@ MAX_PAGES = int(os.getenv("OCR_MAX_PAGES", "0")) # 0 = ทุกหน้า
OLLAMA_API_URL = os.getenv("OLLAMA_API_URL", "http://host.docker.internal:11434")
OCR_MODEL = os.getenv("OCR_MODEL", "np-dms-ocr:latest")
OCR_TIMEOUT = int(os.getenv("OCR_TIMEOUT", "360")) # รองรับ cold-start ~65s + inference ~30s/page
OCR_SIDECAR_UPLOAD_BASE = os.getenv("OCR_SIDECAR_UPLOAD_BASE", "/mnt/uploads")
OCR_ACTIVE_PROFILE = os.getenv("OCR_ACTIVE_PROFILE")
logger.info(f"OCR Sidecar initialized (model={OCR_MODEL}, ollama={OLLAMA_API_URL})")
@@ -111,11 +134,29 @@ def filter_ocr_noise(text: str) -> str:
filtered.append(line)
return "\n".join(filtered)
def validate_pdf_path(pdf_path: str) -> Path:
"""Canonicalize path และยืนยันว่าอยู่ใต้ OCR_SIDECAR_UPLOAD_BASE"""
canonical_path = os.path.abspath(os.path.realpath(pdf_path))
canonical_base = os.path.abspath(os.path.realpath(OCR_SIDECAR_UPLOAD_BASE))
try:
common_path = os.path.commonpath([canonical_path, canonical_base])
except ValueError:
common_path = ""
if common_path != canonical_base:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Path outside whitelisted base directory",
)
return Path(canonical_path)
class OcrRequest(BaseModel):
pdfPath: str
maxPages: Optional[int] = None
engine: Optional[str] = None
keep_alive: Optional[int] = None
runtime_params: Optional[dict] = None
system_prompt: Optional[str] = None
dms_tags: Optional[dict] = None
class OcrResponse(BaseModel):
text: str
@@ -133,8 +174,18 @@ def health():
"ollamaUrl": OLLAMA_API_URL,
}
def _process_pdf_doc(doc: fitz.Document, selected_engine: str, max_pages: int, ocr_options: dict = {}, pdf_path: str | None = None, system_prompt: Optional[str] = None) -> OcrResponse:
async def _process_pdf_doc(
doc: fitz.Document,
selected_engine: str,
max_pages: int,
ocr_options: Optional[dict] = None,
pdf_path: str | None = None,
system_prompt: Optional[str] = None,
runtime_params: Optional[dict] = None,
dms_tags: Optional[dict] = None,
) -> OcrResponse:
"""ประมวลผล fitz.Document ด้วย engine ที่เลือก — shared logic สำหรับ /ocr และ /ocr-upload"""
ocr_options = ocr_options or {}
pages_to_process = list(range(min(len(doc), max_pages) if max_pages > 0 else len(doc)))
page_count = len(pages_to_process)
@@ -163,7 +214,16 @@ def _process_pdf_doc(doc: fitz.Document, selected_engine: str, max_pages: int, o
raise ValueError("ไม่สามารถหา PDF path — ต้องส่ง pdf_path เข้ามาด้วย")
ocr_text_parts = []
for i in pages_to_process:
ocr_text_parts.append(process_ocr(resolved_path, page_num=i + 1, options_override=ocr_options, system_prompt=system_prompt))
ocr_text_parts.append(
await process_ocr(
resolved_path,
page_num=i + 1,
options_override=ocr_options,
system_prompt=system_prompt,
runtime_params=runtime_params,
dms_tags=dms_tags,
)
)
ocr_text = filter_ocr_noise("\n".join(ocr_text_parts).strip())
return OcrResponse(
text=ocr_text,
@@ -180,7 +240,16 @@ def _process_pdf_doc(doc: fitz.Document, selected_engine: str, max_pages: int, o
raise ValueError("ไม่สามารถหา PDF path — ต้องส่ง pdf_path เข้ามาด้วย")
fallback_parts = []
for i in pages_to_process:
fallback_parts.append(process_ocr(resolved_path, page_num=i + 1, options_override=ocr_options, system_prompt=system_prompt))
fallback_parts.append(
await process_ocr(
resolved_path,
page_num=i + 1,
options_override=ocr_options,
system_prompt=system_prompt,
runtime_params=runtime_params,
dms_tags=dms_tags,
)
)
fallback_text = filter_ocr_noise("\n".join(fallback_parts).strip())
return OcrResponse(
text=fallback_text,
@@ -190,91 +259,162 @@ def _process_pdf_doc(doc: fitz.Document, selected_engine: str, max_pages: int, o
engineUsed="np-dms-ocr",
)
def process_ocr(pdf_path: str, page_num: int = 1, options_override: dict = {}, system_prompt: Optional[str] = None) -> str:
async def process_ocr(
pdf_path: str,
page_num: int = 1,
options_override: Optional[dict] = None,
system_prompt: Optional[str] = None,
runtime_params: Optional[dict] = None,
dms_tags: Optional[dict] = None,
) -> str:
"""เรียก np-dms-ocr ผ่าน Ollama /v1/chat/completions — รับ PDF path โดยตรง ไม่ต้องแปลง PIL Image"""
options_override = options_override or {}
if "keep_alive" in options_override:
raise ValueError("keep_alive must be calculated by OCR residency policy")
residency = await asyncio.to_thread(calculate_ocr_residency, OCR_ACTIVE_PROFILE)
model_name = OCR_MODEL
# prepare_ocr_messages จัดการ PDF → image ผ่าน poppler/pdftoppm ภายใน
messages = prepare_ocr_messages(pdf_path, task_type="structure", page_num=page_num)
# inject system prompt ถ้ามี (ก่อน DMS tags)
if system_prompt:
messages[0]["content"].append({"type": "text", "text": system_prompt})
# inject DMS-specific extraction tags ต่อท้าย content
messages[0]["content"].append({
"type": "text",
"text": (
# Dynamic dms_tags mapping to prompts
if dms_tags:
dms_text = "Additionally:\n"
for key in dms_tags.keys():
readable_name = re.sub(r'(?<!^)(?=[A-Z])|_', ' ', key).lower()
dms_text += f"- Wrap {readable_name} with <{key}>...</{key}>\n"
dms_text += "If a field is not found, omit the tag."
else:
# Fallback to default DMS extraction tags
dms_text = (
"Additionally:\n"
"- Wrap document number with <document_number>...</document_number>\n"
"- Wrap document date with <document_date>...</document_date>\n"
"- Wrap received date with <received_date>...</received_date>\n"
"If a field is not found, omit the tag."
),
)
# inject DMS-specific extraction tags ต่อท้าย content
messages[0]["content"].append({
"type": "text",
"text": dms_text,
})
# Resolve runtime parameters: remove hardcoded fallback values from sidecar
# Use empty dict if runtime_params not provided to allow Ollama Modelfile default
params = {}
if runtime_params:
if hasattr(runtime_params, "dict"):
params = runtime_params.dict()
elif isinstance(runtime_params, dict):
params = runtime_params
# Options override (e.g., from Sandbox form parameter overrides) takes precedence
merged_params = {}
if params:
merged_params.update(params)
if options_override:
merged_params.update(options_override)
# ค่า default ตาม official; options_override ยัง override ได้บางส่วน
logger.info(
f"OCR residency decision: keep_alive={residency.keep_alive_seconds}s "
f"reason={residency.reason} headroom={residency.vram_headroom_mb}MB"
)
payload = {
"model": model_name,
"messages": messages,
"max_tokens": 16000,
"stream": False,
"repetition_penalty": options_override.get("repeat_penalty", 1.2),
"temperature": options_override.get("temperature", 0.1),
"top_p": options_override.get("top_p", 0.6),
"keep_alive": options_override.get("keep_alive", 0), # Unload model ทันทีหลังเสร็จงานเพื่อคืน VRAM ให้ np-dms-ai ใช้งานได้
"keep_alive": residency.keep_alive_seconds,
}
# ใช้ Ollama OpenAI-compatible endpoint (/v1/chat/completions)
with httpx.Client(timeout=OCR_TIMEOUT) as client:
response = client.post(
f"{OLLAMA_API_URL}/v1/chat/completions",
json=payload,
headers={"Authorization": "Bearer ollama"},
# Only send keys to Ollama if they are defined in merged_params (to support Modelfile fallback)
if "temperature" in merged_params and merged_params["temperature"] is not None:
payload["temperature"] = float(merged_params["temperature"])
if "top_p" in merged_params and merged_params["top_p"] is not None:
payload["top_p"] = float(merged_params["top_p"])
if "repeat_penalty" in merged_params and merged_params["repeat_penalty"] is not None:
payload["repetition_penalty"] = float(merged_params["repeat_penalty"])
elif "repetition_penalty" in merged_params and merged_params["repetition_penalty"] is not None:
payload["repetition_penalty"] = float(merged_params["repetition_penalty"])
if "max_tokens" in merged_params and merged_params["max_tokens"] is not None:
payload["max_tokens"] = int(merged_params["max_tokens"])
# T044: ใช้ shared AsyncClient (ollama_client) แทน httpx.Client แบบ sync
# ถ้า ollama_client ยังไม่ถูกสร้าง (เช่น unit test ที่เรียกตรง) ให้สร้างชั่วคราว
client = ollama_client
if client is None:
client = httpx.AsyncClient(timeout=OCR_TIMEOUT)
response = await client.post(
f"{OLLAMA_API_URL}/v1/chat/completions",
json=payload,
headers={"Authorization": "Bearer ollama"},
)
response.raise_for_status()
data = response.json()
raw_text = str(data.get("choices", [{}])[0].get("message", {}).get("content", "")).strip()
# parse JSON output จาก model (format: {"natural_text": "..."})
try:
result_text = json.loads(raw_text).get("natural_text", raw_text)
except (json.JSONDecodeError, AttributeError):
result_text = raw_text
logger.info(
f"[DIAG] Ollama response — model={model_name} "
f"textLen={len(result_text)} "
f"done={data.get('done')} "
f"done_reason={data.get('done_reason')} "
f"eval_count={data.get('eval_count', 0)}"
)
if not result_text:
logger.warning(
f"[DIAG] Ollama returned empty response — full response keys: {list(data.keys())}"
)
response.raise_for_status()
data = response.json()
raw_text = str(data.get("choices", [{}])[0].get("message", {}).get("content", "")).strip()
# parse JSON output จาก model (format: {"natural_text": "..."})
try:
result_text = json.loads(raw_text).get("natural_text", raw_text)
except (json.JSONDecodeError, AttributeError):
result_text = raw_text
logger.info(
f"[DIAG] Ollama response — model={model_name} "
f"textLen={len(result_text)} "
f"done={data.get('done')} "
f"done_reason={data.get('done_reason')} "
f"eval_count={data.get('eval_count', 0)}"
)
if not result_text:
logger.warning(
f"[DIAG] Ollama returned empty response — full response keys: {list(data.keys())}"
)
return result_text
# ปิด temporary client ถ้าสร้างชั่วคราว
if ollama_client is None:
await client.aclose()
return result_text
@app.post("/ocr", response_model=OcrResponse, dependencies=[Depends(get_api_key)])
def ocr_extract(req: OcrRequest):
async def ocr_extract(req: OcrRequest):
"""OCR จาก path (legacy — ใช้เมื่อ sidecar และ backend เข้าถึง storage เดียวกัน)"""
pdf_path = Path(req.pdfPath)
if req.keep_alive is not None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="keep_alive is managed by OCR residency policy")
pdf_path = validate_pdf_path(req.pdfPath)
if not pdf_path.exists():
raise HTTPException(status_code=404, detail=f"ไม่พบไฟล์: {req.pdfPath}")
selected_engine = (req.engine or "auto").strip().lower()
max_pages = req.maxPages or MAX_PAGES
ocr_options = {}
if req.keep_alive is not None:
ocr_options["keep_alive"] = req.keep_alive
try:
doc = fitz.open(str(pdf_path))
except Exception as e:
raise HTTPException(status_code=422, detail=f"เปิดไฟล์ PDF ล้มเหลว: {e}")
return _process_pdf_doc(doc, selected_engine, max_pages, ocr_options)
return await _process_pdf_doc(
doc,
selected_engine,
max_pages,
ocr_options,
pdf_path=str(pdf_path),
system_prompt=req.system_prompt,
runtime_params=req.runtime_params,
dms_tags=req.dms_tags,
)
@app.post("/ocr-upload", response_model=OcrResponse, dependencies=[Depends(get_api_key)])
def ocr_upload(
async def ocr_upload(
file: UploadFile = File(...),
engine: str = Form(default="auto"),
maxPages: int = Form(default=0),
temperature: Optional[float] = Form(default=None),
topP: Optional[float] = Form(default=None),
repeatPenalty: Optional[float] = Form(default=None),
maxTokens: Optional[int] = Form(default=None),
keep_alive: Optional[int] = Form(default=None),
systemPrompt: Optional[str] = Form(default=None),
dmsTags: Optional[str] = Form(default=None),
runtimeParams: Optional[str] = Form(default=None),
):
"""OCR จาก multipart file upload — ไม่ต้องการ shared volume mount"""
# Validate systemPrompt ถ้ามีส่งมา (gap-1: sidecar validation)
@@ -292,6 +432,22 @@ def ocr_upload(
)
selected_engine = engine.strip().lower()
max_pages = maxPages or MAX_PAGES
# Parse runtimeParams and dmsTags from form-data JSON strings if provided
runtime_params_dict = {}
if runtimeParams:
try:
runtime_params_dict = json.loads(runtimeParams)
except Exception as e:
logger.warning(f"Failed to parse runtimeParams JSON: {e}")
dms_tags_dict = None
if dmsTags:
try:
dms_tags_dict = json.loads(dmsTags)
except Exception as e:
logger.warning(f"Failed to parse dmsTags JSON: {e}")
# รวม options override สำหรับ np-dms-ocr (ถ้า frontend ส่งมา)
ocr_options: dict = {}
if temperature is not None:
@@ -300,10 +456,11 @@ def ocr_upload(
ocr_options["top_p"] = topP
if repeatPenalty is not None:
ocr_options["repeat_penalty"] = repeatPenalty
if maxTokens is not None:
ocr_options["max_tokens"] = maxTokens
if keep_alive is not None:
ocr_options["keep_alive"] = keep_alive
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="keep_alive is managed by OCR residency policy")
pdf_bytes = file.file.read()
import tempfile
tmp_pdf_path: str | None = None
try:
# บันทึก PDF เป็น temp file เพื่อให้ prepare_ocr_messages อ่านได้ผ่าน path
@@ -315,29 +472,20 @@ def ocr_upload(
except Exception as e:
raise HTTPException(status_code=422, detail=f"เปิดไฟล์ PDF ล้มเหลว: {e}")
logger.info(f"OCR upload: {file.filename} engine={selected_engine} options={ocr_options or 'modelfile-defaults'}")
return _process_pdf_doc(doc, selected_engine, max_pages, ocr_options, pdf_path=tmp_pdf_path, system_prompt=systemPrompt)
return await _process_pdf_doc(
doc,
selected_engine,
max_pages,
ocr_options,
pdf_path=tmp_pdf_path,
system_prompt=systemPrompt,
runtime_params=runtime_params_dict,
dms_tags=dms_tags_dict,
)
finally:
if tmp_pdf_path:
Path(tmp_pdf_path).unlink(missing_ok=True)
class NormalizeRequest(BaseModel):
text: str
class NormalizeResponse(BaseModel):
normalized: str
@app.post("/normalize", response_model=NormalizeResponse, dependencies=[Depends(get_api_key)])
def normalize_text(req: NormalizeRequest):
"""Normalize Thai text ด้วย PyThaiNLP สำหรับ rag-thai-preprocess queue"""
try:
# normalize unicode + ตัดคำแล้วต่อกลับด้วย space เพื่อ embedding
normalized = thai_normalize(req.text)
tokens = word_tokenize(normalized, engine="newmm", keep_whitespace=False)
result = " ".join(tokens)
return NormalizeResponse(normalized=result)
except Exception as e:
logger.warning(f"Thai normalize failed, returning raw text: {e}")
return NormalizeResponse(normalized=req.text)
class EmbedRequest(BaseModel):
text: str
@@ -362,7 +510,7 @@ async def embed_text(req: EmbedRequest):
raise HTTPException(status_code=503, detail="BGE-M3 model not loaded")
threshold_mb = float(os.getenv("VRAM_HEADROOM_THRESHOLD_MB", "3000.0"))
timeout_sec = float(os.getenv("RETRIEVAL_TIMEOUT_SECONDS", "30.0"))
headroom = get_vram_headroom()
headroom = await asyncio.to_thread(get_vram_headroom)
device = "cuda"
reason = "headroom-sufficient"
if not headroom.query_success:
@@ -427,7 +575,7 @@ async def rerank_chunks(req: RerankRequest):
return RerankResponse(scores=[], ranked_indices=[], device="cpu")
threshold_mb = float(os.getenv("VRAM_HEADROOM_THRESHOLD_MB", "3000.0"))
timeout_sec = float(os.getenv("RETRIEVAL_TIMEOUT_SECONDS", "30.0"))
headroom = get_vram_headroom()
headroom = await asyncio.to_thread(get_vram_headroom)
device = "cuda"
reason = "headroom-sufficient"
if not headroom.query_success: