diff --git a/specs/03-Data-and-Storage/03-07-OpenRAG.md b/specs/03-Data-and-Storage/03-07-OpenRAG.md new file mode 100644 index 0000000..1527c86 --- /dev/null +++ b/specs/03-Data-and-Storage/03-07-OpenRAG.md @@ -0,0 +1,931 @@ +# 03-07: RAG (Retrieval-Augmented Generation) — Future Architecture Spec + +**Document ID:** DMS-RAG-001 +**Status:** Draft / Exploratory +**Version:** 1.8.1 +**Date:** 2026-03-13 +**Related Documents:** +- [ADR-017: Ollama Data Migration](../06-Decision-Records/ADR-017-ollama-data-migration.md) +- [ADR-018: AI Boundary Hardening](../06-Decision-Records/Patch%201.8.1.md) +- [n8n Migration Setup Guide](./03-05-n8n-migration-setup-guide.md) +- [Legacy Data Migration](./03-04-legacy-data-migration.md) +- [OpenRAG (openr.ag)](https://www.openr.ag/) — IBM open-source RAG: Docling + OpenSearch + Langflow + +> ⚠️ **หมายเหตุ:** เอกสารนี้ออกแบบ RAG Pipeline **2 ส่วน**: +> 1. **OpenRAG (Extraction Phase)** — ทำหน้าที่ "พนักงานคัดกรองข้อมูล" อ่าน PDF ทั้ง Folder แล้วเขียน JSON ลง `rag-output/` บน Shared NAS +> 2. **n8n + Ollama + Elasticsearch (Integration & Search Phase)** — Poll ไฟล์ JSON จาก `rag-output/` ทีละไฟล์ แล้วนำเข้า DMS +> +> ทั้งหมดทำงาน **On-Premise** เท่านั้น — ไม่ส่งข้อมูลออกนอกเครือข่าย (ADR-018 AI Isolation) +> +> **Integration Model: File-based Queue (Pull)** +> - Admin Desktop mount `R:\` (Drive Letter) → QNAP NAS Shared Folder (`staging_ai`) +> - OpenRAG เขียน JSON ลง `R:\staging_ai\rag-output\` → n8n อ่านจาก `staging_ai/rag-output/` +> - **ไม่มี HTTP ระหว่าง OpenRAG กับ n8n** — NAS Folder เป็น Shared Queue + +--- + +## 🎯 วัตถุประสงค์ (Objective) + +เพิ่มความสามารถ **Semantic Search และ Document Q&A** ให้กับระบบ DMS โดยใช้ Infrastructure ที่มีอยู่แล้ว: +- ไม่ส่งข้อมูลออกนอกเครือข่ายองค์กร (Data Privacy) +- ไม่มีค่าใช้จ่ายต่อ Query (Zero Cost) +- ต่อยอดจากสถาปัตยกรรม Migration ที่ผ่าน Validate แล้ว (ADR-017/018) + +--- + +## 🏗️ สถาปัตยกรรม Infrastructure (Binding) + +ตาม Patch 1.8.1 (ADR-018) Infrastructure Layout ที่กำหนดไว้: + +| Component | Host | บทบาทใน RAG Pipeline | +| ---------------------- | ------------- | ------------------------------------------------- | +| **OpenRAG** (Docling + OpenSearch + Langflow) | Admin Desktop | **Phase 0: Extraction** — สกัด Metadata + Text จาก PDF เป็น JSON | +| **Tika** (Fallback OCR) | QNAP | สกัดข้อความจาก PDF กรณีไม่ใช้ OpenRAG หรือ Fallback | +| **Elasticsearch 8.11** | QNAP | Vector Store + Full-text Index | +| **n8n** | QNAP | Orchestrator — Poll JSON จาก `rag-output/` (ทีละไฟล์) แล้วนำเข้า DMS | +| **DMS Backend (NestJS)**| QNAP | API Gateway — รับ Query / ส่งผล / บันทึก Metadata | +| **Ollama** | Admin Desktop | AI Inference (Embedding + Generate) บน RTX 2060 SUPER | +| **MariaDB 11.8** | QNAP | Document Metadata (Authoritative DB) | +| **Redis 7.2** | QNAP | Cache (Query Result Cache) | + +> ⛔ **ข้อห้าม (ADR-018):** OpenRAG และ Ollama **ห้ามอยู่บน QNAP** และห้ามเข้า DB โดยตรง +> ✅ OpenRAG เขียนผล JSON ลง `rag-output/` บน Shared NAS (R:\ บน Admin Desktop = `staging_ai` บน QNAP) + +--- + +## 🔄 RAG Data Flow (4 Phase) + +--- + +### Phase 0: OpenRAG — Batch Extraction Phase ("พนักงานคัดกรองข้อมูล") + +OpenRAG ทำงานบน **Admin Desktop** อ่าน PDF ทั้ง Folder แล้วเขียน JSON ทีละไฟล์ลง Shared NAS: + +``` + R:\staging_ai\*.pdf (Admin Desktop — Network Drive จาก QNAP) + │ + ▼ + ┌───────────────────────────────────────────────────────────┐ + │ OpenRAG Langflow Batch Runner (Admin Desktop) │ + │ │ + │ [Loop Folder Component] │ + │ สำหรับแต่ละ .pdf ใน R:\staging_ai\ │ + │ │ │ + │ ▼ │ + │ [Docling Component] ← Parse PDF Structure │ + │ │ │ + │ ▼ │ + │ [Ollama LLM Component] ← Extract Metadata → JSON │ + │ │ │ + │ ▼ │ + │ [File Write Component] │ + │ เขียน JSON → R:\staging_ai\rag-output\.json │ + │ (Skip ถ้า .json ไฟล์นั้นมีอยู่แล้ว — Idempotent) │ + └───────────────────────────────────────────────────────────┘ + + ────────────── Shared NAS Folder (staging_ai) ────────────── + + staging_ai/ + ├── rag-output/ + │ ├── TCC-COR-2024-001.json ← OpenRAG เขียน + │ ├── TCC-COR-2024-002.json + │ └── ... ← n8n อ่านทีละไฟล์ + ├── TCC-COR-2024-001.pdf + └── ... + + ────────────── n8n บน QNAP (Schedule Trigger) ────────────── + + [n8n: Schedule Trigger ทุก 5 นาที] + → อ่าน staging_ai/rag-output/*.json ทีละ 1 ไฟล์ + → Process → เปลี่ยนชื่อเป็น .done (หรือ ลบ) + → Loop ต่อจนหมด Queue +``` + +**JSON Output Contract (เขียนลง `rag-output/.json`):** + +```json +{ + "source_file": "TCC-COR-2024-001.pdf", + "processed_at": "2026-03-13T10:00:00+07:00", + "is_valid": true, + "confidence": 0.91, + "extracted_text": "เนื้อหาเต็มของเอกสาร...", + "metadata": { + "correspondence_number": "TCC-COR-2024-001", + "title": "ส่งแบบ Shop Drawing งวดที่ 3", + "document_date": "2024-03-15", + "sender_org": "TCC", + "receiver_org": "LCB", + "project_code": "LCBP3", + "suggested_category": "Correspondence", + "detected_issues": [] + }, + "chunks": [ + { "chunk_index": 0, "page": 1, "text": "..." }, + { "chunk_index": 1, "page": 2, "text": "..." } + ] +} +``` + +> ✅ **File Naming Convention:** `.json` +> ตัวอย่าง: `TCC-COR-2024-001.pdf` → `TCC-COR-2024-001.json` +> +> ✅ **Idempotency:** ถ้า `.json` ไฟล์นั้นมีอยู่แล้ว → Skip (ไม่ Process ซ้ำ) +> เพิ่ม field `processed_at` เพื่อ debug ว่า Extract เมื่อไหร่ +> +> ⚠️ **Constraint (ADR-018):** OpenRAG ไม่มีสิทธิ์เข้า MariaDB +> เขียนได้เฉพาะใน `rag-output/` เท่านั้น — ไม่แตะ PDF ต้นฉบับ + +--- + +### Phase 1: n8n Integration — Poll JSON จาก rag-output/ แล้ว Import เข้า DMS + +n8n ทำงานแบบ **Pull (Schedule-based)** — ดึง JSON ทีละไฟล์จาก Shared NAS: + +``` +[n8n: Schedule Trigger ทุก 5 นาที] + │ + ├─── List Files: staging_ai/rag-output/*.json + │ (กรอง: ไม่รวม *.done, *.error) + │ + ├─── [ถ้าไม่มีไฟล์] → หยุด (รอรอบถัดไป) + │ + └─── Loop ทีละ 1 ไฟล์: + │ + ├─── อ่านไฟล์ JSON + ├─── Validate JSON Schema (is_valid, confidence, required fields) + │ + ├─── Confidence Router (ตาม ADR-017) + │ ≥ 0.85 → Auto Ingest via POST /api/migration/import + │ 0.60–0.84 → INSERT migration_review_queue (รอ Human Approve) + │ < 0.60 → Rename → .error (Log เหตุผล) + │ + ├─── [Auto Ingest Path] + │ POST /api/migration/import + │ Header: Idempotency-Key: {correspondence_number}:{batch_id} + │ Body: metadata + source_file_path + │ → Backend StorageService ย้ายไฟล์จาก staging_ai → uploads/YYYY/MM/ + │ + ├─── [สำเร็จ] Rename: .json → .done + ├─── [ล้มเหลว] Rename: .json → .error + │ + └─── [Checkpoint] บันทึก migration_progress ทุก 10 records +``` + +> 📁 **File State Machine ใน `rag-output/`:** +> +> | สถานะ | Filename | ความหมาย | +> |-------|----------|----------| +> | Pending | `TCC-COR-001.json` | รอ n8n ดึงไป Process | +> | Done | `TCC-COR-001.done` | นำเข้า DMS สำเร็จ | +> | Error | `TCC-COR-001.error` | ล้มเหลว — รอ Manual Review | + +### Phase 2: Indexing Pipeline — สร้าง Vector Index ใน Elasticsearch + +``` +PDF ที่ Import แล้ว (อยู่ใน uploads/) + │ + ▼ +[n8n Workflow: RAG Indexer] + │ + ├─── ใช้ Chunks จาก OpenRAG JSON โดยตรง (ไม่ต้อง OCR ซ้ำ) + │ หรือ Fallback: [Tika OCR] กรณีไม่มี chunks + │ + ├─── [Ollama: Embedding] + │ POST http://:11434/api/embeddings + │ Model: nomic-embed-text + │ + └─── [Elasticsearch: Index Chunk] + index: dms_rag_chunks + fields: doc_id, chunk_text, embedding, page_num, metadata +``` + +### Phase 2: Query Pipeline (Real-time) + +``` +User Query (จาก DMS Frontend) + │ + ▼ +[DMS Backend: RAG Controller] + │ GET /api/rag/search?q=...&project_id=... + │ + ├─── Check Redis Cache (TTL 5 นาที) + │ + └─── [n8n Webhook: RAG Query] + │ + ├─── [Ollama: Query Embedding] → Vector ของ Query + │ + ├─── [Elasticsearch: kNN Search] + │ └─ Top-5 Chunks ที่เกี่ยวข้อง + RBAC Filter (project_id, user_id) + │ + ├─── [Ollama: Generate Answer] + │ └─ Prompt: System + Context Chunks + User Query + │ Output: JSON { answer, sources, confidence } + │ + └─── [DMS Backend] → ส่งผลกลับ + Cache ใน Redis +``` + +### Phase 3: Response Contract + +```json +{ + "answer": "เอกสาร RFA-2026-001 ถูก Approve โดย...", + "sources": [ + { + "doc_id": "uuid-xxxx", + "doc_number": "RFA-2026-001", + "page": 3, + "excerpt": "...ข้อความที่ตัดมา...", + "confidence": 0.91 + } + ], + "confidence": 0.87, + "cached": false +} +``` + +--- + +## 📐 Elasticsearch Index Schema + +```json +PUT /dms_rag_chunks +{ + "mappings": { + "properties": { + "doc_id": { "type": "keyword" }, + "doc_number": { "type": "keyword" }, + "project_id": { "type": "keyword" }, + "chunk_text": { "type": "text", "analyzer": "thai" }, + "embedding": { "type": "dense_vector", "dims": 768 }, + "page_num": { "type": "integer" }, + "chunk_index": { "type": "integer" }, + "created_at": { "type": "date" } + } + } +} +``` + +> ⚠️ **ขนาด Embedding Vector:** ขึ้นอยู่กับ Model ที่ใช้ +> - `nomic-embed-text`: 768 dims +> - `llama3.2:3b` (ใช้ layer สุดท้าย): 3072 dims +> ต้องทดสอบ Performance บน RTX 2060 SUPER 8GB ก่อนเลือก + +--- + +## 🛡️ Security & RBAC (สำคัญ) + +- Query **ต้องผ่าน DMS API** — Ollama ไม่รับ Request โดยตรงจาก Frontend +- Elasticsearch Search **ต้องมี Filter** ด้วย `project_id` และ `permission_scope` เสมอ +- ผล RAG **ต้องไม่เปิดเผยเอกสาร** ที่ User ไม่มีสิทธิ์เห็น (CASL Enforcement ที่ Backend Layer) +- Cache Key ใน Redis ต้องรวม `user_id` หรือ `role` เพื่อป้องกัน Cross-user Cache Poisoning + +--- + +## ⚙️ n8n Workflow: OpenRAG Ingestor (Node Overview) + +Poll ไฟล์ JSON จาก Shared NAS ทีละไฟล์ แล้วนำข้อมูลเข้า DMS: + +| Node | ชื่อ | หน้าที่ | +|------|------|----------| +| 0 | Schedule Trigger | ทำงานทุก 5 นาที (หรือ Manual Trigger) | +| 1 | List JSON Files | อ่านรายการ `staging_ai/rag-output/*.json` (กรอง .done/.error) | +| 2 | Loop Items | วนลูปทีละ 1 ไฟล์ | +| 3 | Read JSON File | อ่านเนื้อหา JSON จาก NAS | +| 4 | JSON Schema Validator | ตรวจสอบ field ครบ + ค่า is_valid | +| 5 | Confidence Router | แยก Auto / Review / Reject ตาม Threshold | +| 6A | Auto Ingest | POST `/api/migration/import` พร้อม Idempotency-Key | +| 6B | Review Queue | INSERT `migration_review_queue` เท่านั้น | +| 6C | Rename to .error | Rename ไฟล์ → `.error` + บันทึกเหตุผล | +| 7 | Rename to .done | Rename ไฟล์ → `.done` (กรณีสำเร็จ) | +| 8 | Save Checkpoint | UPDATE `migration_progress` ทุก 10 records | + +--- + +## ⚙️ n8n Workflow: RAG Indexer (Node Overview) + +Index Chunks (จาก OpenRAG JSON หรือ Tika Fallback) เข้า Elasticsearch: + +| Node | ชื่อ | หน้าที่ | +|------|------|----------| +| 0 | Webhook / Schedule Trigger | รับ `doc_id` ที่นำเข้าแล้ว หรือ Batch รายคืน | +| 1 | Fetch Chunks | ดึง chunks จาก OpenRAG JSON หรือเรียก Tika Fallback | +| 2 | Tika OCR (Fallback) | POST `http://tika:9998/tika` กรณีไม่มี chunks จาก OpenRAG | +| 3 | Ollama Embeddings | POST `http://:11434/api/embeddings` | +| 4 | Elasticsearch Ingest | Bulk Index Chunks เข้า `dms_rag_chunks` | +| 5 | Update DMS Index Status | PATCH `/api/documents/{id}` ตั้ง `is_indexed: true` | + +--- + +## ⚙️ n8n Workflow: RAG Query (Node Overview) + +| Node | ชื่อ | หน้าที่ | +|------|------|----------| +| 0 | Webhook | รับ `{ query, project_id, user_id, top_k }` จาก Backend | +| 1 | Ollama: Embed Query | แปลง Query เป็น Vector | +| 2 | Elasticsearch: kNN Search | ค้นหา Top-k Chunks พร้อม RBAC Filter | +| 3 | Build RAG Prompt | รวม Context Chunks + System Prompt + User Query | +| 4 | Ollama: Generate | สร้างคำตอบ, Output JSON เท่านั้น | +| 5 | Return to Backend | Respond Webhook พร้อม `{ answer, sources, confidence }` | + +--- + +## 📏 Confidence & Hallucination Guard + +| ระดับ Confidence | การดำเนินการ | +|-----------------|--------------| +| `>= 0.80` | แสดงผลทันที พร้อม Sources | +| `0.60 – 0.79` | แสดงผลพร้อม Warning "โปรดตรวจสอบเอกสารต้นฉบับ" | +| `< 0.60` | ไม่แสดงคำตอบ — แสดงเฉพาะ Document Links ที่เกี่ยวข้อง | + +> AI ไม่มีสิทธิ์ Write ข้อมูลใดๆ — Output เป็น JSON Read-only เสมอ (ADR-018) + +--- + +## 🚧 ข้อจำกัดและความเสี่ยง + +| ความเสี่ยง | ผลกระทบ | Mitigation | +|-----------|----------|------------| +| NAS Drive R: disconnect ขณะ OpenRAG รัน | เขียน JSON ไม่ได้ | Langflow ตรวจ Drive ก่อนเริ่ม Loop — แจ้งเตือนถ้า mount หาย | +| ไฟล์ JSON เขียนไม่สมบูรณ์ (crash กลางคัน) | n8n อ่าน JSON เสีย | n8n ตรวจ JSON valid ก่อน Process — Rename → .error | +| OpenRAG Process PDF ซ้ำ (Retry) | JSON เขียนทับ | Skip ถ้า `.json` มีอยู่แล้ว (Idempotent by filename) | +| n8n อ่านไฟล์ขณะ OpenRAG ยังเขียนไม่เสร็จ | JSON ไม่สมบูรณ์ | OpenRAG เขียนเป็น `.tmp` ก่อน → Rename เป็น `.json` เมื่อเสร็จ | +| rag-output/ เต็ม (เก่าสะสม) | Disk บน NAS เต็ม | ตั้ง Schedule ลบ `.done` ที่เกิน 30 วัน | +| OpenRAG Metadata ผิด | นำข้อมูลผิดเข้า DMS | Confidence < 0.85 → Human Review Queue (ADR-017 Policy) | +| Embedding Dim Mismatch | Index ใช้งานไม่ได้ | กำหนด Model + Dims ก่อน Index แรก ห้ามเปลี่ยน | +| RTX 2060 SUPER VRAM (8GB) | Timeout ถ้า Model ใหญ่เกินไป | ใช้ `nomic-embed-text` สำหรับ Embedding | +| AI Hallucination | คำตอบผิด | Confidence Threshold + Source Citation บังคับ | +| Cross-project Data Leak | Security Issue | RBAC Filter ทุก Query ที่ Elasticsearch Layer | +| Elasticsearch Storage | Disk Usage สูง | เปิด ILM Policy หรือจำกัดเฉพาะ Project สำคัญ | +| Ollama ไม่พร้อม | Query ล้มเหลว | Graceful Fallback: ใช้ Elasticsearch Full-text เท่านั้น | + +--- + +## 📋 Implementation Gate (ก่อนพัฒนา) + +> **หมายเหตุ:** Feature นี้เป็น Post-UAT / Post-Migration +> ต้องผ่าน Go-Live Gate ของ Migration (ADR-017) ก่อนเริ่มพัฒนา + +**OpenRAG Setup (Admin Desktop):** +- [ ] ติดตั้ง OpenRAG บน Admin Desktop ตาม `## 🛠️ OpenRAG Setup Guide` ด้านล่าง +- [ ] กำหนด Langflow Workflow: PDF Input → Docling Parse → Ollama Extract → JSON Output +- [ ] ตั้งค่า System Prompt ใน Langflow ให้ Output ตรง JSON Contract ด้านบน +- [ ] ทดสอบ Extraction Accuracy กับตัวอย่างเอกสาร 20 ฉบับ (ไทย + อังกฤษ) +- [ ] ยืนยัน OpenRAG ไม่มี DB Credentials และ Mount `staging_ai` เป็น Read-only + +**n8n Webhook Integration:** +- [ ] สร้าง n8n Webhook Endpoint: รับ JSON จาก OpenRAG (validate schema + route ตาม Confidence) +- [ ] ทดสอบ Idempotency-Key กรณี OpenRAG ส่ง Duplicate +- [ ] สร้าง n8n Workflow: RAG Indexer (Dry Run กับ 10 เอกสาร) + +**Search & Query:** +- [ ] Migration v1.8.x เสร็จสมบูรณ์และ Stable (Prerequisite) +- [ ] ทดสอบ `nomic-embed-text` บน Admin Desktop — วัด VRAM + Speed +- [ ] กำหนด Elasticsearch Index Schema + Dims (lock ก่อน Index แรก) +- [ ] ออกแบบ RBAC Filter สำหรับ kNN Search +- [ ] สร้าง n8n Workflow: RAG Query (ทดสอบ Hallucination) +- [ ] เพิ่ม `/api/rag/search` Endpoint ใน DMS Backend +- [ ] เพิ่ม UI Component: RAG Search Panel ใน Frontend +- [ ] Load Test: Query Latency < 5 วินาที สำหรับ Top-5 Results + +--- + +## 🛠️ OpenRAG Setup Guide (Admin Desktop — Step by Step) + +> **สภาพแวดล้อม:** Windows 10/11, i9-9900K, 32GB RAM, RTX 2060 SUPER 8GB +> **ข้อกำหนด:** Ollama ต้องรันอยู่แล้วบน Admin Desktop (Port 11434) +> **เวลาติดตั้งประมาณ:** 20–40 นาที (ขึ้นอยู่กับความเร็ว Internet สำหรับ Pull Images) + +--- + +### ขั้นตอนที่ 1: ติดตั้ง WSL 2 + Docker Desktop + +OpenRAG บน Windows **ต้องรันผ่าน WSL 2** (ข้อกำหนดจาก OpenRAG Official Docs) + +```powershell +# รันใน PowerShell (Admin) บน Admin Desktop +wsl --install -d Ubuntu +# รีสตาร์ท Windows หลังติดตั้งเสร็จ +``` + +จากนั้นติดตั้ง **Docker Desktop for Windows** พร้อม WSL 2 Integration: + +1. ดาวน์โหลด Docker Desktop จาก [docs.docker.com](https://docs.docker.com/desktop/install/windows-install/) +2. ระหว่างติดตั้ง → เปิด **"Use WSL 2 based engine"** +3. หลังติดตั้ง → ไปที่ **Settings → Resources → WSL Integration** +4. เปิด Toggle สำหรับ Ubuntu distribution → **Apply & Restart** + +> ✅ ตรวจสอบ: เปิด WSL Ubuntu แล้วรัน `docker ps` — ต้องไม่มี Error + +--- + +### ขั้นตอนที่ 2: ติดตั้ง uv ใน WSL + +> ℹ️ **Ubuntu 24.04 (Noble) ไม่มี `python3.13` ใน Default Repository** +> ไม่ต้องติดตั้ง Python ด้วย `apt` — **`uv` จัดการ Python 3.13 เองได้โดยอัตโนมัติ** + +```bash +# ติดตั้ง uv (ไม่ต้องการ Python ก่อน) +curl -LsSf https://astral.sh/uv/install.sh | sh +source ~/.bashrc # โหลด PATH ใหม่ + +# ตรวจสอบ +uv --version # ต้องแสดง version เช่น uv 0.5.x +``` + +เมื่อรัน `uvx --python 3.13 openrag` ในขั้นตอนถัดไป `uv` จะ **ดาวน์โหลด Python 3.13 เองโดยอัตโนมัติ** ไม่ต้องติดตั้งแยก + +> **ทางเลือก:** ถ้าต้องการ Python 3.13 ระดับ System จริงๆ (ไม่บังคับ): +> ```bash +> sudo add-apt-repository ppa:deadsnakes/ppa -y +> sudo apt update && sudo apt install -y python3.13 python3.13-venv +> ``` + +--- + +### ขั้นตอนที่ 3: ติดตั้ง OpenRAG + +```bash +# ใน WSL Ubuntu: +mkdir ~/openrag-workspace +cd ~/openrag-workspace + +# ⚠️ ติดตั้งแพ็กเกจระบบที่จำเป็นสำหรับ EasyOCR และ Docling +sudo apt update +sudo apt install -y libgl1 libglib2.0-0 + +# ติดตั้งและรัน OpenRAG (ครั้งแรกใช้เวลา 5–15 นาที) +# จะติดตั้ง easyocr ไปด้วยเพื่อรองรับ PDF ภาษาไทยผ่าน Docling +uvx --with easyocr --python 3.13 openrag +``` + +**ระหว่าง Interactive Setup ตอบดังนี้:** + +| Prompt | คำตอบ (สำหรับระบบ LCBP3) | +|--------|--------------------------| +| OpenSearch Admin password | ตั้งรหัสผ่านแข็งแรง บันทึกไว้ | +| Langflow Admin password | ตั้งรหัสผ่านแข็งแรง บันทึกไว้ | +| OpenAI API key | **กด N / Skip** — เราใช้ Ollama แทน | +| Use custom LLM provider? | **Y** → เลือก **Ollama** | +| Ollama base URL | `http://192.168.20.100:11434` (Internal VLAN — Ollama รันบน Admin Desktop โดยตรง) | +| Configure Langfuse tracing? | **N** | +| Configure cloud connectors? | **N** | +| Start services now? | **Y** | + +> ℹ️ **Ollama รันบน Windows โดยตรง** (ไม่ใช่ใน Docker) ที่ IP `192.168.20.100` — ตรงกับ Config ใน `03-05-n8n-migration-setup-guide.md` +> +> ถ้าตั้งค่าผิดพลาด แก้ไขได้ที่: +> ```bash +> nano ~/.openrag/tui/.env +> # แก้บรรทัด OLLAMA_ENDPOINT=http://192.168.20.100:11434 +> ``` + +--- + +### ขั้นตอนที่ 4: ตรวจสอบ Services ที่รัน + +```bash +# ดู Containers ที่ OpenRAG สร้าง +docker ps + +# ควรเห็น containers เหล่านี้: +# openrag-langflow (Langflow UI + API) +# openrag-opensearch (OpenSearch: Vector Store) +# openrag-opensearch-dashboards (Optional) +``` + +**URL ที่ใช้งานได้:** + +| Service | URL | หมายเหตุ | +|---------|-----|----------| +| OpenRAG UI | `http://localhost:3000` | หน้าหลัก (เหมือน Chat UI) | +| Langflow | `http://localhost:7860` | สร้าง/แก้ไข Workflow | +| OpenSearch | `http://localhost:9200` | Vector Store API | + +--- + +### ขั้นตอนที่ 5: ตรวจสอบ Ollama Connection + +```bash +# ทดสอบว่า OpenRAG เชื่อม Ollama ได้ (รันใน WSL) +curl http://192.168.20.100:11434/api/tags + +# ต้องแสดง JSON รายการ Models ที่มีใน Ollama +# ตรวจสอบว่ามี: +# - llama3.2:3b +# - mistral:7b-instruct-q4_K_M +# - nomic-embed-text (ถ้ายังไม่มี ให้ติดตั้ง) +``` + +```bash +# ติดตั้ง nomic-embed-text (สำหรับ Embedding) +# รันบน Windows Terminal (ไม่ใช่ WSL): +ollama pull nomic-embed-text +``` + +--- + +### ขั้นตอนที่ 6: กำหนด Langflow Workflow (Batch Extraction Pipeline) + +เปิด Langflow ที่ `http://localhost:7860` → **New Flow** → เพิ่ม Component ตามลำดับดังนี้: + +#### ภาพรวม Node Connection + +``` +[Read File] ──▶ [Loop] ──▶ [Parser: Stringify] ──▶ [Prompt Template] ──▶ [Ollama] + │ │ + │ (filename จาก Item) │ + └──────────────────────────────────────────────────────────▶│ + [Custom Code] + │ + เขียน .tmp → rename .json +``` + +--- + +#### Node 1: Read File + +> **Component:** `Read File` (หมวด Data / Helpers) + +| Setting | ค่า | +|---------|-----| +| Files | อัปโหลด หรือ ชี้ไปที่ `/data/staging_ai/` | +| Advanced Parser | `OFF` (ปิด — อ่านเป็น raw text ธรรมดา) | + +**การเชื่อมต่อ:** +- Output `Files` → Input `Inputs` ของ Loop + +> ℹ️ Read File จะโหลดไฟล์ทั้งหมดมาเป็น list แล้วส่งให้ Loop วนลูปทีละไฟล์ +> ถ้าต้องการเลือก folder แบบ dynamic ให้ใช้ **Directory** component แทน + +--- + +#### Node 2: Loop + +> **Component:** `Loop` (หมวด Logic) + +| Setting | ค่า | +|---------|-----| +| Inputs | รับจาก `Read File → Files` | + +**Output ที่ใช้:** +- `Item` → ส่งต่อให้ Parser และ Custom Code (filename) +- `Done` → ไม่ต้องเชื่อมไปไหน (สัญญาณสิ้นสุด Loop) + +> ℹ️ Loop จะปล่อย `Item` ทีละ 1 ไฟล์ ผ่านทุก Node ก่อนวนรอบถัดไป + +--- + +#### Node 3: Parser (Mode: Stringify) + +> **Component:** `Parser` (หมวด Processing) + +| Setting | ค่า | +|---------|-----| +| Mode | **`Stringify`** (ไม่ใช่ Parser) | +| Data or DataFrame | รับจาก `Loop → Item` | + +**การเชื่อมต่อ:** +- Input `Data or DataFrame` ← `Loop → Item` +- Output `Parsed Text` → Input `extracted_text` ของ Prompt Template + +> ⚠️ **ใช้ Mode: Stringify เท่านั้น** +> Mode: Parser ใช้ template เป็น pattern สำหรับดึงค่า — ไม่เหมาะกับงานนี้ +> Mode: Stringify แปลง file object เป็น text content ที่ Ollama อ่านได้ + +--- + +#### Node 4: Prompt Template + +> **Component:** `Prompt Template` (หมวด Prompts) + +| Setting | ค่า | +|---------|-----| +| Template | ใส่ System Prompt จากขั้นตอนที่ 7 ด้านล่าง | +| Variable `{extracted_text}` | เชื่อมกับ `Parser → Parsed Text` | + +**การเชื่อมต่อ:** +- Variable `extracted_text` ← `Parser → Parsed Text` +- Output `Prompt` → Input `Input` ของ Ollama + +> ℹ️ Prompt Template รองรับ `{variable_name}` สำหรับแทรกค่าแบบ dynamic +> ต้องตั้งชื่อ variable ให้ตรงกับที่ใช้ใน template (`{extracted_text}`) + +--- + +#### Node 5: Ollama + +> **Component:** `Ollama` (หมวด Models) + +| Setting | ค่า | +|---------|-----| +| Ollama API URL | `http://localhost:11434` (ถ้ารันบน WSL ไม่ต้องใส่ IP) | +| Model Name | `scb10x/typhoon2.1-gemma3-4b` | +| Format | ไม่ต้องตั้ง — ใช้ Enable Structured Output แทน | +| Temperature | `0.1` | +| Enable Structured Output | `ON` | +| Tool Model Enabled | `ON` (เห็นใน screenshot) | + +**การเชื่อมต่อ:** +- Input `Input` ← `Prompt Template → Prompt` +- Input `System Message` ← ปล่อยว่าง (System Prompt อยู่ใน Prompt Template แล้ว) +- Output `Text` → Input ของ Custom Code (Node 6) + +> ⚠️ **Ollama API URL:** +> ถ้า Langflow รันใน Docker (WSL) → ใช้ `http://host.docker.internal:11434` +> ถ้า Ollama bind บน VLAN IP → ใช้ `http://192.168.20.100:11434` +> ทดสอบด้วย: `curl http://host.docker.internal:11434/api/tags` ใน WSL + +--- + +#### Node 6: Write JSON (Idempotent) + +> **Component:** `Custom Component` (สร้างใหม่) — ทำหน้าที่รับ output JSON จาก Ollama และดึงชื่อไฟล์จาก Loop เพื่อเขียนเป็นไฟล์ `.json` + +**Python Code:** + +```python +from langflow.custom import Component +from langflow.io import StrInput, DataInput, Output +from langflow.schema import Data +import json +import os +from pathlib import Path + +class WriteJsonIdempotent(Component): + display_name = "Write JSON (Idempotent)" + description = "Writes JSON to staging_ai dynamically based on loop item filename" + + inputs = [ + StrInput(name="json_content", display_name="JSON Content"), + DataInput(name="loop_item", display_name="Loop Item (PDF)"), + ] + + outputs = [ + Output(display_name="Result Path", name="result_path", method="write_file") + ] + + def write_file(self) -> Data: + # Extract filename from loop_item + pdf_path = self.loop_item.data.get("file_path", "") + if not pdf_path: + return Data(data={"error": "No file_path in loop item"}) + + base_name = Path(pdf_path).stem + out_dir = Path("/data/staging_ai/rag-output") + out_dir.mkdir(parents=True, exist_ok=True) + + json_path = out_dir / f"{base_name}.json" + + # Idempotency check + if json_path.exists(): + return Data(data={"status": "skipped", "path": str(json_path), "reason": "already exists"}) + + # Parse and write content to ensure it's valid JSON before saving + try: + parsed = json.loads(self.json_content) + # Inject source file name if missing + if not parsed.get("source_file"): + parsed["source_file"] = f"{base_name}.pdf" + + tmp_path = out_dir / f"{base_name}.tmp" + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(parsed, f, ensure_ascii=False, indent=2) + + # Atomic rename + os.replace(tmp_path, json_path) + + return Data(data={"status": "written", "path": str(json_path)}) + except Exception as e: + err_path = out_dir / f"{base_name}.error" + with open(err_path, "w", encoding="utf-8") as f: + f.write(f"Error parsing JSON from API: {str(e)}\\n\\nContent:\\n{self.json_content}") + return Data(data={"status": "error", "path": str(err_path), "error": str(e)}) +``` + +**การเชื่อมต่อ:** +- Input `json_content` ← `Ollama → Text` +- Input `loop_item` ← `Loop → Item` +- Output `result_path` → `Loop → item` (Feedback loop กลับไปบอกว่ารอบนี้จบแล้ว) + +> ✅ **Idempotency:** โค้ดมีการสั่ง `if json_path.exists(): return` เพื่อข้ามไฟล์ที่เขียนไปแล้ว +> ✅ **Atomic Write:** เขียนเป็น `.tmp` ก่อนแล้วใช้ `os.replace` เพื่อป้องกัน n8n มาอ่านตอนที่ยังเขียนไม่เสร็จ +> ✅ **Dynamic Filename:** อ่าน path ต้นฉบับจาก loop item ทำให้ได้ชื่อไฟล์ .json ตรงกับ pdf เสมอ + +--- + +#### สรุปการ Wire ทั้ง Workflow + +| From | Port | To | Port | +|------|------|----|------| +| Read File | Files | Loop | Inputs | +| Loop | Item | Parser | Data or DataFrame | +| Parser | Parsed Text | Prompt Template | extracted_text | +| Prompt Template | Prompt | Ollama | input_value (Input) | +| Ollama | Text | Write JSON (Idempotent) | json_content | +| Loop | Item | Write JSON (Idempotent) | loop_item | +| Write JSON | result_path | Loop | element | + + +**ตั้งค่า Ollama LLM Component:** + +| ฟิลด์ | ค่า | +|-------|-----| +| Model Name | `scb10x/typhoon2.1-gemma3-4b` | +| Base URL | `http://192.168.20.100:11434` | +| Format | `json` (บังคับ JSON Output) | +| Temperature | `0.1` (ลด Hallucination) | +| Max Tokens | `2048` | +| Enable Structured Output | `ON` | + +> ℹ️ **เหตุผลที่เลือก Typhoon 2.1:** +> `scb10x/typhoon2.1-gemma3-4b` โดย SCB10X เป็น Model ที่ออกแบบมาสำหรับภาษาไทยโดยเฉพาะ +> เหมาะกับเอกสารก่อสร้างที่มีทั้งไทยและอังกฤษปนกัน ดีกว่า `llama3.2:3b` มาก +> ต้องติดตั้งก่อน: `ollama pull scb10x/typhoon2.1-gemma3-4b` บน Admin Desktop + +--- + +### ขั้นตอนที่ 7: System Prompt สำหรับ Metadata Extraction + +คัดลอก Prompt นี้ใส่ใน **Prompt Template Component** ของ Langflow: + +> ⚠️ **Langflow Escaping Rule:** ปีกกา `{` `}` ที่เป็น JSON literal ต้องเขียนเป็น `{{` `}}` (double) +> มิฉะนั้น Langflow จะตีความว่าเป็น variable → เกิด error "Invalid variables" +> **ข้อยกเว้น:** `{extracted_text}` ใช้ single เพราะเป็น variable จริงที่รับจาก Parser + +``` +คุณเป็นผู้ช่วย AI สำหรับระบบจัดการเอกสารก่อสร้าง Laem Chabang Port Phase 3 (LCBP3) +หน้าที่ของคุณคือดึงข้อมูล Metadata จากเอกสาร แล้วตอบกลับเป็น JSON ที่ valid เท่านั้น +ห้ามเพิ่มข้อความอื่นนอกจาก JSON +เอกสารอาจเป็นภาษาไทย อังกฤษ หรือผสมกัน + +You are a document metadata extraction assistant for a construction document management system (LCBP3). +Extract the following fields and return ONLY a valid JSON object. +No explanation, no markdown, no text outside the JSON. +Documents may be in Thai, English, or mixed language. + +Return ONLY this JSON structure: +{{ + "source_file": "<ชื่อไฟล์ PDF ที่รับมา>", + "is_valid": true, + "confidence": 0.0, + "extracted_text": "", + "metadata": {{ + "correspondence_number": "", + "title": "", + "document_date": "", + "sender_org": "", + "receiver_org": "", + "project_code": "", + "suggested_category": "", + "detected_issues": [] + }}, + "chunks": [ + {{"chunk_index": 0, "page": 1, "text": ""}} + ] +}} + +Document text to analyze: +{extracted_text} +``` + +> ℹ️ **`{{` `}}` → แสดงเป็น `{` `}` จริงใน prompt ที่ส่งให้ LLM** +> ⚠️ **ห้าม Hardcode** รายการ Category — ดูจาก `GET /api/master/correspondence-types` ตาม ADR-017 + +--- + +### ขั้นตอนที่ 8: ตั้งค่า Volume Mount (AI Isolation — ADR-018) + +แก้ไขไฟล์ `~/.openrag/tui/docker-compose.yml` ที่ OpenRAG สร้างขึ้น: + +```yaml +services: + langflow: + volumes: + # staging_ai mount จาก NAS + # Windows R:\ drive จะปรากฏใน WSL เป็น /mnt/r/ + - /mnt/r/staging_ai:/data/staging_ai # ← Read PDF + Write rag-output/ + # หมายเหตุ: ต้องเขียนได้ที่ rag-output/ จึงไม่ใส่ :ro + + opensearch: + # ไม่ต้อง mount staging_ai — OpenSearch ใช้ Vector Store เท่านั้น +``` + +> ⚠️ **ตรวจสอบ R:\ ใน WSL:** +> ```bash +> # ใน WSL Terminal ตรวจว่า mount อยู่ที่ไหน +> ls /mnt/r/staging_ai/ +> # ต้องเห็นไฟล์ PDF ที่มีอยู่ +> ``` +> +> ✅ **สร้าง rag-output/ ก่อนรัน:** +> ```bash +> mkdir -p /mnt/r/staging_ai/rag-output +> ``` + +```bash +# หลังแก้ไข docker-compose.yml — รีสตาร์ท OpenRAG +cd ~/openrag-workspace +docker compose -f ~/.openrag/tui/docker-compose.yml restart langflow +``` + +--- + +### ขั้นตอนที่ 9: ตรวจสอบ File-based Queue + +ทดสอบว่า OpenRAG เขียนไฟล์ลง NAS ได้ และ n8n อ่านไฟล์จาก NAS ได้: + +**ทดสอบ OpenRAG เขียน (ใน WSL):** + +```bash +# ตรวจสอบว่า mount ใช้งานได้ +ls /mnt/r/staging_ai/*.pdf | head -5 + +# ทดสอบเขียนไฟล์ +echo '{"test": true}' > /mnt/r/staging_ai/rag-output/test.json +ls /mnt/r/staging_ai/rag-output/ +# ต้องเห็น test.json +``` + +**ทดสอบ n8n อ่าน (ใน n8n Workflow):** + +สร้าง Test Workflow ใน n8n: + +| Node | Type | Config | +|------|------|--------| +| Trigger | Manual | - | +| List Files | Read/Write Files from Disk | Path: `staging_ai/rag-output/*.json` | +| Read File | Read/Write Files from Disk | Dynamic path จาก List node | +| Parse JSON | Code | `JSON.parse(items[0].binary.data.toString())` | + +```bash +# ตรวจสอบ path ใน n8n container +docker exec n8n ls /home/node/.n8n/staging_ai/rag-output/ +# ต้องเห็น test.json ที่สร้างไว้ +``` + +> 💡 **Path Mapping:** +> - Admin Desktop (WSL): `/mnt/r/staging_ai/rag-output/` +> - n8n บน QNAP: `staging_ai/rag-output/` (ตาม Volume Mount ใน docker-compose) + +--- + +### ขั้นตอนที่ 10: Pre-Production Verification + +| ลำดับ | รายการ | วิธีตรวจสอบ | +|-------|--------|-------------| +| 1 | Ollama เชื่อมต่อได้ | `curl http://192.168.20.100:11434/api/tags` จาก WSL | +| 2 | `nomic-embed-text` พร้อม | `ollama list` บน Windows Terminal | +| 3 | Langflow รันได้ | เปิด `http://localhost:7860` | +| 4 | R:\ mount เห็น PDF | `ls /mnt/r/staging_ai/*.pdf` ใน WSL | +| 5 | Langflow เขียน rag-output/ ได้ | ดู `/mnt/r/staging_ai/rag-output/` หลังรัน Test | +| 6 | ไม่มี DB Credentials ใน env | ตรวจ `~/.openrag/tui/docker-compose.yml` | +| 7 | Extraction ถูกต้อง ≥ 85% | รัน Batch กับเอกสาร 20 ฉบับ นับ field ที่ถูก | +| 8 | JSON ถูกต้อง (valid JSON) | `python3 -m json.tool rag-output/test.json` | +| 9 | n8n อ่าน JSON จาก NAS ได้ | รัน Test Workflow ใน n8n ดู Execution Log | +| 10 | GPU VRAM < 7.5GB ระหว่างรัน | `nvidia-smi --query-gpu=memory.used --format=csv` | + +```bash +# ตรวจสอบ VRAM บน Admin Desktop (Windows Terminal) +nvidia-smi --query-gpu=memory.used,memory.total --format=csv +``` + +--- + +## 📋 Implementation Gate (ก่อนพัฒนา) + +> **หมายเหตุ:** Feature นี้เป็น Post-UAT / Post-Migration +> ต้องผ่าน Go-Live Gate ของ Migration (ADR-017) ก่อนเริ่มพัฒนา + +**OpenRAG Setup (Admin Desktop):** +- [ ] WSL 2 + Docker Desktop ติดตั้งเสร็จ (ขั้นตอนที่ 1) +- [ ] OpenRAG ติดตั้งผ่าน `uvx --python 3.13 openrag` (ขั้นตอนที่ 2–3) +- [ ] Ollama เชื่อมต่อจาก Docker Container ได้ (ขั้นตอนที่ 5) +- [ ] `nomic-embed-text` พร้อมใช้งานใน Ollama +- [ ] Langflow Batch Workflow สร้างเสร็จพร้อม System Prompt (ขั้นตอนที่ 6–7) +- [ ] Volume Mount `R:\staging_ai` → `/data/staging_ai` (Read+Write) (ขั้นตอนที่ 8) +- [ ] สร้าง folder `staging_ai/rag-output/` บน NAS ก่อนรัน +- [ ] ตรวจสอบ Idempotent: Skip ถ้า `.json` ไฟล์มีอยู่แล้ว +- [ ] ทดสอบ Extraction Accuracy ≥ 85% กับ 20 เอกสารตัวอย่าง (ขั้นตอนที่ 10) +- [ ] ยืนยัน OpenRAG ไม่มี DB Credentials ใน docker-compose.yml + +**n8n File-based Queue Integration:** +- [ ] ตรวจสอบ n8n Volume Mount เห็น `staging_ai/rag-output/` (ขั้นตอนที่ 9) +- [ ] สร้าง n8n Schedule Workflow: List JSON Files → Loop → Read → Validate → Route +- [ ] ทดสอบ Rename ไฟล์ `.json` → `.done` / `.error` ใน n8n +- [ ] n8n Workflow: OpenRAG Ingestor รัน Validation + Confidence Router ได้ +- [ ] ทดสอบ Idempotency-Key กรณีรัน n8n ซ้ำ (ไฟล์ `.done` ไม่ถูก Process ซ้ำ) + +**Search & Query (Post-Migration):** +- [ ] Migration v1.8.x เสร็จสมบูรณ์และ Stable (Prerequisite) +- [ ] กำหนด Elasticsearch Index Schema + Dims (lock ก่อน Index แรก) +- [ ] ออกแบบ RBAC Filter สำหรับ kNN Search +- [ ] สร้าง n8n Workflow: RAG Indexer + RAG Query +- [ ] เพิ่ม `/api/rag/search` Endpoint ใน DMS Backend +- [ ] เพิ่ม UI Component: RAG Search Panel ใน Frontend +- [ ] Load Test: Query Latency < 5 วินาที สำหรับ Top-5 Results + +--- + +*เอกสารนี้เป็น Living Document — อัปเดตเมื่อมีการตัดสินใจ Architecture ใหม่* +**Version:** 1.8.1 | **Author:** Development Team | **Last Updated:** 2026-03-13 diff --git a/specs/03-Data-and-Storage/OpenRAG V0.1.json b/specs/03-Data-and-Storage/OpenRAG V0.1.json new file mode 100644 index 0000000..ec8fe0e --- /dev/null +++ b/specs/03-Data-and-Storage/OpenRAG V0.1.json @@ -0,0 +1,2892 @@ +{ + "data": { + "edges": [ + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "File", + "id": "File-5V2fL", + "name": "dataframe", + "output_types": [ + "DataFrame" + ] + }, + "targetHandle": { + "fieldName": "data", + "id": "LoopComponent-5vFOr", + "inputTypes": [ + "DataFrame" + ], + "type": "other" + } + }, + "id": "xy-edge__File-5V2fL{œdataTypeœ:œFileœ,œidœ:œFile-5V2fLœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}-LoopComponent-5vFOr{œfieldNameœ:œdataœ,œidœ:œLoopComponent-5vFOrœ,œinputTypesœ:[œDataFrameœ],œtypeœ:œotherœ}", + "selected": false, + "source": "File-5V2fL", + "sourceHandle": "{œdataTypeœ:œFileœ,œidœ:œFile-5V2fLœ,œnameœ:œdataframeœ,œoutput_typesœ:[œDataFrameœ]}", + "target": "LoopComponent-5vFOr", + "targetHandle": "{œfieldNameœ:œdataœ,œidœ:œLoopComponent-5vFOrœ,œinputTypesœ:[œDataFrameœ],œtypeœ:œotherœ}" + }, + { + "animated": false, + "className": "", + "data": { + "sourceHandle": { + "dataType": "Prompt Template", + "id": "Prompt Template-dKwcS", + "name": "prompt", + "output_types": [ + "Message" + ] + }, + "targetHandle": { + "fieldName": "system_message", + "id": "OllamaModel-xJSnu", + "inputTypes": [ + "Message" + ], + "type": "str" + } + }, + "id": "xy-edge__Prompt Template-dKwcS{œdataTypeœ:œPrompt Templateœ,œidœ:œPrompt Template-dKwcSœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}-OllamaModel-xJSnu{œfieldNameœ:œsystem_messageœ,œidœ:œOllamaModel-xJSnuœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}", + "selected": false, + "source": "Prompt Template-dKwcS", + "sourceHandle": "{œdataTypeœ:œPrompt Templateœ,œidœ:œPrompt Template-dKwcSœ,œnameœ:œpromptœ,œoutput_typesœ:[œMessageœ]}", + "target": "OllamaModel-xJSnu", + "targetHandle": "{œfieldNameœ:œsystem_messageœ,œidœ:œOllamaModel-xJSnuœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}" + }, + { + "animated": false, + "data": { + "sourceHandle": { + "dataType": "LoopComponent", + "id": "LoopComponent-5vFOr", + "name": "item", + "output_types": [ + "Data" + ] + }, + "targetHandle": { + "fieldName": "input_data", + "id": "ParserComponent-Xspgr", + "inputTypes": [ + "DataFrame", + "Data" + ], + "type": "other" + } + }, + "id": "xy-edge__LoopComponent-5vFOr{œdataTypeœ:œLoopComponentœ,œidœ:œLoopComponent-5vFOrœ,œnameœ:œitemœ,œoutput_typesœ:[œDataœ]}-ParserComponent-Xspgr{œfieldNameœ:œinput_dataœ,œidœ:œParserComponent-Xspgrœ,œinputTypesœ:[œDataFrameœ,œDataœ],œtypeœ:œotherœ}", + "selected": false, + "source": "LoopComponent-5vFOr", + "sourceHandle": "{œdataTypeœ:œLoopComponentœ,œidœ:œLoopComponent-5vFOrœ,œnameœ:œitemœ,œoutput_typesœ:[œDataœ]}", + "target": "ParserComponent-Xspgr", + "targetHandle": "{œfieldNameœ:œinput_dataœ,œidœ:œParserComponent-Xspgrœ,œinputTypesœ:[œDataFrameœ,œDataœ],œtypeœ:œotherœ}" + }, + { + "animated": false, + "data": { + "sourceHandle": { + "dataType": "ParserComponent", + "id": "ParserComponent-Xspgr", + "name": "parsed_text", + "output_types": [ + "Message" + ] + }, + "targetHandle": { + "fieldName": "extracted_text", + "id": "Prompt Template-dKwcS", + "inputTypes": [ + "Message" + ], + "type": "str" + } + }, + "id": "xy-edge__ParserComponent-Xspgr{œdataTypeœ:œParserComponentœ,œidœ:œParserComponent-Xspgrœ,œnameœ:œparsed_textœ,œoutput_typesœ:[œMessageœ]}-Prompt Template-dKwcS{œfieldNameœ:œextracted_textœ,œidœ:œPrompt Template-dKwcSœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}", + "selected": false, + "source": "ParserComponent-Xspgr", + "sourceHandle": "{œdataTypeœ:œParserComponentœ,œidœ:œParserComponent-Xspgrœ,œnameœ:œparsed_textœ,œoutput_typesœ:[œMessageœ]}", + "target": "Prompt Template-dKwcS", + "targetHandle": "{œfieldNameœ:œextracted_textœ,œidœ:œPrompt Template-dKwcSœ,œinputTypesœ:[œMessageœ],œtypeœ:œstrœ}" + }, + { + "animated": false, + "data": { + "sourceHandle": { + "dataType": "OllamaModel", + "id": "OllamaModel-xJSnu", + "name": "text_output", + "output_types": [ + "Message" + ] + }, + "targetHandle": { + "fieldName": "input", + "id": "SaveToFile-M0RUY", + "inputTypes": [ + "Data", + "DataFrame", + "Message" + ], + "type": "other" + } + }, + "id": "xy-edge__OllamaModel-xJSnu{œdataTypeœ:œOllamaModelœ,œidœ:œOllamaModel-xJSnuœ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}-SaveToFile-M0RUY{œfieldNameœ:œinputœ,œidœ:œSaveToFile-M0RUYœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}", + "selected": false, + "source": "OllamaModel-xJSnu", + "sourceHandle": "{œdataTypeœ:œOllamaModelœ,œidœ:œOllamaModel-xJSnuœ,œnameœ:œtext_outputœ,œoutput_typesœ:[œMessageœ]}", + "target": "SaveToFile-M0RUY", + "targetHandle": "{œfieldNameœ:œinputœ,œidœ:œSaveToFile-M0RUYœ,œinputTypesœ:[œDataœ,œDataFrameœ,œMessageœ],œtypeœ:œotherœ}" + }, + { + "animated": false, + "data": { + "sourceHandle": { + "dataType": "SaveToFile", + "id": "SaveToFile-M0RUY", + "name": "message", + "output_types": [ + "Message" + ] + }, + "targetHandle": { + "dataType": "LoopComponent", + "id": "LoopComponent-5vFOr", + "name": "item", + "output_types": [ + "Data", + "Message" + ] + } + }, + "id": "xy-edge__SaveToFile-M0RUY{œdataTypeœ:œSaveToFileœ,œidœ:œSaveToFile-M0RUYœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}-LoopComponent-5vFOr{œdataTypeœ:œLoopComponentœ,œidœ:œLoopComponent-5vFOrœ,œnameœ:œitemœ,œoutput_typesœ:[œDataœ,œMessageœ]}", + "selected": false, + "source": "SaveToFile-M0RUY", + "sourceHandle": "{œdataTypeœ:œSaveToFileœ,œidœ:œSaveToFile-M0RUYœ,œnameœ:œmessageœ,œoutput_typesœ:[œMessageœ]}", + "target": "LoopComponent-5vFOr", + "targetHandle": "{œdataTypeœ:œLoopComponentœ,œidœ:œLoopComponent-5vFOrœ,œnameœ:œitemœ,œoutput_typesœ:[œDataœ,œMessageœ]}" + } + ], + "nodes": [ + { + "data": { + "id": "File-5V2fL", + "node": { + "base_classes": [ + "Message" + ], + "beta": false, + "conditional_paths": [], + "custom_fields": {}, + "description": "Loads and returns the content from uploaded files.", + "display_name": "Read File", + "documentation": "https://docs.langflow.org/read-file", + "edited": false, + "field_order": [ + "storage_location", + "path", + "file_path", + "separator", + "silent_errors", + "delete_server_file_after_processing", + "ignore_unsupported_extensions", + "ignore_unspecified_files", + "file_path_str", + "aws_access_key_id", + "aws_secret_access_key", + "bucket_name", + "aws_region", + "s3_file_key", + "service_account_key", + "file_id", + "advanced_mode", + "pipeline", + "ocr_engine", + "md_image_placeholder", + "md_page_break_placeholder", + "doc_key", + "use_multithreading", + "concurrency_multithreading", + "markdown" + ], + "frozen": false, + "icon": "file-text", + "last_updated": "2026-03-13T07:48:58.791Z", + "legacy": false, + "lf_version": "1.8.0", + "metadata": { + "code_hash": "12a5841f1a03", + "dependencies": { + "dependencies": [ + { + "name": "lfx", + "version": null + }, + { + "name": "langchain_core", + "version": "0.3.83" + }, + { + "name": "pydantic", + "version": "2.11.10" + }, + { + "name": "googleapiclient", + "version": "2.154.0" + } + ], + "total_dependencies": 4 + }, + "module": "lfx.components.files_and_knowledge.file.FileComponent" + }, + "minimized": false, + "output_types": [], + "outputs": [ + { + "allows_loop": false, + "cache": true, + "display_name": "Files", + "group_outputs": false, + "hidden": null, + "loop_types": null, + "method": "load_files", + "name": "dataframe", + "options": null, + "required_inputs": null, + "selected": "DataFrame", + "tool_mode": true, + "types": [ + "DataFrame" + ], + "value": "__UNDEFINED__" + } + ], + "pinned": false, + "template": { + "_frontend_node_flow_id": { + "value": "4a538191-04b4-41cf-98d7-8e62aaccf3a8" + }, + "_frontend_node_folder_id": { + "value": "60f723dc-b1f8-4e25-9c31-0a4ee07abd5c" + }, + "_type": "Component", + "advanced_mode": { + "_input_type": "BoolInput", + "advanced": false, + "display_name": "Advanced Parser", + "dynamic": false, + "info": "Enable advanced document processing and export with Docling for PDFs, images, and office documents. Note that advanced document processing can consume significant resources.", + "list": false, + "list_add_label": "Add More", + "name": "advanced_mode", + "override_skip": false, + "placeholder": "", + "real_time_refresh": true, + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "track_in_telemetry": true, + "type": "bool", + "value": false + }, + "aws_access_key_id": { + "_input_type": "SecretStrInput", + "advanced": false, + "display_name": "AWS Access Key ID", + "dynamic": false, + "info": "AWS Access key ID.", + "input_types": [], + "load_from_db": false, + "name": "aws_access_key_id", + "override_skip": false, + "password": true, + "placeholder": "", + "required": true, + "show": false, + "title_case": false, + "track_in_telemetry": false, + "type": "str", + "value": "" + }, + "aws_region": { + "_input_type": "StrInput", + "advanced": false, + "display_name": "AWS Region", + "dynamic": false, + "info": "AWS region (e.g., us-east-1, eu-west-1).", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "aws_region", + "override_skip": false, + "placeholder": "", + "required": false, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "track_in_telemetry": false, + "type": "str", + "value": "" + }, + "aws_secret_access_key": { + "_input_type": "SecretStrInput", + "advanced": false, + "display_name": "AWS Secret Key", + "dynamic": false, + "info": "AWS Secret Key.", + "input_types": [], + "load_from_db": false, + "name": "aws_secret_access_key", + "override_skip": false, + "password": true, + "placeholder": "", + "required": true, + "show": false, + "title_case": false, + "track_in_telemetry": false, + "type": "str", + "value": "" + }, + "bucket_name": { + "_input_type": "StrInput", + "advanced": false, + "display_name": "S3 Bucket Name", + "dynamic": false, + "info": "Enter the name of the S3 bucket.", + "list": false, + "list_add_label": "Add More", + "load_from_db": false, + "name": "bucket_name", + "override_skip": false, + "placeholder": "", + "required": true, + "show": false, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "track_in_telemetry": false, + "type": "str", + "value": "" + }, + "code": { + "advanced": true, + "dynamic": true, + "fileTypes": [], + "file_path": "", + "info": "", + "list": false, + "load_from_db": false, + "multiline": true, + "name": "code", + "password": false, + "placeholder": "", + "required": true, + "show": true, + "title_case": false, + "type": "code", + "value": "\"\"\"Enhanced file component with Docling support and process isolation.\n\nNotes:\n-----\n- ALL Docling parsing/export runs in a separate OS process to prevent memory\n growth and native library state from impacting the main Langflow process.\n- Standard text/structured parsing continues to use existing BaseFileComponent\n utilities (and optional threading via `parallel_load_data`).\n\"\"\"\n\nfrom __future__ import annotations\n\nimport contextlib\nimport json\nimport subprocess\nimport sys\nimport textwrap\nfrom copy import deepcopy\nfrom pathlib import Path\nfrom tempfile import NamedTemporaryFile\nfrom typing import Any\n\nfrom lfx.base.data.base_file import BaseFileComponent\nfrom lfx.base.data.storage_utils import parse_storage_path, read_file_bytes, validate_image_content_type\nfrom lfx.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data\nfrom lfx.inputs import SortableListInput\nfrom lfx.inputs.inputs import DropdownInput, MessageTextInput, StrInput\nfrom lfx.io import BoolInput, FileInput, IntInput, Output, SecretStrInput\nfrom lfx.schema.data import Data\nfrom lfx.schema.dataframe import DataFrame # noqa: TC001\nfrom lfx.schema.message import Message\nfrom lfx.services.deps import get_settings_service, get_storage_service\nfrom lfx.utils.async_helpers import run_until_complete\nfrom lfx.utils.validate_cloud import is_astra_cloud_environment\n\n\ndef _get_storage_location_options():\n \"\"\"Get storage location options, filtering out Local if in Astra cloud environment.\"\"\"\n all_options = [{\"name\": \"AWS\", \"icon\": \"Amazon\"}, {\"name\": \"Google Drive\", \"icon\": \"google\"}]\n if is_astra_cloud_environment():\n return all_options\n return [{\"name\": \"Local\", \"icon\": \"hard-drive\"}, *all_options]\n\n\nclass FileComponent(BaseFileComponent):\n \"\"\"File component with optional Docling processing (isolated in a subprocess).\"\"\"\n\n display_name = \"Read File\"\n # description is now a dynamic property - see get_tool_description()\n _base_description = \"Loads content from one or more files.\"\n documentation: str = \"https://docs.langflow.org/read-file\"\n icon = \"file-text\"\n name = \"File\"\n add_tool_output = True # Enable tool mode toggle without requiring tool_mode inputs\n\n # Extensions that can be processed without Docling (using standard text parsing)\n TEXT_EXTENSIONS = TEXT_FILE_TYPES\n\n # Extensions that require Docling for processing (images, advanced office formats, etc.)\n DOCLING_ONLY_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"jpg\",\n \"jpeg\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"webp\",\n ]\n\n # Docling-supported/compatible extensions; TEXT_FILE_TYPES are supported by the base loader.\n VALID_EXTENSIONS = [\n *TEXT_EXTENSIONS,\n *DOCLING_ONLY_EXTENSIONS,\n ]\n\n # Fixed export settings used when markdown export is requested.\n EXPORT_FORMAT = \"Markdown\"\n IMAGE_MODE = \"placeholder\"\n\n _base_inputs = deepcopy(BaseFileComponent.get_base_inputs())\n\n for input_item in _base_inputs:\n if isinstance(input_item, FileInput) and input_item.name == \"path\":\n input_item.real_time_refresh = True\n input_item.tool_mode = False # Disable tool mode for file upload input\n input_item.required = False # Make it optional so it doesn't error in tool mode\n break\n\n inputs = [\n SortableListInput(\n name=\"storage_location\",\n display_name=\"Storage Location\",\n placeholder=\"Select Location\",\n info=\"Choose where to read the file from.\",\n options=_get_storage_location_options(),\n real_time_refresh=True,\n limit=1,\n value=[{\"name\": \"Local\", \"icon\": \"hard-drive\"}],\n advanced=True,\n ),\n *_base_inputs,\n StrInput(\n name=\"file_path_str\",\n display_name=\"File Path\",\n info=(\n \"Path to the file to read. Used when component is called as a tool. \"\n \"If not provided, will use the uploaded file from 'path' input.\"\n ),\n show=False,\n advanced=True,\n tool_mode=True, # Required for Toolset toggle, but _get_tools() ignores this parameter\n required=False,\n ),\n # AWS S3 specific inputs\n SecretStrInput(\n name=\"aws_access_key_id\",\n display_name=\"AWS Access Key ID\",\n info=\"AWS Access key ID.\",\n show=False,\n advanced=False,\n required=True,\n ),\n SecretStrInput(\n name=\"aws_secret_access_key\",\n display_name=\"AWS Secret Key\",\n info=\"AWS Secret Key.\",\n show=False,\n advanced=False,\n required=True,\n ),\n StrInput(\n name=\"bucket_name\",\n display_name=\"S3 Bucket Name\",\n info=\"Enter the name of the S3 bucket.\",\n show=False,\n advanced=False,\n required=True,\n ),\n StrInput(\n name=\"aws_region\",\n display_name=\"AWS Region\",\n info=\"AWS region (e.g., us-east-1, eu-west-1).\",\n show=False,\n advanced=False,\n ),\n StrInput(\n name=\"s3_file_key\",\n display_name=\"S3 File Key\",\n info=\"The key (path) of the file in S3 bucket.\",\n show=False,\n advanced=False,\n required=True,\n ),\n # Google Drive specific inputs\n SecretStrInput(\n name=\"service_account_key\",\n display_name=\"GCP Credentials Secret Key\",\n info=\"Your Google Cloud Platform service account JSON key as a secret string (complete JSON content).\",\n show=False,\n advanced=False,\n required=True,\n ),\n StrInput(\n name=\"file_id\",\n display_name=\"Google Drive File ID\",\n info=(\"The Google Drive file ID to read. The file must be shared with the service account email.\"),\n show=False,\n advanced=False,\n required=True,\n ),\n BoolInput(\n name=\"advanced_mode\",\n display_name=\"Advanced Parser\",\n value=False,\n real_time_refresh=True,\n info=(\n \"Enable advanced document processing and export with Docling for PDFs, images, and office documents. \"\n \"Note that advanced document processing can consume significant resources.\"\n ),\n # Disabled in cloud\n show=not is_astra_cloud_environment(),\n ),\n DropdownInput(\n name=\"pipeline\",\n display_name=\"Pipeline\",\n info=\"Docling pipeline to use\",\n options=[\"standard\", \"vlm\"],\n value=\"standard\",\n advanced=True,\n real_time_refresh=True,\n ),\n DropdownInput(\n name=\"ocr_engine\",\n display_name=\"OCR Engine\",\n info=\"OCR engine to use. Only available when pipeline is set to 'standard'.\",\n options=[\"None\", \"easyocr\"],\n value=\"easyocr\",\n show=False,\n advanced=True,\n ),\n StrInput(\n name=\"md_image_placeholder\",\n display_name=\"Image placeholder\",\n info=\"Specify the image placeholder for markdown exports.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n StrInput(\n name=\"md_page_break_placeholder\",\n display_name=\"Page break placeholder\",\n info=\"Add this placeholder between pages in the markdown output.\",\n value=\"\",\n advanced=True,\n show=False,\n ),\n MessageTextInput(\n name=\"doc_key\",\n display_name=\"Doc Key\",\n info=\"The key to use for the DoclingDocument column.\",\n value=\"doc\",\n advanced=True,\n show=False,\n ),\n # Deprecated input retained for backward-compatibility.\n BoolInput(\n name=\"use_multithreading\",\n display_name=\"[Deprecated] Use Multithreading\",\n advanced=True,\n value=True,\n info=\"Set 'Processing Concurrency' greater than 1 to enable multithreading.\",\n ),\n IntInput(\n name=\"concurrency_multithreading\",\n display_name=\"Processing Concurrency\",\n advanced=True,\n info=\"When multiple files are being processed, the number of files to process concurrently.\",\n value=1,\n ),\n BoolInput(\n name=\"markdown\",\n display_name=\"Markdown Export\",\n info=\"Export processed documents to Markdown format. Only available when advanced mode is enabled.\",\n value=False,\n show=False,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\", tool_mode=True),\n ]\n\n # ------------------------------ Tool description with file names --------------\n\n def get_tool_description(self) -> str:\n \"\"\"Return a dynamic description that includes the names of uploaded files.\n\n This helps the Agent understand which files are available to read.\n \"\"\"\n base_description = \"Loads and returns the content from uploaded files.\"\n\n # Get the list of uploaded file paths\n file_paths = getattr(self, \"path\", None)\n if not file_paths:\n return base_description\n\n # Ensure it's a list\n if not isinstance(file_paths, list):\n file_paths = [file_paths]\n\n # Extract just the file names from the paths\n file_names = []\n for fp in file_paths:\n if fp:\n name = Path(fp).name\n file_names.append(name)\n\n if file_names:\n files_str = \", \".join(file_names)\n return f\"{base_description} Available files: {files_str}. Call this tool to read these files.\"\n\n return base_description\n\n @property\n def description(self) -> str:\n \"\"\"Dynamic description property that includes uploaded file names.\"\"\"\n return self.get_tool_description()\n\n async def _get_tools(self) -> list:\n \"\"\"Override to create a tool without parameters.\n\n The Read File component should use the files already uploaded via UI,\n not accept file paths from the Agent (which wouldn't know the internal paths).\n \"\"\"\n from langchain_core.tools import StructuredTool\n from pydantic import BaseModel\n\n # Empty schema - no parameters needed\n class EmptySchema(BaseModel):\n \"\"\"No parameters required - uses pre-uploaded files.\"\"\"\n\n async def read_files_tool() -> str:\n \"\"\"Read the content of uploaded files.\"\"\"\n try:\n result = self.load_files_message()\n if hasattr(result, \"get_text\"):\n return result.get_text()\n if hasattr(result, \"text\"):\n return result.text\n return str(result)\n except (FileNotFoundError, ValueError, OSError, RuntimeError) as e:\n return f\"Error reading files: {e}\"\n\n description = self.get_tool_description()\n\n tool = StructuredTool(\n name=\"load_files_message\",\n description=description,\n coroutine=read_files_tool,\n args_schema=EmptySchema,\n handle_tool_error=True,\n tags=[\"load_files_message\"],\n metadata={\n \"display_name\": \"Read File\",\n \"display_description\": description,\n },\n )\n\n return [tool]\n\n # ------------------------------ UI helpers --------------------------------------\n\n def _path_value(self, template: dict) -> list[str]:\n \"\"\"Return the list of currently selected file paths from the template.\"\"\"\n return template.get(\"path\", {}).get(\"file_path\", [])\n\n def _disable_docling_fields_in_cloud(self, build_config: dict[str, Any]) -> None:\n \"\"\"Disable all Docling-related fields in cloud environments.\"\"\"\n if \"advanced_mode\" in build_config:\n build_config[\"advanced_mode\"][\"show\"] = False\n build_config[\"advanced_mode\"][\"value\"] = False\n # Hide all Docling-related fields\n docling_fields = (\"pipeline\", \"ocr_engine\", \"doc_key\", \"md_image_placeholder\", \"md_page_break_placeholder\")\n for field in docling_fields:\n if field in build_config:\n build_config[field][\"show\"] = False\n # Also disable OCR engine specifically\n if \"ocr_engine\" in build_config:\n build_config[\"ocr_engine\"][\"value\"] = \"None\"\n\n def update_build_config(\n self,\n build_config: dict[str, Any],\n field_value: Any,\n field_name: str | None = None,\n ) -> dict[str, Any]:\n \"\"\"Show/hide Advanced Parser and related fields based on selection context.\"\"\"\n # Update storage location options dynamically based on cloud environment\n if \"storage_location\" in build_config:\n updated_options = _get_storage_location_options()\n build_config[\"storage_location\"][\"options\"] = updated_options\n\n # Handle storage location selection\n if field_name == \"storage_location\":\n # Extract selected storage location\n selected = [location[\"name\"] for location in field_value] if isinstance(field_value, list) else []\n\n # Hide all storage-specific fields first\n storage_fields = [\n \"aws_access_key_id\",\n \"aws_secret_access_key\",\n \"bucket_name\",\n \"aws_region\",\n \"s3_file_key\",\n \"service_account_key\",\n \"file_id\",\n ]\n\n for f_name in storage_fields:\n if f_name in build_config:\n build_config[f_name][\"show\"] = False\n\n # Show fields based on selected storage location\n if len(selected) == 1:\n location = selected[0]\n\n if location == \"Local\":\n # Show file upload input for local storage\n if \"path\" in build_config:\n build_config[\"path\"][\"show\"] = True\n\n elif location == \"AWS\":\n # Hide file upload input, show AWS fields\n if \"path\" in build_config:\n build_config[\"path\"][\"show\"] = False\n\n aws_fields = [\n \"aws_access_key_id\",\n \"aws_secret_access_key\",\n \"bucket_name\",\n \"aws_region\",\n \"s3_file_key\",\n ]\n for f_name in aws_fields:\n if f_name in build_config:\n build_config[f_name][\"show\"] = True\n build_config[f_name][\"advanced\"] = False\n\n elif location == \"Google Drive\":\n # Hide file upload input, show Google Drive fields\n if \"path\" in build_config:\n build_config[\"path\"][\"show\"] = False\n\n gdrive_fields = [\"service_account_key\", \"file_id\"]\n for f_name in gdrive_fields:\n if f_name in build_config:\n build_config[f_name][\"show\"] = True\n build_config[f_name][\"advanced\"] = False\n # No storage location selected - show file upload by default\n elif \"path\" in build_config:\n build_config[\"path\"][\"show\"] = True\n\n return build_config\n\n if field_name == \"path\":\n paths = self._path_value(build_config)\n\n # Disable in cloud environments\n if is_astra_cloud_environment():\n self._disable_docling_fields_in_cloud(build_config)\n else:\n # If all files can be processed by docling, do so\n allow_advanced = all(not file_path.endswith((\".csv\", \".xlsx\", \".parquet\")) for file_path in paths)\n build_config[\"advanced_mode\"][\"show\"] = allow_advanced\n if not allow_advanced:\n build_config[\"advanced_mode\"][\"value\"] = False\n docling_fields = (\n \"pipeline\",\n \"ocr_engine\",\n \"doc_key\",\n \"md_image_placeholder\",\n \"md_page_break_placeholder\",\n )\n for field in docling_fields:\n if field in build_config:\n build_config[field][\"show\"] = False\n\n # Docling Processing\n elif field_name == \"advanced_mode\":\n # Disable in cloud environments - don't show Docling fields even if advanced_mode is toggled\n if is_astra_cloud_environment():\n self._disable_docling_fields_in_cloud(build_config)\n else:\n docling_fields = (\n \"pipeline\",\n \"ocr_engine\",\n \"doc_key\",\n \"md_image_placeholder\",\n \"md_page_break_placeholder\",\n )\n for field in docling_fields:\n if field in build_config:\n build_config[field][\"show\"] = bool(field_value)\n if field == \"pipeline\":\n build_config[field][\"advanced\"] = not bool(field_value)\n\n elif field_name == \"pipeline\":\n # Disable in cloud environments - don't show OCR engine even if pipeline is changed\n if is_astra_cloud_environment():\n self._disable_docling_fields_in_cloud(build_config)\n elif field_value == \"standard\":\n build_config[\"ocr_engine\"][\"show\"] = True\n build_config[\"ocr_engine\"][\"value\"] = \"easyocr\"\n else:\n build_config[\"ocr_engine\"][\"show\"] = False\n build_config[\"ocr_engine\"][\"value\"] = \"None\"\n\n return build_config\n\n def update_outputs(self, frontend_node: dict[str, Any], field_name: str, field_value: Any) -> dict[str, Any]: # noqa: ARG002\n \"\"\"Dynamically show outputs based on file count/type and advanced mode.\"\"\"\n if field_name not in [\"path\", \"advanced_mode\", \"pipeline\"]:\n return frontend_node\n\n template = frontend_node.get(\"template\", {})\n paths = self._path_value(template)\n if not paths:\n return frontend_node\n\n frontend_node[\"outputs\"] = []\n if len(paths) == 1:\n file_path = paths[0] if field_name == \"path\" else frontend_node[\"template\"][\"path\"][\"file_path\"][0]\n if file_path.endswith((\".csv\", \".xlsx\", \".parquet\")):\n frontend_node[\"outputs\"].append(\n Output(\n display_name=\"Structured Content\",\n name=\"dataframe\",\n method=\"load_files_structured\",\n tool_mode=True,\n ),\n )\n elif file_path.endswith(\".json\"):\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Structured Content\", name=\"json\", method=\"load_files_json\", tool_mode=True),\n )\n\n advanced_mode = frontend_node.get(\"template\", {}).get(\"advanced_mode\", {}).get(\"value\", False)\n if advanced_mode:\n frontend_node[\"outputs\"].append(\n Output(\n display_name=\"Structured Output\",\n name=\"advanced_dataframe\",\n method=\"load_files_dataframe\",\n tool_mode=True,\n ),\n )\n frontend_node[\"outputs\"].append(\n Output(\n display_name=\"Markdown\", name=\"advanced_markdown\", method=\"load_files_markdown\", tool_mode=True\n ),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\", tool_mode=True),\n )\n else:\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Raw Content\", name=\"message\", method=\"load_files_message\", tool_mode=True),\n )\n frontend_node[\"outputs\"].append(\n Output(display_name=\"File Path\", name=\"path\", method=\"load_files_path\", tool_mode=True),\n )\n else:\n # Multiple files => DataFrame output; advanced parser disabled\n frontend_node[\"outputs\"].append(\n Output(display_name=\"Files\", name=\"dataframe\", method=\"load_files\", tool_mode=True)\n )\n\n return frontend_node\n\n # ------------------------------ Core processing ----------------------------------\n\n def _get_selected_storage_location(self) -> str:\n \"\"\"Get the selected storage location from the SortableListInput.\"\"\"\n if hasattr(self, \"storage_location\") and self.storage_location:\n if isinstance(self.storage_location, list) and len(self.storage_location) > 0:\n return self.storage_location[0].get(\"name\", \"\")\n if isinstance(self.storage_location, dict):\n return self.storage_location.get(\"name\", \"\")\n return \"Local\" # Default to Local if not specified\n\n def _validate_and_resolve_paths(self) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Override to handle file_path_str input from tool mode and cloud storage.\n\n Priority:\n 1. Cloud storage (AWS/Google Drive) if selected\n 2. file_path_str (if provided by the tool call)\n 3. path (uploaded file from UI)\n \"\"\"\n storage_location = self._get_selected_storage_location()\n\n # Handle AWS S3\n if storage_location == \"AWS\":\n return self._read_from_aws_s3()\n\n # Handle Google Drive\n if storage_location == \"Google Drive\":\n return self._read_from_google_drive()\n\n # Handle Local storage\n # Check if file_path_str is provided (from tool mode)\n file_path_str = getattr(self, \"file_path_str\", None)\n if file_path_str:\n # Use the string path from tool mode\n from pathlib import Path\n\n from lfx.schema.data import Data\n\n # Use same resolution logic as BaseFileComponent (support storage paths)\n path_str = str(file_path_str)\n if parse_storage_path(path_str):\n try:\n resolved_path = Path(self.get_full_path(path_str))\n except (ValueError, AttributeError):\n resolved_path = Path(self.resolve_path(path_str))\n else:\n resolved_path = Path(self.resolve_path(path_str))\n\n if not resolved_path.exists():\n msg = f\"File or directory not found: {file_path_str}\"\n self.log(msg)\n if not self.silent_errors:\n raise ValueError(msg)\n return []\n\n data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: str(resolved_path)})\n return [BaseFileComponent.BaseFile(data_obj, resolved_path, delete_after_processing=False)]\n\n # Otherwise use the default implementation (uses path FileInput)\n return super()._validate_and_resolve_paths()\n\n def _read_from_aws_s3(self) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Read file from AWS S3.\"\"\"\n from lfx.base.data.cloud_storage_utils import create_s3_client, validate_aws_credentials\n\n # Validate AWS credentials\n validate_aws_credentials(self)\n if not getattr(self, \"s3_file_key\", None):\n msg = \"S3 File Key is required\"\n raise ValueError(msg)\n\n # Create S3 client\n s3_client = create_s3_client(self)\n\n # Download file to temp location\n import tempfile\n\n # Get file extension from S3 key\n file_extension = Path(self.s3_file_key).suffix or \"\"\n\n with tempfile.NamedTemporaryFile(mode=\"wb\", suffix=file_extension, delete=False) as temp_file:\n temp_file_path = temp_file.name\n try:\n s3_client.download_fileobj(self.bucket_name, self.s3_file_key, temp_file)\n except Exception as e:\n # Clean up temp file on failure\n with contextlib.suppress(OSError):\n Path(temp_file_path).unlink()\n msg = f\"Failed to download file from S3: {e}\"\n raise RuntimeError(msg) from e\n\n # Create BaseFile object\n from lfx.schema.data import Data\n\n temp_path = Path(temp_file_path)\n data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: str(temp_path)})\n return [BaseFileComponent.BaseFile(data_obj, temp_path, delete_after_processing=True)]\n\n def _read_from_google_drive(self) -> list[BaseFileComponent.BaseFile]:\n \"\"\"Read file from Google Drive.\"\"\"\n import tempfile\n\n from googleapiclient.http import MediaIoBaseDownload\n\n from lfx.base.data.cloud_storage_utils import create_google_drive_service\n\n # Validate Google Drive credentials\n if not getattr(self, \"service_account_key\", None):\n msg = \"GCP Credentials Secret Key is required for Google Drive storage\"\n raise ValueError(msg)\n if not getattr(self, \"file_id\", None):\n msg = \"Google Drive File ID is required\"\n raise ValueError(msg)\n\n # Create Google Drive service with read-only scope\n drive_service = create_google_drive_service(\n self.service_account_key, scopes=[\"https://www.googleapis.com/auth/drive.readonly\"]\n )\n\n # Get file metadata to determine file name and extension\n try:\n file_metadata = drive_service.files().get(fileId=self.file_id, fields=\"name,mimeType\").execute()\n file_name = file_metadata.get(\"name\", \"download\")\n except Exception as e:\n msg = (\n f\"Unable to access file with ID '{self.file_id}'. \"\n f\"Error: {e!s}. \"\n \"Please ensure: 1) The file ID is correct, 2) The file exists, \"\n \"3) The service account has been granted access to this file.\"\n )\n raise ValueError(msg) from e\n\n # Download file to temp location\n file_extension = Path(file_name).suffix or \"\"\n with tempfile.NamedTemporaryFile(mode=\"wb\", suffix=file_extension, delete=False) as temp_file:\n temp_file_path = temp_file.name\n try:\n request = drive_service.files().get_media(fileId=self.file_id)\n downloader = MediaIoBaseDownload(temp_file, request)\n done = False\n while not done:\n _status, done = downloader.next_chunk()\n except Exception as e:\n # Clean up temp file on failure\n with contextlib.suppress(OSError):\n Path(temp_file_path).unlink()\n msg = f\"Failed to download file from Google Drive: {e}\"\n raise RuntimeError(msg) from e\n\n # Create BaseFile object\n from lfx.schema.data import Data\n\n temp_path = Path(temp_file_path)\n data_obj = Data(data={self.SERVER_FILE_PATH_FIELDNAME: str(temp_path)})\n return [BaseFileComponent.BaseFile(data_obj, temp_path, delete_after_processing=True)]\n\n def _is_docling_compatible(self, file_path: str) -> bool:\n \"\"\"Lightweight extension gate for Docling-compatible types.\"\"\"\n docling_exts = (\n \".adoc\",\n \".asciidoc\",\n \".asc\",\n \".bmp\",\n \".csv\",\n \".dotx\",\n \".dotm\",\n \".docm\",\n \".docx\",\n \".htm\",\n \".html\",\n \".jpg\",\n \".jpeg\",\n \".json\",\n \".md\",\n \".pdf\",\n \".png\",\n \".potx\",\n \".ppsx\",\n \".pptm\",\n \".potm\",\n \".ppsm\",\n \".pptx\",\n \".tiff\",\n \".txt\",\n \".xls\",\n \".xlsx\",\n \".xhtml\",\n \".xml\",\n \".webp\",\n )\n return file_path.lower().endswith(docling_exts)\n\n async def _get_local_file_for_docling(self, file_path: str) -> tuple[str, bool]:\n \"\"\"Get a local file path for Docling processing, downloading from S3 if needed.\n\n Args:\n file_path: Either a local path or S3 key (format \"flow_id/filename\")\n\n Returns:\n tuple[str, bool]: (local_path, should_delete) where should_delete indicates\n if this is a temporary file that should be cleaned up\n \"\"\"\n settings = get_settings_service().settings\n if settings.storage_type == \"local\":\n return file_path, False\n\n # S3 storage - download to temp file\n parsed = parse_storage_path(file_path)\n if not parsed:\n msg = f\"Invalid S3 path format: {file_path}. Expected 'flow_id/filename'\"\n raise ValueError(msg)\n\n storage_service = get_storage_service()\n flow_id, filename = parsed\n\n # Get file content from S3\n content = await storage_service.get_file(flow_id, filename)\n\n suffix = Path(filename).suffix\n with NamedTemporaryFile(mode=\"wb\", suffix=suffix, delete=False) as tmp_file:\n tmp_file.write(content)\n temp_path = tmp_file.name\n\n return temp_path, True\n\n def _process_docling_in_subprocess(self, file_path: str) -> Data | None:\n \"\"\"Run Docling in a separate OS process and map the result to a Data object.\n\n We avoid multiprocessing pickling by launching `python -c \"