260314:1705 20260314:1700 Refactor Migration
Build and Deploy / deploy (push) Successful in 3m25s

This commit is contained in:
admin
2026-03-14 17:05:08 +07:00
parent 81f9609ca2
commit 0211f01aa8
10 changed files with 350 additions and 261 deletions
@@ -0,0 +1,74 @@
import {
IsString,
IsNotEmpty,
IsOptional,
IsNumber,
IsBoolean,
IsArray,
} from 'class-validator';
export class EnqueueMigrationDto {
@IsString()
@IsNotEmpty()
document_number!: string;
@IsString()
@IsOptional()
title?: string;
@IsString()
@IsOptional()
original_title?: string;
@IsString()
@IsOptional()
category?: string;
@IsString()
@IsOptional()
ai_summary?: string;
@IsNumber()
@IsOptional()
project_id?: number;
@IsNumber()
@IsOptional()
sender_org_id?: number;
@IsNumber()
@IsOptional()
receiver_org_id?: number;
@IsString()
@IsOptional()
issued_date?: string;
@IsString()
@IsOptional()
received_date?: string;
@IsString()
@IsOptional()
remarks?: string;
@IsArray()
@IsOptional()
extracted_tags?: any[];
@IsNumber()
@IsOptional()
temp_attachment_id?: number;
@IsBoolean()
@IsOptional()
is_valid?: boolean;
@IsNumber()
@IsOptional()
confidence?: number;
@IsArray()
@IsOptional()
ai_issues?: any[];
}
@@ -20,8 +20,12 @@ export class ImportCorrespondenceDto {
category!: string; category!: string;
@IsString() @IsString()
@IsNotEmpty() @IsOptional()
source_file_path!: string; source_file_path?: string;
@IsNumber()
@IsOptional()
temp_attachment_id?: number;
@IsNumber() @IsNumber()
@IsOptional() @IsOptional()
@@ -56,6 +56,33 @@ export class MigrationReviewQueue {
@Column({ name: 'reviewed_at', type: 'timestamp', nullable: true }) @Column({ name: 'reviewed_at', type: 'timestamp', nullable: true })
reviewedAt?: Date; reviewedAt?: Date;
@Column({ name: 'project_id', type: 'int', nullable: true })
projectId?: number;
@Column({ name: 'sender_organization_id', type: 'int', nullable: true })
senderOrganizationId?: number;
@Column({ name: 'receiver_organization_id', type: 'int', nullable: true })
receiverOrganizationId?: number;
@Column({ name: 'received_date', type: 'date', nullable: true })
receivedDate?: Date;
@Column({ name: 'issued_date', type: 'date', nullable: true })
issuedDate?: Date;
@Column({ type: 'text', nullable: true })
remarks?: string;
@Column({ name: 'ai_summary', type: 'text', nullable: true })
aiSummary?: string;
@Column({ name: 'extracted_tags', type: 'json', nullable: true })
extractedTags?: any;
@Column({ name: 'temp_attachment_id', type: 'int', nullable: true })
tempAttachmentId?: number;
@CreateDateColumn({ name: 'created_at' }) @CreateDateColumn({ name: 'created_at' })
createdAt!: Date; createdAt!: Date;
} }
@@ -1,6 +1,7 @@
import { Controller, Post, Body, Headers, UseGuards, Get, Param, Query, Res, ParseIntPipe } from '@nestjs/common'; import { Controller, Post, Body, Headers, UseGuards, Get, Param, Query, Res, ParseIntPipe } from '@nestjs/common';
import { MigrationService } from './migration.service'; import { MigrationService } from './migration.service';
import { ImportCorrespondenceDto } from './dto/import-correspondence.dto'; import { ImportCorrespondenceDto } from './dto/import-correspondence.dto';
import { EnqueueMigrationDto } from './dto/enqueue-migration.dto';
import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard'; import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard';
import { CurrentUser } from '../../common/decorators/current-user.decorator'; import { CurrentUser } from '../../common/decorators/current-user.decorator';
import { ApiTags, ApiOperation, ApiBearerAuth, ApiHeader, ApiQuery, ApiParam } from '@nestjs/swagger'; import { ApiTags, ApiOperation, ApiBearerAuth, ApiHeader, ApiQuery, ApiParam } from '@nestjs/swagger';
@@ -30,6 +31,13 @@ export class MigrationController {
return this.migrationService.importCorrespondence(dto, idempotencyKey, userId); return this.migrationService.importCorrespondence(dto, idempotencyKey, userId);
} }
@Post('queue')
@UseGuards(JwtAuthGuard)
@ApiOperation({ summary: 'Enqueue a record into the staging migration review queue' })
async enqueueRecord(@Body() dto: EnqueueMigrationDto) {
return this.migrationService.enqueueRecord(dto);
}
@Get('queue') @Get('queue')
@UseGuards(JwtAuthGuard) @UseGuards(JwtAuthGuard)
@ApiOperation({ summary: 'Get migration review queue' }) @ApiOperation({ summary: 'Get migration review queue' })
@@ -8,6 +8,7 @@ import {
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { Repository, DataSource } from 'typeorm'; import { Repository, DataSource } from 'typeorm';
import { ImportCorrespondenceDto } from './dto/import-correspondence.dto'; import { ImportCorrespondenceDto } from './dto/import-correspondence.dto';
import { EnqueueMigrationDto } from './dto/enqueue-migration.dto';
import { ImportTransaction } from './entities/import-transaction.entity'; import { ImportTransaction } from './entities/import-transaction.entity';
import { Correspondence } from '../correspondence/entities/correspondence.entity'; import { Correspondence } from '../correspondence/entities/correspondence.entity';
import { CorrespondenceRevision } from '../correspondence/entities/correspondence-revision.entity'; import { CorrespondenceRevision } from '../correspondence/entities/correspondence-revision.entity';
@@ -21,6 +22,7 @@ import {
} from './entities/migration-review-queue.entity'; } from './entities/migration-review-queue.entity';
import { MigrationError } from './entities/migration-error.entity'; import { MigrationError } from './entities/migration-error.entity';
import { MigrationQueueQueryDto } from './dto/migration-queue-query.dto'; import { MigrationQueueQueryDto } from './dto/migration-queue-query.dto';
import { Attachment } from '../../common/file-storage/entities/attachment.entity';
import { createReadStream, existsSync } from 'fs'; import { createReadStream, existsSync } from 'fs';
import * as path from 'path'; import * as path from 'path';
@Injectable() @Injectable()
@@ -177,7 +179,20 @@ export class MigrationService {
// 4. File Handling // 4. File Handling
let attachmentId: number | null = null; let attachmentId: number | null = null;
if (dto.source_file_path) { if (dto.temp_attachment_id) {
attachmentId = dto.temp_attachment_id;
try {
// Mark attachment as permanent
await queryRunner.manager.update(
Attachment,
{ id: attachmentId },
{ isTemporary: false }
);
} catch (fileError: unknown) {
const errMsg = fileError instanceof Error ? fileError.message : String(fileError);
this.logger.warn(`Failed to update temp_file [id:${attachmentId}]: ${errMsg}`);
}
} else if (dto.source_file_path) {
try { try {
const attachment = await this.fileStorageService.importStagingFile( const attachment = await this.fileStorageService.importStagingFile(
dto.source_file_path, dto.source_file_path,
@@ -360,6 +375,63 @@ export class MigrationService {
await queryRunner.release(); await queryRunner.release();
} }
} }
async enqueueRecord(dto: EnqueueMigrationDto) {
if (!dto.document_number) {
throw new BadRequestException('document_number is required');
}
// Determine status based on confidence policy in ADR-017
let autoStatus = MigrationReviewStatus.PENDING;
if (dto.is_valid === false || (dto.confidence != null && dto.confidence < 0.60)) {
autoStatus = MigrationReviewStatus.REJECTED;
}
// Upsert or create new queue item
let queueItem = await this.reviewQueueRepo.findOne({
where: { documentNumber: dto.document_number },
});
if (!queueItem) {
queueItem = this.reviewQueueRepo.create({
documentNumber: dto.document_number,
});
}
queueItem.title = dto.title;
queueItem.originalTitle = dto.original_title;
queueItem.aiSuggestedCategory = dto.category;
queueItem.aiConfidence = dto.confidence;
queueItem.aiIssues = dto.ai_issues;
queueItem.projectId = dto.project_id;
queueItem.senderOrganizationId = dto.sender_org_id;
queueItem.receiverOrganizationId = dto.receiver_org_id;
queueItem.remarks = dto.remarks;
queueItem.aiSummary = dto.ai_summary;
queueItem.extractedTags = dto.extracted_tags;
queueItem.tempAttachmentId = dto.temp_attachment_id;
queueItem.status = autoStatus;
if (dto.issued_date) {
const parsed = new Date(dto.issued_date);
if (!isNaN(parsed.getTime())) queueItem.issuedDate = parsed;
}
if (dto.received_date) {
const parsed = new Date(dto.received_date);
if (!isNaN(parsed.getTime())) queueItem.receivedDate = parsed;
}
await this.reviewQueueRepo.save(queueItem);
this.logger.log(`Enqueued document [${dto.document_number}] to staging queue with status [${autoStatus}]`);
return {
message: 'Document enqueued successfully',
id: queueItem.id,
status: autoStatus,
};
}
async getReviewQueue(query: MigrationQueueQueryDto) { async getReviewQueue(query: MigrationQueueQueryDto) {
const { page = 1, limit = 10, status } = query; const { page = 1, limit = 10, status } = query;
const skip = (page - 1) * limit; const skip = (page - 1) * limit;
@@ -206,206 +206,102 @@ n8n ต้องเก็บ categories นี้ไว้ใน Workflow Variab
#### Node 1: Data Reader & Checkpoint #### Node 1: Data Reader & Checkpoint
- อ่าน Checkpoint จาก **MariaDB Node แยก** (ไม่ใช่ async call ใน Code Node) #### Node 1: Data Reader & Checkpoint
- Batch ทีละ **1020 แถว** ตาม `$env.MIGRATION_BATCH_SIZE`
- ติด `original_index` ทุก Item
**Encoding Normalization:** - อ่าน Checkpoint จาก **MariaDB Node แยก**
```javascript - Batch ทีละ **50100 แถว** ตาม `$env.MIGRATION_BATCH_SIZE` (ควรจำกัด Batch Size ป้องกัน DB Connection Overload)
// Normalize ข้อมูลจาก Excel เป็น UTF-8 NFC ก่อนประมวลผล - ติด `original_index` ทุก Item และ Normalize Encoding (UTF-8 NFC) สำหรับ ชื่อไฟล์ และ เลขเอกสารเก่า
const normalize = (str) => {
if (!str) return '';
return Buffer.from(str, 'utf8').toString('utf8').normalize('NFC');
};
return items.map(item => ({ #### Node 2: DB Lookup & Data Augmentation
...item,
json: {
...item.json,
document_number: normalize(item.json.document_number),
title: normalize(item.json.title),
// Mapping เลขอ้างอิงเก่า (Legacy Number) เพื่อนำไปเก็บใน details JSON
legacy_document_number: item.json.document_number
}
}));
```
#### Node 2: File Validator & Sanitizer - **Task:** ให้ n8n นำข้อมูลจาก Excel (เช่น รหัสโปรเจ็กต์, รหัสผู้ส่ง) ยิงคำสั่ง Query ไปยัง MariaDB เพื่อแปลงเป็น `id`
- **Queries:**
1. แปลง `project_code` -> `project_id`
2. แปลง `sender_code` -> `sender_organization_id`
3. แปลง `receiver_code` -> `receiver_organization_id`
4. หา Tags ที่มีอยู่ในโปรเจ็กต์: `SELECT * FROM tags WHERE project_id = {{project_id}}`
- **Output:** n8n เก็บ `project_id`, `organization_ids` และ `existing_tags_json` ไว้ในแต่ละ item
- *ถ้าหารหัสโปรเจ็กต์ไม่เจอ ให้ส่งเข้า Error Log ไม่ทำต่อ*
- ตรวจสอบไฟล์ PDF มีอยู่จริงบน NAS #### Node 3: File Processor (Extract PDF Text & Temp Upload)
- Normalize ชื่อไฟล์เป็น **UTF-8 NFC**
- Path Traversal Guard: resolved path ต้องอยู่ใน `/share/np-dms/staging_ai` เท่านั้น
- **Output 0** → valid → Node 3
- **Output 1** → error → Node 5D (ไม่หายเงียบ)
#### Node 3: AI Analysis (Sequential เท่านั้น) - ตรวจสอบไฟล์ PDF มีอยู่จริงบน NAS `/share/np-dms/staging_ai`
- **Extract PDF Text:** ใช้ Apache Tika สกัดข้อความจากเอกสาร
- **Two-Phase Storage (Upload):**
- n8n ยิง `POST /api/storage/upload` ส่งไฟล์ PDF เข้า Backend
- Backend อัพโหลดไฟล์, กำหนด `is_temporary = TRUE`
- Backend ส่งคืน `attachment_id` ให้ n8n (จะเรียกว่า `temp_attachment_id`)
#### Node 4: AI Analysis (Sequential เท่านั้น)
**System Prompt:** **System Prompt:**
```text ```text
You are a Document Controller for a large construction project. You are a Document Controller for a large construction project.
Your task is to validate document metadata and suggest relevant tags. Your task is to validate document metadata, summarize content, and suggest relevant tags.
You MUST respond ONLY with valid JSON. No explanation, no markdown, no extra text. You MUST respond ONLY with valid JSON. No explanation, no markdown.
If there are no issues, "detected_issues" must be an empty array [].
``` ```
**User Prompt (Category List มาจาก Backend ไม่ hardcode):** **User Prompt:**
```text ```text
Validate this document metadata and respond in JSON: Validate and summarize this document. Respond in JSON.
Document Number: {{$json.document_number}} Document Number: {{$json.document_number}}
Title: {{$json.title}} Title: {{$json.title}}
Expected Pattern: [ORG]-[TYPE]-[SEQ] e.g. "TCC-COR-0001" Extracted Text: {{$json.extracted_text}}
Category List (MUST match system enum exactly): {{$workflow.variables.system_categories}}
Analyze the document and suggest relevant tags based on: Existing Project Tags: {{$json.existing_tags_json}}
1. Document content/title keywords (e.g., "Foundation", "Structure", "Electrical", "Safety")
2. Document type indicators (e.g., "Drawing", "Report", "Inspection") Analyze the content to provide:
3. Organization codes present in document number 1. Validation of Subject/Dates with PDF text.
4. Any discipline or phase indicators 2. A 4-5 sentence summary.
3. Suggest tags. Select from Existing Project Tags if applicable. If no existing tag fits, suggest a NEW one (set is_new: true).
Respond ONLY with this exact JSON structure: Respond ONLY with this exact JSON structure:
{ {
"is_valid": true | false, "is_valid": true | false,
"confidence": 0.0 to 1.0, "confidence": 0.0 to 1.0,
"suggested_category": "<one from Category List>", "category": "Correspondence",
"detected_issues": ["<issue1>"], "summary": "<4-5 sentence summary>",
"suggested_title": "<corrected title or null>", "suggested_tags": [
"suggested_tags": ["<tag1>", "<tag2>"], {"name": "Structural", "description": "...", "is_new": false}
"tag_confidence": 0.0 to 1.0 ],
"detected_issues": []
} }
``` ```
**JSON Validation (ตรวจ Category ตรง Enum + Tag Normalization):** #### Node 5: Staging Ingestion (Insert to Review Queue)
```javascript
const systemCategories = $workflow.variables.system_categories;
if (!systemCategories.includes(result.suggested_category)) {
throw new Error(`Category "${result.suggested_category}" not in system enum: ${systemCategories.join(', ')}`);
}
// Tag Validation ข้อมูลทั้งหมดที่ผ่าน n8n และ AI Model **จะต้องไม่ถูกอัพเดทเข้าตารางหลักอัตโนมัติ** แต่จะถูกบังคับนำเข้าตาราง Staging `migration_review_queue` แทน เพื่อรอมนุษย์จัดการผ่าน Frontend UI
if (!Array.isArray(result.suggested_tags)) {
result.suggested_tags = [];
}
// Normalize: trim, lowercase, remove duplicates
result.suggested_tags = [...new Set(result.suggested_tags.map(t => String(t).trim()).filter(t => t.length > 0))];
// Tag confidence validation **Status Routing Policy:**
if (typeof result.tag_confidence !== 'number' || result.tag_confidence < 0 || result.tag_confidence > 1) { - `confidence >= 0.85` และ `is_valid = true` -> Status **`PENDING`** (พร้อมรับ Batch Import)
result.tag_confidence = 0.5; - `confidence >= 0.60` และ `< 0.85` -> Status **`PENDING`** (ติด Flag ให้ระวัง)
} - `confidence < 0.60` หรือ `is_valid = false` -> Status **`REJECTED`**
``` - Parse Error / AI ไม่ตอบ -> **Error Log** (Node ถัดไป)
#### Node 3.5: Fallback Model Manager **Insert into staging:**
- อัปเดต `migration_fallback_state` ทุกครั้งที่เกิด Parse Error
- Auto-switch ไป `OLLAMA_MODEL_FALLBACK` เมื่อ Error ≥ `FALLBACK_ERROR_THRESHOLD`
- ส่ง Alert Email เมื่อ Fallback ถูก Activate
#### Node 4: Confidence Router (4 outputs)
| เงื่อนไข | การดำเนินการ |
| ------------------------------------------ | -------------------------------- |
| `confidence >= 0.85` และ `is_valid = true` | **Output 0** → Auto Ingest |
| `confidence >= 0.60` และ `< 0.85` | **Output 1** → Review Queue |
| `confidence < 0.60` หรือ `is_valid = false` | **Output 2** → Reject Log |
| Parse Error / AI ไม่ตอบ | **Output 3** → Error Log |
| Fallback: Error > 5 ใน 10 Request | สลับ Model / หยุด Workflow + Alert |
**Revision Drift Protection:**
```javascript
// ถ้า Excel มี revision column — ตรวจสอบก่อน route
if (item.json.excel_revision !== undefined) {
const expectedRevision = (item.json.current_db_revision || 0) + 1;
if (parseInt(item.json.excel_revision) !== expectedRevision) {
item.json.review_reason = `Revision drift: Excel=${item.json.excel_revision}, Expected=${expectedRevision}`;
reviewQueue.push(item);
continue;
}
}
```
#### Node 5A: Auto Ingest — Backend API
> ⚠️ **Storage Enforcement:** n8n ส่งแค่ `source_file_path` — Backend จะ generate UUID, enforce path strategy (`/share/np-dms/staging_ai/...`), และ move file atomically ผ่าน StorageService
```http
POST /api/correspondences/import
Authorization: Bearer <MIGRATION_TOKEN>
Idempotency-Key: <document_number>:<batch_id>
Content-Type: application/json
```
**Backend Tag Handling Logic:**
เมื่อ Backend รับ Payload พร้อม `ai_tags` ระบบจะ:
1. **Validate Tags:** ตรวจสอบว่า tag name อยู่ในรูปแบบที่ถูกต้อง (ไม่ว่าง, ไม่มีอักขระพิเศษ)
2. **Create Missing Tags:** ถ้า Tag ไม่มีอยู่ใน `tags` table → สร้างใหม่โดยอัตโนมัติ
```sql ```sql
INSERT INTO tags (tag_name, created_by, created_at) INSERT INTO migration_review_queue (
VALUES ('<tag_name>', (SELECT user_id FROM users WHERE username = 'migration_bot'), NOW()) document_number, title, project_id, sender_organization_id, receiver_organization_id,
ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id); received_date, issued_date, remarks, ai_suggested_category, ai_confidence,
``` ai_issues, ai_summary, extracted_tags, temp_attachment_id, status
3. **Link Document Tags:** บันทึกความสัมพันธ์ใน `correspondence_tags` ) VALUES ( ... )
```sql ON DUPLICATE KEY UPDATE status = VALUES(status), ai_summary = VALUES(ai_summary);
INSERT INTO correspondence_tags (correspondence_id, tag_id)
SELECT LAST_INSERT_ID(), tag_id FROM tags WHERE tag_name IN (<ai_tags>);
```
4. **Tag Confidence Logging:** บันทึก `tag_confidence` ลงใน `details` JSON ของ Revision
Payload:
```json
{
"document_number": "{{document_number}}",
"title": "{{ai_result.suggested_title || title}}",
"category": "{{ai_result.suggested_category}}",
"source_file_path": "{{file_path}}",
"ai_confidence": "{{ai_result.confidence}}",
"ai_issues": "{{ai_result.detected_issues}}",
"ai_tags": "{{ai_result.suggested_tags}}",
"tag_confidence": "{{ai_result.tag_confidence}}",
"migrated_by": "SYSTEM_IMPORT",
"batch_id": "{{$env.MIGRATION_BATCH_ID}}"
}
``` ```
**Audit Log ที่ Backend ต้องสร้าง:** #### Node 6: Error Log & Reject Log
```json
{
"action": "IMPORT",
"source": "MIGRATION",
"batch_id": "migration_20260226",
"created_by": "SYSTEM_IMPORT",
"metadata": {
"migration": true,
"batch_id": "migration_20260226",
"ai_confidence": 0.91
}
}
```
**Checkpoint Update (ทุก 10 Records — ผ่าน IF Node + MariaDB Node):** - Parse Error → เขียนลงไฟล์ `/share/np-dms/n8n/migration_logs/error_log.csv`
```sql - ทุก 10-50 ราบการอัพเดท MariaDB `migration_progress` เพื่อเป็น Checkpoint.
INSERT INTO migration_progress (batch_id, last_processed_index, status)
VALUES ('{{$env.MIGRATION_BATCH_ID}}', {{checkpoint_index}}, 'RUNNING')
ON DUPLICATE KEY UPDATE
last_processed_index = {{checkpoint_index}},
updated_at = NOW();
```
#### Node 5B: Review Queue ---
> ⚠️ **`migration_review_queue` เป็น Temporary Table เท่านั้น** — ห้ามสร้าง Correspondence record จนกว่า Admin จะ Approve ### Phase 4: Frontend Management & Final Commit (UI -> Backend API)
Approval Flow: 1. หน้าจอ **Frontend Management UI** ดึงข้อมูลจาก `migration_review_queue`
``` 2. Admin สามารถ Browse & Edit ข้อมูล
Review → Admin Approve → POST /api/correspondences/import (เหมือน Auto Ingest) 3. **Tag Review:** Admin สามารถพิจารณา Tags ที่เป็น `is_new: true` ว่าควรตีตก หรือเปลี่ยนไปแมตช์ของเดิม
Admin Reject → ลบออกจาก queue ไม่สร้าง record 4. Admin กดปุ่ม **Execute Import** ส่งให้ Backend รัน Final Commit.
``` 5. Backend ยิงคำสั่งสร้าง Correspondence, นำ `temp_attachment_id` ไปผูกกับ Revision, ปรับเป็น `is_temporary = FALSE` และสร้าง/เชื่อม Tags จริง.
#### Node 5C: Reject Log → `/share/np-dms/n8n/migration_logs/reject_log.csv`
#### Node 5D: Error Log → `/share/np-dms/n8n/migration_logs/error_log.csv` + MariaDB
--- ---
@@ -35,8 +35,8 @@
│ └──────┬──────┘ │ │ └──────┬──────┘ │
│ │ │ │ │ │
│ ┌──────▼──────┐ ┌──────────────┐ ┌──────────────┐ │ │ ┌──────▼──────┐ ┌──────────────┐ ┌──────────────┐ │
│ │Pre-flight │───▶│Fetch Categories│──▶│File Validator│ │ │ │Pre-flight │───▶│DB Lookup & │──▶│File Upload & │ │
│ │Checks │ │from Backend │ │+ Sanitize │ │ │ │Checks │ │Data Fetch │ │Temp Storage │ │
│ └─────────────┘ └──────────────┘ └──────┬───────┘ │ │ └─────────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │ │ │ │
│ ┌────────────────────────────┤ │ │ ┌────────────────────────────┤ │
@@ -57,9 +57,9 @@
│ ┌─────────┘ │ └─────────┐ │ │ ┌─────────┘ │ └─────────┐ │
│ ▼ ▼ ▼ │ │ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────────┐ ┌────────┐ │ │ ┌──────┐ ┌──────────┐ ┌────────┐ │
│ │Auto │ │ Review │ │Reject │ │ │ │Review│ │ Review │ │Reject │ │
│ │Ingest│ │ Queue │ │Log │ │ │ │Queue │ │ Queue │ │Log │ │
│ │+Chkpt│ │(DB only) │ │(CSV) │ │ │ │(AUTO)│ │(FLAGGED) │ │(CSV) │ │
│ └──────┘ └──────────┘ └────────┘ │ │ └──────┘ └──────────┘ └────────┘ │
│ │ │ │
└─────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────┘
@@ -227,70 +227,43 @@ mysql -h <DB_HOST> -u migration_bot -p lcbp3_production < lcbp3-v1.8.0-migration
- เก็บค่า Config ทั้งหมดใน `$workflow.staticData.config` - เก็บค่า Config ทั้งหมดใน `$workflow.staticData.config`
- อ่านผ่าน `$workflow.staticData.config.KEY` ใน Node อื่น - อ่านผ่าน `$workflow.staticData.config.KEY` ใน Node อื่น
### Node 1-2: Pre-flight Checks ### Node 1: Pre-flight Checks & Data Reader
- ตรวจสอบ Backend Health - ตรวจสอบ Backend Health และ Ollama Ping
- ดึง Categories จาก `/api/master/correspondence-types` - อ่าน Checkpoint (`last_processed_index`) จาก `migration_progress`
- ดึง Tags ที่มีอยู่แล้วจาก `/api/tags` (สำหรับ AI Tag Extraction) - Batch ข้อมูลจาก Excel ตามตาราง `BATCH_SIZE` ปกติ (50-100)
- ตรวจ File Mount (Read-only) - Normalize ข้อมูล UTF-8 (NFC) และสร้าง `original_index`
- เก็บ Categories และ Existing Tags ใน `$workflow.staticData.systemCategories`
### Node 3: Read Checkpoint ### Node 2: DB Lookup & Categories Fetch
- อ่าน `last_processed_index` จาก `migration_progress` - ดึง Categories จาก `/api/meta/categories` เพื่อเตรียม Prompt
- ถ้าไม่มี เริ่มจาก 0 - Query ทะลวง DB: แปลงรหัสใน Excel (`project_code`, `sender`, `receiver`) ให้เป็น IDs จาก MariaDB
- Query ดึง Master Tags ของโปรเจ็กต์: `SELECT tag_name, description FROM tags WHERE project_id = ...`
- Output: แปลง ID เรียบร้อยและเตรียม `existing_tags_json` ให้ Ollama
### Node 4: Process Batch ### Node 3: Text Extraction & Temp Upload
- อ่าน Excel - ใช้ **Apache Tika** (ผ่าน `Extract PDF Text` node หรือ HTTP Request) สกัดข้อความ (OCR/Text) ออกจาก PDF ใน staging
- Normalize UTF-8 (NFC) - แนบไฟล์ไปยัง Backend: ยิง HTTP Request **`POST /api/storage/upload`** ของ Backend
- ตัด Batch ตาม `BATCH_SIZE` - รอรับผลลัพธ์เป็น `temp_attachment_id` (หมายความว่าไฟล์นี้เข้าข่าย Temporary ถูกเก็บจัดการใน NAS เรียบร้อยแล้ว)
- Output: ไฟล์พร้อมใช้งาน, ได้เนื้อหา Text มาเตรียม prompt
### Node 5: File Validator ### Node 4: AI Analysis
- Sanitize filename (replace special chars) - วาง System Prompt บังคับ Output JSON
- Path traversal check - โยน Metadata (Title, Date, DB Lookups) พร้อม Extracted PDF Text คุยกับ **Ollama `llama3.2:3b`**
- ตรวจสอบไฟล์มีอยู่จริง - ให้ AI วิเคราะห์ และสรุปเป็น `ai_summary`
- **Output 2 ทาง**: Valid → AI, Error → Log - ให้ AI แนะนำ Tags ใหม่หรือเลือก Tags เดิมจาก `existing_tags_json`
### Node 6: Build AI Prompt ### Node 5: Parse & Validate
- ดึง Categories จาก `staticData` (ไม่ hardcode) - Schema Validation (ดูให้แน่ใจว่า AI ตอบ `is_valid`, `confidence`, `summary`, `suggested_tags`)
- เลือก Model ตาม Fallback State - Normalizing categories, trimming tags (`is_new: true / false` flag สำคัญมาก)
- สร้าง Prompt ตาม Template พร้อม **Tag Extraction Instructions** - จัดชุดค่า Status ใหม่
- AI จะวิเคราะห์ Title และ Document Number เพื่อสกัด Tags ที่เกี่ยวข้อง
### Node 7: Ollama AI Analysis ### Node 6: Confidence Router & Staging Ingest
- เรียก `POST /api/generate` **แยกสาย 4 สาย:**
- Timeout 30 วินาที 1. **PENDING (Auto Ready):** (`confidence ≥ 0.85` && `is_valid = true`) → INSERT เข้า `migration_review_queue`
- Retry 3 ครั้ง (n8n built-in) 2. **PENDING (Flagged):** (`confidence 0.60 - 0.84`) → INSERT เข้า `migration_review_queue` พร้อม Highlight/Remarks ให้ Admin ดูละเอียด
- AI Response รวม `suggested_tags` และ `tag_confidence` 3. **REJECTED:** (`confidence < 0.60` หรือ `is_valid = false`) → INSERT เข้า `migration_review_queue` สถานะรอแก้แบบ Manual
4. **Error/Parse Fail:** ไปลง CSV Reject Log + DB `migration_errors`
### Node 8: Parse & Validate **สำคัญมาก:** *n8n จะทำหน้าที่สูบข้อมูลและจัดเตรียมเข้า `migration_review_queue` เท่านั้น จะไม่มีการข้ามขั้นตอนไป Import ลงตารางหลัก `correspondences` อัตโนมัติ (Final Commit ต้องทำบน Frontend UI)*
- Parse JSON Response
- Schema Validation (is_valid, confidence, detected_issues)
- **Tag Validation**: Normalize tags (trim, lowercase, deduplicate)
- Enum Validation (ตรวจ Category ว่าอยู่ใน List หรือไม่)
- **Output 2 ทาง**: Success → Router, Error → Fallback
### Node 9: Confidence Router
- **4 Outputs**:
1. Auto Ingest (confidence ≥ 0.85 && is_valid)
2. Review Queue (0.60 ≤ confidence < 0.85)
3. Reject Log (confidence < 0.60 หรือ is_valid = false)
4. Error Log (parse error)
### Node 10A: Auto Ingest
- POST `/api/migration/import`
- Header: `Idempotency-Key: {doc_num}:{batch_id}`
- Payload รวม **ai_tags** และ **tag_confidence**
- Backend จะสร้าง Tags ที่ยังไม่มี และผูกกับเอกสารอัตโนมัติ
- บันทึก Checkpoint ทุก 10 records
### Node 10B: Review Queue
- INSERT เข้า `migration_review_queue` เท่านั้น
- ยังไม่สร้าง Correspondence
### Node 10C: Reject Log
- เขียน CSV ที่ `/home/node/.n8n-files/migration_logs/reject_log.csv`
### Node 10D: Error Log
- เขียน CSV + INSERT เข้า `migration_errors`
--- ---
@@ -189,6 +189,7 @@ T+1 เดือน:
| Dry Run 2 AI Category Accuracy | ≥ 90% (Manual Spot-check 50 docs) | Human Review | | Dry Run 2 AI Category Accuracy | ≥ 90% (Manual Spot-check 50 docs) | Human Review |
| Idempotency Test: รัน Batch ซ้ำ | 0 Duplicate Records | SQL Count | | Idempotency Test: รัน Batch ซ้ำ | 0 Duplicate Records | SQL Count |
| Organization Mapping ครบ | 100% | Lookup Table review | | Organization Mapping ครบ | 100% | Lookup Table review |
| Frontend Review UI พร้อมใช้งาน | ✅ | UAT Passed สำหรับหน้าจออนุมัติ |
| Migration Bot Token Active + Whitelisted | ✅ | API Test | | Migration Bot Token Active + Whitelisted | ✅ | API Test |
| Staging NAS Space: ≥ 500GB free | ✅ | QNAP Dashboard | | Staging NAS Space: ≥ 500GB free | ✅ | QNAP Dashboard |
@@ -202,7 +203,7 @@ T+1 เดือน:
|-------|---------| |-------|---------|
| Tier 1 Migration: 100% เสร็จ + Verified | ✅ | | Tier 1 Migration: 100% เสร็จ + Verified | ✅ |
| Tier 2 Migration: ≥ 90% เสร็จ + Verified | ✅ | | Tier 2 Migration: ≥ 90% เสร็จ + Verified | ✅ |
| Review Queue: ≤ 5% ค้างอยู่ (Critical Tier 1 = 0%) | ✅ | | Review Queue (รวมการพิจารณา AI New Tags): ≤ 5% ค้างอยู่ (Critical Tier 1 = 0%) | ✅ |
| Migration Bot Token: REVOKED | ✅ | | Migration Bot Token: REVOKED | ✅ |
| Integrity Queries ผ่านทั้งหมด | ✅ | | Integrity Queries ผ่านทั้งหมด | ✅ |
| Legacy System ยังเข้าถึงได้ (Read-only Fallback) | ✅ | | Legacy System ยังเข้าถึงได้ (Read-only Fallback) | ✅ |
@@ -229,7 +230,7 @@ T+1 เดือน:
| **Organization Lookup Table** | Superadmin (NAP) | สร้างก่อน T-6 | | **Organization Lookup Table** | Superadmin (NAP) | สร้างก่อน T-6 |
| **Tier 1 Document List** | Document Control ทุก Org | ยืนยัน T-5 | | **Tier 1 Document List** | Document Control ทุก Org | ยืนยัน T-5 |
| **Daily Monitoring (n8n Runs)** | Nattanin P. | T-3 ถึง Go-Live | | **Daily Monitoring (n8n Runs)** | Nattanin P. | T-3 ถึง Go-Live |
| **Admin Review Queue** | Document Control (สค.) | ทุกเช้าวันทำงาน | | **Admin Review Queue & AI Tag Approval** | Document Control (สค.) | ทุกเช้าวันทำงาน (บังคับตรวจสอบ New Tags) |
| **Post-migration Verification** | Nattanin P. | After each Gate | | **Post-migration Verification** | Nattanin P. | After each Gate |
| **Legacy System Archival** | กทท. IT + NAP | T+30 | | **Legacy System Archival** | กทท. IT + NAP | T+30 |
@@ -25,9 +25,18 @@ CREATE TABLE IF NOT EXISTS migration_review_queue (
document_number VARCHAR(100) NOT NULL, document_number VARCHAR(100) NOT NULL,
title TEXT, title TEXT,
original_title TEXT, original_title TEXT,
project_id INT NULL COMMENT 'Project ID จาก Lookups',
sender_organization_id INT NULL COMMENT 'Sender ID จาก Lookups',
receiver_organization_id INT NULL COMMENT 'Receiver ID จาก Lookups',
received_date DATE NULL COMMENT 'วันที่รับเอกสาร',
issued_date DATE NULL COMMENT 'วันที่ออกเอกสาร',
remarks TEXT COMMENT 'หมายเหตุจากหน้างาน (response)',
ai_suggested_category VARCHAR(50), ai_suggested_category VARCHAR(50),
ai_confidence DECIMAL(4, 3), ai_confidence DECIMAL(4, 3),
ai_issues JSON, ai_issues JSON,
ai_summary TEXT COMMENT 'สรุปเนื้อหาจาก AI (4-5 บรรทัด)',
extracted_tags JSON COMMENT 'Tag ที่ AI นำเสนอหรือจับคู่ได้',
temp_attachment_id INT NULL COMMENT 'ID ของไฟล์ชั่วคราวจาก Two-Phase Storage',
review_reason VARCHAR(255), review_reason VARCHAR(255),
STATUS ENUM('PENDING', 'APPROVED', 'REJECTED') DEFAULT 'PENDING', STATUS ENUM('PENDING', 'APPROVED', 'REJECTED') DEFAULT 'PENDING',
reviewed_by VARCHAR(100), reviewed_by VARCHAR(100),
@@ -85,14 +85,15 @@
| Component | รายละเอียด | | Component | รายละเอียด |
| ---------------------- | ------------------------------------------------------------------------------- | | ---------------------- | ------------------------------------------------------------------------------- |
| Migration Orchestrator | n8n (Docker บน QNAP NAS) | | Migration Orchestrator | n8n (Docker บน QNAP NAS) |
| AI Model Primary | Ollama `llama3.2:3b` | | AI Model Primary | Ollama `llama3.2:3b` (Validation, Summarization, Tagging) |
| AI Model Fallback | Ollama `mistral:7b-instruct-q4_K_M` | | AI Model Fallback | Ollama `mistral:7b-instruct-q4_K_M` |
| Hardware | QNAP NAS (Orchestrator) + Desktop Desk-5439 (AI Processing, RTX 2060 SUPER 8GB) | | Hardware | QNAP NAS (Orchestrator) + Desktop Desk-5439 (AI Processing, RTX 2060 SUPER 8GB) |
| Data Ingestion | RESTful API + Migration Token (7 วัน) + Idempotency-Key Header | | DB Lookup (n8n) | n8n ทำการ Query `project_id`, `organization_id` และดึง `Tags` จาก DB ให้ AI |
| Concurrency | Sequential — 1 Request/ครั้ง, Delay 2 วินาที | | Data Ingestion | 1. Staging ลง `migration_review_queue` -> 2. กดยืนยันผ่าน Frontend Management UI -> 3. Final Commit ผ่าน API |
| Checkpoint | MariaDB `migration_progress` | | Concurrency (n8n) | Sequential — Batch Size 50-100 ป้องกัน DB Connection Overload |
| Checkpoint | MariaDB `migration_progress` และการใช้ `ON DUPLICATE KEY UPDATE` ใน Staging |
| Fallback | Auto-switch Model เมื่อ Error ≥ Threshold | | Fallback | Auto-switch Model เมื่อ Error ≥ Threshold |
| Storage | Backend StorageService เท่านั้น — ห้าม move file โดยตรง | | Storage | Two-Phase Storage: 1. `POST /api/storage/upload` (Temp) -> 2. Commit ภายหลัง |
| Expected Runtime | ~16.6 ชั่วโมง (~3–4 คืน) สำหรับ 20,000 records | | Expected Runtime | ~16.6 ชั่วโมง (~3–4 คืน) สำหรับ 20,000 records |
--- ---
@@ -105,31 +106,42 @@
"confidence": 0.92, "confidence": 0.92,
"suggested_category": "Correspondence", "suggested_category": "Correspondence",
"detected_issues": [], "detected_issues": [],
"suggested_title": null "suggested_title": null,
"summary": "This document outlines the revised design specifications for the electrical subsystem in phase 2...",
"suggested_tags": [
{ "name": "Electrical", "description": "Electrical engineering design documents.", "is_new": false },
{ "name": "Phase2-Specs", "description": "Specific requirements for Phase 2 implementation.", "is_new": true }
]
} }
``` ```
| Field | Type | คำอธิบาย | | Field | Type | คำอธิบาย |
| -------------------- | ------------------------- | --------------------------- | | -------------------- | ------------------------- | --------------------------- |
| `is_valid` | boolean | เอกสารผ่านการตรวจสอบหรือไม่ | | `is_valid` | boolean | เอกสารผ่านการตรวจสอบหรือไม่ (เปรียบเทียบ subject vs pdf) |
| `confidence` | float (0.01.0) | ความมั่นใจของ AI | | `confidence` | float (0.01.0) | ความมั่นใจของ AI |
| `suggested_category` | string (enum จาก Backend) | หมวดหมู่ที่ AI แนะนำ | | `suggested_category` | string (enum จาก Backend) | หมวดหมู่ที่ AI แนะนำ |
| `detected_issues` | string[] | รายการปัญหา (array ว่างถ้าไม่มี) | | `detected_issues` | string[] | รายการปัญหา (array ว่างถ้าไม่มี) |
| `suggested_title` | string \| null | Title ที่แก้ไขแล้ว หรือ null | | `suggested_title` | string \| null | Title ที่แก้ไขแล้ว หรือ null |
| `summary` | string | สรุปเนื้อหา 4-5 ประโยค สำหรับใส่ใน `body` |
| `suggested_tags` | array of objects | รายการ Tags ที่จับคู่ได้ หรือ แนะนำให้สร้างใหม่ (`is_new: true`) |
> ⚠️ **Patch:** `suggested_category` ต้องตรงกับ System Enum จาก `GET /api/meta/categories` เท่านั้น — ห้าม hardcode Category List ใน Prompt > ⚠️ **Patch:** `suggested_category` ต้องตรงกับ System Enum จาก `GET /api/meta/categories` เท่านั้น — ห้าม hardcode Category List ใน Prompt
--- ---
## Confidence Threshold Policy ## Confidence Threshold Policy (Staging Logic)
| ระดับ Confidence | การดำเนินการ | **ข้อมูลทุกชุดจาก n8n จะต้องถูกส่งเข้าตาราง `migration_review_queue` เสมอ** โดยจัดสถานะเบื้องต้นตาม Confidence:
| ระดับ Confidence | สถานะใน Review Queue |
| ------------------------------- | --------------------------------------- | | ------------------------------- | --------------------------------------- |
| `>= 0.85` และ `is_valid = true` | Auto Ingest เข้าระบบ | | `>= 0.85` และ `is_valid = true` | `PENDING` (พร้อมให้ Admin เลือก Batch Import) |
| `0.600.84` | ส่งไป Human Review Queue | | `0.600.84` | `PENDING` (ไฮไลต์แจ้งให้ Admin ตรวจสอบข้อมูลก่อน) |
| `< 0.60` หรือ `is_valid = false` | ส่งไป Reject Log รอ Manual Fix | | `< 0.60` หรือ `is_valid = false` | `REJECTED` (รอให้ Admin แก้ไขข้อมูล Manual) |
| AI Parse Error | ส่งไป Error Log + Trigger Fallback Logic | | AI Parse Error | ส่งไป Error Log + Trigger Fallback Logic |
| Revision Drift | ส่งไป Review Queue พร้อม reason | | Revision Drift | `PENDING` พร้อมระบุ reason: "Revision drift" |
> ⚠️ **Tag Review:** ข้อมูลใดที่มี `is_new: true` ใน `suggested_tags` จะถูกบังคับให้ Admin ตรวจสอบบน Frontend UI ก่อน เพื่อป้องกัน AI สร้าง Tags ขยะซ้ำซ้อน
--- ---
@@ -161,32 +173,45 @@ Hard Rules:
--- ---
## Storage Governance (Patch) ## Storage Governance (Two-Phase Storage)
**ข้อห้าม:** **ข้อห้าม:**
``` ```
❌ mv /data/dms/staging_ai/TCC-COR-0001.pdf /final/path/... ❌ mv /data/dms/staging_ai/TCC-COR-0001.pdf /final/path/...
``` ```
**ข้อบังคับ:** **ข้อบังคับ (Two-Phase Strategy):**
**Phase 1: Temp Upload (โดย n8n)**
``` ```
✅ POST /api/correspondences/import ✅ POST /api/storage/upload
body: { source_file_path: "/data/dms/staging_ai/TCC-COR-0001.pdf", ... } (Upload ไฟล์ PDF ได้ผลลัพธ์เป็น attachment_id เช่น 1024)
*ไฟล์จะถูกระบุเป็น `is_temporary = TRUE`*
``` ```
Backend จะ: **Phase 2: Final Commit (โดย Frontend UI -> Backend API)**
1. Generate UUID ```
2. Enforce path strategy: `/data/dms/uploads/YYYY/MM/{uuid}.pdf` ✅ POST /api/migration/commit_batch
3. Move file atomically ผ่าน StorageService body: { queue_ids: [1, 2, 3] }
4. Create revision folder ถ้าจำเป็น ```
Backend จะทำหน้าที่:
1. อ่านข้อมูลจาก `migration_review_queue` ซึ่งมี `temp_attachment_id` อยู่
2. นำ `temp_attachment_id` ไปเชื่อมกับเอกสาร (Link to `correspondence_attachments`)
3. เปลี่ยนสถานะอัพเดต `is_temporary = FALSE`
4. Move ไฟล์ไปที่ `/data/dms/uploads/YYYY/MM/{uuid}.pdf` ผ่าน StorageService อย่างถูกต้อง
--- ---
## Review Queue Contract ## Review Queue Contract & Frontend UI
- `migration_review_queue` เป็น **Temporary Table เท่านั้น** — ไม่ใช่ Business Schema - `migration_review_queue` เป็น **Staging Table หลัก** (ไม่ auto-ingest ข้ามขั้นตอนนี้)
- ห้ามสร้าง Correspondence record จนกว่า Admin จะ Approve - ห้ามสร้าง Correspondence record จนกว่า Admin จะสั่ง Execute การ Import จากหน้าจอ
- Approval Flow: `Review → Admin Approve → POST /api/correspondences/import` - **Approval Flow:**
1. N8N Insert เข้า `migration_review_queue` (พร้อม `temp_attachment_id`)
2. Admin Review บน Frontend UI (ให้ความสำคัญกับการเช็ค `is_new: true` Tags)
3. Admin เลือก Rows แล้วกด **"Execute Import"**
4. Frontend ส่งคำสั่ง `POST /api/migration/commit_batch` ถือว่าเป็นการ Ingest ลงตาราง Business Schema จริง
--- ---