690503:0135 Update workflow #01
CI / CD Pipeline / build (push) Failing after 6m6s
CI / CD Pipeline / deploy (push) Has been skipped

This commit is contained in:
2026-05-03 01:35:05 +07:00
parent d239b58387
commit 2c24991f88
85 changed files with 6335 additions and 100 deletions
@@ -551,7 +551,19 @@ export class CorrespondenceService {
if (!correspondence) {
throw new NotFoundException('Correspondence', publicId);
}
return correspondence;
// ADR-021: expose live workflow state (null-safe — Draft \u0e22\u0e31\u0e07\u0e44\u0e21\u0e48\u0e21\u0e35 workflow instance)
const workflowInstance = await this.workflowEngine.getInstanceByEntity(
'correspondence',
correspondence.publicId
);
return {
...correspondence,
workflowInstanceId: workflowInstance?.id ?? null,
workflowState: workflowInstance?.currentState ?? null,
availableActions: workflowInstance?.availableActions ?? [],
};
}
async addReference(id: number, dto: AddReferenceDto) {
@@ -12,6 +12,8 @@ export class WorkflowHistoryItemDto {
toState!: string;
action!: string;
actionByUserId?: number;
// ADR-019: UUID ของ User ผู้ดำเนินการ — expose แทน INT PK ในทุก API Response
actorUuid?: string;
comment?: string;
metadata?: Record<string, unknown>;
attachments!: AttachmentSummaryDto[];
@@ -4,11 +4,13 @@ import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import {
ArrayMaxSize,
IsArray,
IsInt,
IsNotEmpty,
IsObject,
IsOptional,
IsString,
IsUUID,
Min,
} from 'class-validator';
export class WorkflowTransitionDto {
@@ -47,4 +49,15 @@ export class WorkflowTransitionDto {
@ArrayMaxSize(20)
@IsOptional()
attachmentPublicIds?: string[];
@ApiPropertyOptional({
description:
'Optimistic lock version — ส่งค่าที่ได้จาก GET /instances/:id เพื่อป้องกัน Double-approval (ADR-001 v1.1 FR-002). Server ตอบ 409 ถ้าค่าไม่ตรง',
example: 5,
minimum: 1,
})
@IsInt()
@Min(1)
@IsOptional()
versionNo?: number;
}
@@ -47,6 +47,17 @@ export class WorkflowHistory {
})
actionByUserId?: number;
// ADR-019: UUID ของ User ผู้ดำเนินการ — expose ใน API Response แทน INT PK
// NULL = System Action หรือ Pre-migration record (Delta 10)
@Column({
name: 'action_by_user_uuid',
length: 36,
nullable: true,
comment:
'UUID ของ User ผู้ดำเนินการ — ใช้ใน API Response per ADR-019. INT FK action_by_user_id ยังคงอยู่สำหรับ Internal use',
})
actionByUserUuid?: string;
@Column({ type: 'text', nullable: true, comment: 'ความเห็นประกอบการอนุมัติ' })
comment?: string;
@@ -85,4 +85,15 @@ export class WorkflowInstance {
@UpdateDateColumn({ name: 'updated_at' })
updatedAt!: Date;
// ADR-001 v1.1 FR-002: Optimistic lock — incremented on every successful transition
// Client ส่งค่านี้มาด้วยทุกครั้งที่ transition; Server reject HTTP 409 ถ้าไม่ตรง
@Column({
name: 'version_no',
type: 'int',
default: 1,
comment:
'Optimistic lock counter — incremented on each successful transition (ADR-001 v1.1 FR-002)',
})
versionNo!: number;
}
@@ -34,9 +34,11 @@ describe('WorkflowTransitionGuard', () => {
const mockRequest = (
params: Record<string, string> = {},
user: MockUserPayload = mockUser
user: MockUserPayload = mockUser,
action = 'APPROVE'
): Partial<RequestWithUser> => ({
params,
body: { action },
user: user as RequestWithUser['user'],
});
@@ -120,6 +122,7 @@ describe('WorkflowTransitionGuard', () => {
expect(userService.getUserPermissions).toHaveBeenCalledWith(123);
expect(instanceRepo.findOne).toHaveBeenCalledWith({
where: { id: 'instance-123' },
relations: ['definition'],
});
});
@@ -276,6 +279,130 @@ describe('WorkflowTransitionGuard', () => {
});
});
// T025: DSL require.role → CASL ability mapping tests
describe('DSL CASL Role Mapping (FR-002a)', () => {
it('should allow access when DSL requires OrgAdmin role and user has organization.manage_users', async () => {
userService.getUserPermissions.mockResolvedValue([
'organization.manage_users',
]);
const mockInstance = {
id: 'instance-dsl-1',
currentState: 'PENDING_REVIEW',
context: { organizationId: 99 }, // Different org — Level 2 would deny
contractId: null,
definition: {
compiled: {
states: {
PENDING_REVIEW: {
transitions: {
APPROVE: { requirements: { roles: ['OrgAdmin'] } },
},
},
},
},
},
};
instanceRepo.findOne.mockResolvedValue(mockInstance);
const context = mockContext(
mockRequest({ id: 'instance-dsl-1' }, mockUser, 'APPROVE')
);
const result = await guard.canActivate(context);
expect(result).toBe(true);
});
it('should allow access when DSL requires ContractMember and user has contract.view', async () => {
userService.getUserPermissions.mockResolvedValue(['contract.view']);
const mockInstance = {
id: 'instance-dsl-2',
currentState: 'REVIEW',
context: { organizationId: 99 },
contractId: null,
definition: {
compiled: {
states: {
REVIEW: {
transitions: {
SUBMIT: { requirements: { roles: ['ContractMember'] } },
},
},
},
},
},
};
instanceRepo.findOne.mockResolvedValue(mockInstance);
const context = mockContext(
mockRequest({ id: 'instance-dsl-2' }, mockUser, 'SUBMIT')
);
const result = await guard.canActivate(context);
expect(result).toBe(true);
});
it('should deny when DSL requires OrgAdmin but user only has contract.view', async () => {
userService.getUserPermissions.mockResolvedValue(['contract.view']);
const mockInstance = {
id: 'instance-dsl-3',
currentState: 'PENDING',
context: { organizationId: 99 },
contractId: null,
definition: {
compiled: {
states: {
PENDING: {
transitions: {
APPROVE: { requirements: { roles: ['OrgAdmin'] } },
},
},
},
},
},
};
instanceRepo.findOne.mockResolvedValue(mockInstance);
const context = mockContext(
mockRequest({ id: 'instance-dsl-3' }, mockUser, 'APPROVE')
);
await expect(guard.canActivate(context)).rejects.toThrow(
ForbiddenException
);
});
it('should fall through to Level 3 when DSL role is AssignedHandler', async () => {
userService.getUserPermissions.mockResolvedValue(['document.view']);
const mockInstance = {
id: 'instance-dsl-4',
currentState: 'ASSIGNED',
context: { organizationId: 99, assignedUserId: 123 }, // same as mockUser.user_id
contractId: null,
definition: {
compiled: {
states: {
ASSIGNED: {
transitions: {
COMPLETE: {
requirements: { roles: ['AssignedHandler'] },
},
},
},
},
},
},
};
instanceRepo.findOne.mockResolvedValue(mockInstance);
const context = mockContext(
mockRequest({ id: 'instance-dsl-4' }, mockUser, 'COMPLETE')
);
// AssignedHandler → falls to Level 3 check → passes because assignedUserId === user_id
const result = await guard.canActivate(context);
expect(result).toBe(true);
});
});
describe('Level 4: Unauthorized Users', () => {
it('should deny access for regular users without any special permissions', async () => {
// Arrange
@@ -12,7 +12,17 @@ import {
import { InjectRepository } from '@nestjs/typeorm';
import { DataSource, Repository } from 'typeorm';
import { WorkflowInstance } from '../entities/workflow-instance.entity';
import { CompiledWorkflow } from '../workflow-dsl.service';
import { UserService } from '../../../modules/user/user.service';
// FR-002a: DSL require.role → CASL ability สตาติก mapping (research.md Decision 2)
// 'ไม่รู้จัก' DSL role → fall through ไป Level 3 (assignedUserId) check
const DSL_ROLE_TO_CASL: Record<string, string> = {
Superadmin: 'system.manage_all',
OrgAdmin: 'organization.manage_users',
ContractMember: 'contract.view',
AssignedHandler: '__assigned__', // ไม่ map ไป CASL — จัดการโดย Level 3 check
};
import type { RequestWithUser } from '../../../common/interfaces/request-with-user.interface';
/**
@@ -39,6 +49,8 @@ export class WorkflowTransitionGuard implements CanActivate {
async canActivate(context: ExecutionContext): Promise<boolean> {
const request = context.switchToHttp().getRequest<RequestWithUser>();
const instanceId = request.params['id'];
// FR-002a: action \u0e2a\u0e33\u0e2b\u0e23\u0e31\u0e1a DSL role check (\u0e15\u0e23\u0e27\u0e08\u0e2a\u0e2d\u0e1a requirements.roles \u0e02\u0e2d\u0e07 transition \u0e17\u0e35\u0e48\u0e15\u0e49\u0e2d\u0e07\u0e01\u0e32\u0e23\u0e17\u0e33)
const action = (request.body as { action?: string }).action ?? '';
const user = request.user;
// ดึงสิทธิ์ทั้งหมดของ User จาก DB (ตาม pattern เดียวกับ RbacGuard)
@@ -51,15 +63,37 @@ export class WorkflowTransitionGuard implements CanActivate {
return true;
}
// ดึง Instance เพื่อตรวจสอบ Context
// ดึง Instance + Definition เพื่อตรวจสอบ Context และ DSL require.role
const instance = await this.instanceRepo.findOne({
where: { id: instanceId },
relations: ['definition'],
});
if (!instance) {
throw new NotFoundException('Workflow Instance', instanceId);
}
// FR-002a: DSL require.role → CASL ability check
// ตรวจสอบ requirements.roles ของ CompiledTransition ที่ตรงกับ action ที่ Request ขอ
// (ยังต้องผ่าน contract membership check Level 2.5)
const compiled = instance.definition?.compiled as
| CompiledWorkflow
| undefined;
const stateConfig = compiled?.states?.[instance.currentState];
// CompiledTransition.requirements.roles — ไม่ใช่ stateConfig.require (ซึ่งไม่มี)
const requiredDslRoles: string[] =
stateConfig?.transitions?.[action]?.requirements?.roles ?? [];
let dslRoleAuthorized = false;
for (const dslRole of requiredDslRoles) {
const caslAbility = DSL_ROLE_TO_CASL[dslRole];
if (caslAbility && caslAbility !== '__assigned__') {
if (userPermissions.includes(caslAbility)) {
dslRoleAuthorized = true;
break;
}
}
}
// Level 2: Org Admin — organization.manage_users + สังกัดองค์กรเดียวกับเอกสาร
const docOrgId = instance.context?.organizationId as number | undefined;
if (
@@ -99,16 +133,21 @@ export class WorkflowTransitionGuard implements CanActivate {
}
}
// Level 3: Assigned Handler — User นี้ถูก Assign มาให้ทำ Step นี้โดยตรง
// Level 3: Assigned Handler หรือ DSL CASL-authorized role
// FR-002a: ถ้า DSL require.role ตรงกับ CASL ability ของ User → ผ่าน
// (กรณี AssignedHandler ใน DSL → ตรวจสอบผ่าน assignedUserId ใน context)
const assignedUserId = instance.context?.assignedUserId as
| number
| undefined;
if (assignedUserId !== undefined && user.user_id === assignedUserId) {
if (
dslRoleAuthorized ||
(assignedUserId !== undefined && user.user_id === assignedUserId)
) {
return true;
}
this.logger.warn(
`Unauthorized transition attempt: User ${user.user_id} on Instance ${instanceId}`
`Unauthorized transition attempt: User ${user.user_id} on Instance ${instanceId} (DSL roles: [${requiredDslRoles.join(', ')}])`
);
throw new ForbiddenException({
userMessage: 'คุณไม่มีสิทธิ์ดำเนินการในขั้นตอนนี้',
@@ -93,6 +93,19 @@ export class WorkflowEngineController {
return this.workflowService.evaluate(dto);
}
@Post('definitions/validate')
@ApiOperation({
summary: 'FR-025: ตรวจสอบความถูกต้องของ DSL โดยไม่บันทึกข้อมูล',
})
@ApiResponse({
status: 200,
description: '{ valid: true } หรือ { valid: false, errors: [...] }',
})
@RequirePermission('system.manage_all')
validateDefinition(@Body() body: { dsl: Record<string, unknown> }) {
return this.workflowService.validateDsl(body.dsl);
}
// =================================================================
// Runtime Engine (User Actions)
// =================================================================
@@ -117,6 +130,8 @@ export class WorkflowEngineController {
}
const userId = req.user.user_id;
// ADR-019: ใช้ publicId (UUID) แทน INT PK สำหรับ History record
const userUuid = req.user.publicId;
// ตรวจ Redis ว่า Request นี้ถูกส่งมาแล้วหรือไม่ (key ผูกกับ userId ป้องกัน cross-user replay)
const cacheKey = `idempotency:transition:${idempotencyKey}:${userId}`;
@@ -131,7 +146,9 @@ export class WorkflowEngineController {
userId,
dto.comment,
dto.payload,
dto.attachmentPublicIds // ADR-021: step-specific attachments
dto.attachmentPublicIds, // ADR-021: step-specific attachments
userUuid, // ADR-019: UUID สำหรับ history record
dto.versionNo // ADR-001 v1.1 FR-002: Optimistic lock
);
// เก็บใน Redis 24 ชั่วโมง (86400 วินาที = 86400000 ms ใน cache-manager v7)
@@ -2,6 +2,7 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { BullModule } from '@nestjs/bullmq';
import {
makeCounterProvider,
makeHistogramProvider,
@@ -16,7 +17,8 @@ import { Attachment } from '../../common/file-storage/entities/attachment.entity
// Services
import { WorkflowDslService } from './workflow-dsl.service';
import { WorkflowEngineService } from './workflow-engine.service';
import { WorkflowEventService } from './workflow-event.service'; // [NEW]
import { WorkflowEventService } from './workflow-event.service';
import { WorkflowEventProcessor } from './workflow-event.processor';
// Guards
import { WorkflowTransitionGuard } from './guards/workflow-transition.guard';
@@ -33,6 +35,9 @@ import { WorkflowEngineController } from './workflow-engine.controller';
WorkflowHistory,
Attachment, // ADR-021: ใช้ link attachments ประจำ Step
]),
// FR-005/006: BullMQ queues สำหรับ workflow events + Dead-Letter Queue
BullModule.registerQueue({ name: 'workflow-events' }),
BullModule.registerQueue({ name: 'workflow-events-failed' }),
UserModule,
],
controllers: [WorkflowEngineController],
@@ -40,6 +45,7 @@ import { WorkflowEngineController } from './workflow-engine.controller';
WorkflowEngineService,
WorkflowDslService,
WorkflowEventService,
WorkflowEventProcessor, // FR-005: BullMQ Processor + DLQ handler
WorkflowTransitionGuard,
// ADR-021 S1: Redlock observability — Prometheus metrics
makeHistogramProvider({
@@ -52,6 +58,18 @@ import { WorkflowEngineController } from './workflow-engine.controller';
name: 'workflow_redlock_acquire_failures_total',
help: 'จำนวนครั้งที่ Redlock acquire ล้มเหลวหลัง retry ครบ (Fail-closed HTTP 503)',
}),
// FR-023: Per-transition metrics — labelled by workflow_code, action, outcome
makeCounterProvider({
name: 'workflow_transitions_total',
help: 'จำนวน workflow transitions ทั้งหมด จำแนกตาม workflow_code, action และ outcome',
labelNames: ['workflow_code', 'action', 'outcome'],
}),
makeHistogramProvider({
name: 'workflow_transition_duration_ms',
help: 'เวลาที่ใช้ในการ process workflow transition ทั้งหมด (ms) รวม Redlock + DB transaction',
labelNames: ['workflow_code'],
buckets: [50, 100, 250, 500, 1000, 2500, 5000, 10000],
}),
],
exports: [WorkflowEngineService], // Export Service ให้ Module อื่น (Correspondence, RFA) เรียกใช้
})
@@ -35,13 +35,22 @@ import { CreateWorkflowDefinitionDto } from './dto/create-workflow-definition.dt
const DEFAULT_REDIS_TOKEN = 'default_IORedisModuleConnectionToken';
describe('WorkflowEngineService', () => {
let compiledModule: TestingModule;
let service: WorkflowEngineService;
let defRepo: Repository<WorkflowDefinition>;
let instanceRepo: Repository<WorkflowInstance>;
let attachmentRepo: { find: jest.Mock; update: jest.Mock };
let dslService: WorkflowDslService;
let eventService: WorkflowEventService;
// Mock Objects
const mockCasQueryBuilder = {
update: jest.fn().mockReturnThis(),
set: jest.fn().mockReturnThis(),
where: jest.fn().mockReturnThis(),
execute: jest.fn().mockResolvedValue({ affected: 1 }),
};
const mockQueryRunner = {
connect: jest.fn(),
startTransaction: jest.fn(),
@@ -52,6 +61,8 @@ describe('WorkflowEngineService', () => {
findOne: jest.fn(),
save: jest.fn(),
update: jest.fn(),
// ADR-001 v1.1 FR-002: CAS version increment mock
createQueryBuilder: jest.fn().mockReturnValue(mockCasQueryBuilder),
},
};
@@ -85,7 +96,7 @@ describe('WorkflowEngineService', () => {
});
mockRedlockRelease.mockClear();
const module: TestingModule = await Test.createTestingModule({
compiledModule = await Test.createTestingModule({
providers: [
WorkflowEngineService,
{
@@ -151,14 +162,30 @@ describe('WorkflowEngineService', () => {
inc: jest.fn(),
},
},
// FR-023: Per-transition metrics mocks
{
provide: 'PROM_METRIC_WORKFLOW_TRANSITIONS_TOTAL',
useValue: {
labels: jest.fn().mockReturnThis(),
inc: jest.fn(),
},
},
{
provide: 'PROM_METRIC_WORKFLOW_TRANSITION_DURATION_MS',
useValue: {
labels: jest.fn().mockReturnThis(),
observe: jest.fn(),
},
},
],
}).compile();
service = module.get<WorkflowEngineService>(WorkflowEngineService);
defRepo = module.get(getRepositoryToken(WorkflowDefinition));
instanceRepo = module.get(getRepositoryToken(WorkflowInstance));
dslService = module.get(WorkflowDslService);
eventService = module.get(WorkflowEventService);
service = compiledModule.get<WorkflowEngineService>(WorkflowEngineService);
defRepo = compiledModule.get(getRepositoryToken(WorkflowDefinition));
instanceRepo = compiledModule.get(getRepositoryToken(WorkflowInstance));
attachmentRepo = compiledModule.get(getRepositoryToken(Attachment));
dslService = compiledModule.get(WorkflowDslService);
eventService = compiledModule.get(WorkflowEventService);
});
it('should be defined', () => {
@@ -563,11 +590,13 @@ describe('WorkflowEngineService', () => {
id: 'inst-1',
currentState: 'PENDING_REVIEW',
status: WorkflowStatus.ACTIVE,
definition: { compiled: mockCompiledWorkflow },
definition: { compiled: mockCompiledWorkflow, workflow_code: 'WF01' },
context: {},
versionNo: 1,
});
mockQueryRunner.manager.save.mockResolvedValue({ id: 'history-1' });
mockQueryRunner.manager.update.mockResolvedValue({ affected: 1 });
mockCasQueryBuilder.execute.mockResolvedValue({ affected: 1 });
mockDslService.evaluate.mockReturnValue({
nextState: 'APPROVED',
events: [],
@@ -585,4 +614,283 @@ describe('WorkflowEngineService', () => {
});
});
});
// ============================================================
// T024: ADR-001 v1.1 FR-002 — Optimistic Lock Tests
// ============================================================
describe('Optimistic Lock (FR-002)', () => {
const baseInstance = {
id: 'inst-opt-1',
currentState: 'PENDING_REVIEW',
status: WorkflowStatus.ACTIVE,
definition: { compiled: mockCompiledWorkflow, workflow_code: 'WF01' },
context: {},
versionNo: 5,
};
it('T024a: should throw ConflictException (409) when clientVersionNo does not match current versionNo (fast-fail)', async () => {
// Arrange: DB มี version_no=5, client ส่ง version_no=3 (ล้าสมัย)
(instanceRepo.findOne as jest.Mock).mockResolvedValue({
id: 'inst-opt-1',
versionNo: 5,
});
// Act + Assert
await expect(
service.processTransition(
'inst-opt-1',
'APPROVE',
1,
undefined,
{},
undefined,
'user-uuid-123',
3 // clientVersionNo ล้าสมัย
)
).rejects.toThrow(ConflictException);
// Fast-fail: Redlock ต้องไม่ถูกเรียก (ผ่าน check ก่อน acquire)
expect(mockRedlockAcquire).not.toHaveBeenCalled();
});
it('T024b: should pass fast-fail and proceed when clientVersionNo matches current versionNo', async () => {
// Arrange: clientVersionNo ตรงกับ DB
(instanceRepo.findOne as jest.Mock).mockResolvedValue({
id: 'inst-opt-1',
currentState: 'PENDING_REVIEW',
versionNo: 5,
});
mockQueryRunner.manager.findOne.mockResolvedValue({
...baseInstance,
versionNo: 5,
});
mockQueryRunner.manager.save.mockResolvedValue({ id: 'history-1' });
mockCasQueryBuilder.execute.mockResolvedValue({ affected: 1 });
mockDslService.evaluate.mockReturnValue({
nextState: 'APPROVED',
events: [],
});
// Act
const result = await service.processTransition(
'inst-opt-1',
'APPROVE',
1,
undefined,
{},
undefined,
'user-uuid-123',
5 // clientVersionNo ตรง
);
// Assert: สำเร็จ + คืน versionNo ใหม่
expect(result.success).toBe(true);
expect(result.versionNo).toBe(6); // 5 + 1
expect(mockRedlockAcquire).toHaveBeenCalled();
});
it('T024c: should throw ConflictException when CAS update returns affected=0 (TOCTOU edge case)', async () => {
// Arrange: fast-fail ผ่าน (ไม่ส่ง clientVersionNo), แต่ CAS ล้มเหลว
(instanceRepo.findOne as jest.Mock).mockResolvedValue({
id: 'inst-opt-1',
currentState: 'PENDING_REVIEW',
versionNo: 5,
});
mockQueryRunner.manager.findOne.mockResolvedValue({
...baseInstance,
versionNo: 5,
});
mockQueryRunner.manager.save.mockResolvedValue({ id: 'history-1' });
// CAS: เกิด TOCTOU — version_no ถูกเปลี่ยนระหว่าง Redlock acquire กับ CAS update
mockCasQueryBuilder.execute.mockResolvedValue({ affected: 0 });
mockDslService.evaluate.mockReturnValue({
nextState: 'APPROVED',
events: [],
});
// Act + Assert
await expect(
service.processTransition(
'inst-opt-1',
'APPROVE',
1,
undefined,
{},
undefined
// ไม่ส่ง clientVersionNo — TOCTOU ถูกตรวจโดย CAS layer
)
).rejects.toThrow(ConflictException);
expect(mockQueryRunner.rollbackTransaction).toHaveBeenCalled();
expect(mockQueryRunner.commitTransaction).not.toHaveBeenCalled();
});
it('T024d: should rollback attachments to temp when DB transaction fails (FR-019)', async () => {
// Arrange: commit ล้มเหลว — คาดว่า attachments จะถูก revert กลับเป็น temp
(instanceRepo.findOne as jest.Mock).mockResolvedValue(null); // no pre-check needed (no attachment state)
mockQueryRunner.manager.findOne.mockResolvedValue({
...baseInstance,
versionNo: 5,
});
mockQueryRunner.manager.save.mockResolvedValue({ id: 'history-1' });
// CAS สำเร็จ
mockCasQueryBuilder.execute.mockResolvedValue({ affected: 1 });
// commitTransaction ล้มเหลว
mockQueryRunner.commitTransaction.mockRejectedValueOnce(
new Error('DB connection lost')
);
mockDslService.evaluate.mockReturnValue({
nextState: 'APPROVED',
events: [],
});
// Act + Assert
await expect(
service.processTransition(
'inst-opt-1',
'APPROVE',
1,
undefined,
{},
['att-rollback-1', 'att-rollback-2'] // แนบไฟล์ 2 ไฟล์
)
).rejects.toThrow(Error);
// FR-019: attachmentRepo.update ต้องถูกเรียกเพื่อ revert ไฟล์กลับเป็น temp
expect(attachmentRepo.update).toHaveBeenCalledWith(
expect.objectContaining({
publicId: ['att-rollback-1', 'att-rollback-2'],
}),
expect.objectContaining({ isTemporary: true })
);
});
});
// ============================================================
// T048: ADR-001 FR-007 — DSL Redis Cache Invalidation Tests
// ============================================================
describe('DSL Redis Cache Invalidation (FR-007, SC-005)', () => {
it('T048a: update() should invalidate cache when DSL changes', async () => {
// Arrange
const mockDef = {
id: 'def-cache-1',
workflow_code: 'RFA_V1',
version: 2,
is_active: false,
dsl: {},
compiled: {},
};
(defRepo.findOne as jest.Mock).mockResolvedValue(mockDef);
(defRepo.save as jest.Mock).mockResolvedValue({ ...mockDef, version: 2 });
mockDslService.compile.mockReturnValue(mockCompiledWorkflow);
const cacheManager = compiledModule.get<{
del: jest.Mock;
set: jest.Mock;
get: jest.Mock;
}>(CACHE_MANAGER);
// Act
await service.update('def-cache-1', {
dsl: {
workflow: 'RFA_V1',
states: [],
} as unknown as import('./dto/create-workflow-definition.dto').CreateWorkflowDefinitionDto['dsl'],
});
// Assert: cache del เรียกด้วย version key
expect(cacheManager.del).toHaveBeenCalledWith('wf:def:RFA_V1:2');
// Assert: re-cache เรียกหลัง del
expect(cacheManager.set).toHaveBeenCalledWith(
'wf:def:RFA_V1:2',
expect.any(Object),
3_600_000
);
});
it('T048b: update() should invalidate active pointer when is_active toggles to true', async () => {
// Arrange: definition เดิม is_active = false
const mockDef = {
id: 'def-cache-2',
workflow_code: 'TRANSMITTAL_V1',
version: 1,
is_active: false,
dsl: {},
compiled: {},
};
(defRepo.findOne as jest.Mock).mockResolvedValue(mockDef);
(defRepo.save as jest.Mock).mockResolvedValue({
...mockDef,
is_active: true,
});
const cacheManager = compiledModule.get<{
del: jest.Mock;
set: jest.Mock;
get: jest.Mock;
}>(CACHE_MANAGER);
// Act: activate definition
await service.update('def-cache-2', { is_active: true });
// Assert: active pointer ถูกลบออกจาก cache
expect(cacheManager.del).toHaveBeenCalledWith(
'wf:def:TRANSMITTAL_V1:active'
);
});
it('T048c: createDefinition() should set cache with version key after save', async () => {
// Arrange
(defRepo.findOne as jest.Mock).mockResolvedValue({ version: 3 });
(defRepo.create as jest.Mock).mockReturnValue({
workflow_code: 'WF_CACHE',
version: 4,
});
(defRepo.save as jest.Mock).mockResolvedValue({
workflow_code: 'WF_CACHE',
version: 4,
});
mockDslService.compile.mockReturnValue(mockCompiledWorkflow);
const cacheManager = compiledModule.get<{
del: jest.Mock;
set: jest.Mock;
get: jest.Mock;
}>(CACHE_MANAGER);
// Act
await service.createDefinition({
workflow_code: 'WF_CACHE',
dsl: {},
} as import('./dto/create-workflow-definition.dto').CreateWorkflowDefinitionDto);
// Assert: cache set ด้วย version key
expect(cacheManager.set).toHaveBeenCalledWith(
'wf:def:WF_CACHE:4',
expect.objectContaining({ workflow_code: 'WF_CACHE', version: 4 }),
3_600_000
);
});
it('T048d: getDefinitionById() should return from cache on cache hit', async () => {
// Arrange: cache มีข้อมูลอยู่แล้ว
const cachedDef = {
id: 'def-hit-1',
workflow_code: 'CACHED_WF',
version: 1,
};
const cacheManager = compiledModule.get<{
del: jest.Mock;
set: jest.Mock;
get: jest.Mock;
}>(CACHE_MANAGER);
cacheManager.get.mockResolvedValueOnce(cachedDef);
// Act
const result = await service.getDefinitionById('def-hit-1');
// Assert: ไม่ต้องออก DB
expect(result).toEqual(cachedDef);
expect(defRepo.findOne).not.toHaveBeenCalled();
});
});
});
@@ -32,7 +32,11 @@ import { CreateWorkflowDefinitionDto } from './dto/create-workflow-definition.dt
import { EvaluateWorkflowDto } from './dto/evaluate-workflow.dto';
import { UpdateWorkflowDefinitionDto } from './dto/update-workflow-definition.dto';
import { WorkflowHistoryItemDto } from './dto/workflow-history-item.dto';
import { CompiledWorkflow, WorkflowDslService } from './workflow-dsl.service';
import {
CompiledWorkflow,
RawWorkflowDSL,
WorkflowDslService,
} from './workflow-dsl.service';
import { WorkflowEventService } from './workflow-event.service'; // [NEW] Import Event Service
// Legacy Interface (Backward Compatibility)
@@ -79,7 +83,12 @@ export class WorkflowEngineService {
@InjectMetric('workflow_redlock_acquire_duration_ms')
private readonly redlockAcquireDuration: Histogram<string>,
@InjectMetric('workflow_redlock_acquire_failures_total')
private readonly redlockAcquireFailures: Counter<string>
private readonly redlockAcquireFailures: Counter<string>,
// FR-023: Per-transition metrics — labelled by workflow_code, action, outcome
@InjectMetric('workflow_transitions_total')
private readonly transitionsTotal: Counter<string>,
@InjectMetric('workflow_transition_duration_ms')
private readonly transitionDuration: Histogram<string>
) {
// ADR-021 Clarify Q2 (C1): Redlock Fail-closed
// Retry 3 ครั้ง × 500ms เพิ่ม jitter → ถ้ายังไม่ได้ throw HTTP 503
@@ -95,6 +104,30 @@ export class WorkflowEngineService {
// [PART 1] Definition Management (Phase 6A)
// =================================================================
/**
* FR-025: ตรวจสอบ DSL โดยไม่บันทึก — ใช้สำหรับ inline validation ใน Admin Editor
*/
validateDsl(
dsl: Record<string, unknown>
):
| { valid: true }
| { valid: false; errors: { path: string; message: string }[] } {
try {
this.dslService.compile(dsl as unknown as RawWorkflowDSL);
return { valid: true };
} catch (error: unknown) {
return {
valid: false,
errors: [
{
path: '',
message: error instanceof Error ? error.message : String(error),
},
],
};
}
}
/**
* สร้างหรืออัปเดต Workflow Definition ใหม่ (Auto Versioning)
*/
@@ -122,6 +155,12 @@ export class WorkflowEngineService {
});
const saved = await this.workflowDefRepo.save(entity);
// T044: Cache definition per version (TTL 1h, SC-005)
await this.cacheManager.set(
`wf:def:${saved.workflow_code}:${saved.version}`,
saved,
3_600_000
);
this.logger.log(
`Created Workflow Definition: ${saved.workflow_code} v${saved.version}`
);
@@ -155,10 +194,30 @@ export class WorkflowEngineService {
}
}
const prevIsActive = definition.is_active;
if (dto.is_active !== undefined) definition.is_active = dto.is_active;
if (dto.workflow_code) definition.workflow_code = dto.workflow_code;
return this.workflowDefRepo.save(definition);
const updated = await this.workflowDefRepo.save(definition);
// T045: Invalidate version cache เมื่อ DSL เปลี่ยน
if (dto.dsl) {
await this.cacheManager.del(
`wf:def:${updated.workflow_code}:${updated.version}`
);
}
// T045: Invalidate active pointer เมื่อ is_active เปลี่ยน
if (dto.is_active !== undefined && dto.is_active !== prevIsActive) {
await this.cacheManager.del(`wf:def:${updated.workflow_code}:active`);
}
// T045: Re-cache updated definition
await this.cacheManager.set(
`wf:def:${updated.workflow_code}:${updated.version}`,
updated,
3_600_000
);
return updated;
}
/**
@@ -181,10 +240,17 @@ export class WorkflowEngineService {
* ดึง Workflow Definition ตาม ID หรือ Code
*/
async getDefinitionById(id: string): Promise<WorkflowDefinition> {
// T046: Read-through cache (TTL 1h, SC-005)
const cacheKey = `wf:def:id:${id}`;
const cached = await this.cacheManager.get<WorkflowDefinition>(cacheKey);
if (cached) return cached;
const definition = await this.workflowDefRepo.findOne({ where: { id } });
if (!definition) {
throw new NotFoundException('Workflow Definition', id);
}
await this.cacheManager.set(cacheKey, definition, 3_600_000);
return definition;
}
@@ -317,7 +383,7 @@ export class WorkflowEngineService {
: [];
return {
id: instance.id,
id: instance.id, // publicId (UUID) ของ workflow instance
currentState: instance.currentState,
availableActions,
};
@@ -333,11 +399,49 @@ export class WorkflowEngineService {
comment?: string,
payload: Record<string, unknown> = {},
// ADR-021: publicIds ของไฟล์แนบประจำ Step นี้ (Two-Phase upload ก่อนแล้ว)
attachmentPublicIds?: string[]
attachmentPublicIds?: string[],
// ADR-019: UUID ของ User สำหรับ history record (ไม่ expose INT PK)
userUuid?: string,
// ADR-001 v1.1 FR-002: Optimistic lock — Client ส่งมาเพื่อป้องกัน Double-approval
clientVersionNo?: number
) {
// FR-022/023: เริ่มจับเวลาทั้ง method เพื่อบันทึก latency metric
const startMs = Date.now();
let outcome:
| 'success'
| 'conflict'
| 'forbidden'
| 'validation_error'
| 'system_error' = 'system_error';
let workflowCode = 'unknown';
let fromState: string | undefined;
let toState: string | undefined;
const hasAttachments =
attachmentPublicIds !== undefined && attachmentPublicIds.length > 0;
// ==============================================================
// ADR-001 v1.1 FR-002: Fast-fail Optimistic Lock Check (ก่อน Redlock)
// ลดภาระ Redlock สำหรับ Client ที่ส่ง version_no ล้าสมัยมา
// ==============================================================
if (clientVersionNo !== undefined) {
const current = await this.instanceRepo.findOne({
where: { id: instanceId },
select: ['id', 'versionNo'],
});
if (!current) {
throw new NotFoundException('Workflow Instance', instanceId);
}
if (current.versionNo !== clientVersionNo) {
outcome = 'conflict';
throw new ConflictException(
'WORKFLOW_VERSION_CONFLICT',
`Fast-fail: expected version_no=${clientVersionNo}, actual=${current.versionNo}`,
'เอกสารถูกอนุมัติโดยผู้อื่นแล้ว กรุณารีเฟรชและลองใหม่',
['รีเฟรชหน้าแล้วดูสถานะล่าสุดก่อนดำเนินการ']
);
}
}
// ==============================================================
// ADR-021 Clarify Q1 (C3): ตรวจสถานะก่อน acquire Redlock
// อนุญาตให้แนบไฟล์เฉพาะในสถานะ PENDING_REVIEW / PENDING_APPROVAL
@@ -453,8 +557,10 @@ export class WorkflowEngineService {
context
);
const fromState = instance.currentState;
const toState = evaluation.nextState;
fromState = instance.currentState;
toState = evaluation.nextState;
// FR-023: บันทึก workflowCode สำหรับ metric labels
workflowCode = instance.definition?.workflow_code ?? 'unknown';
// 3. อัปเดต Instance
instance.currentState = toState;
@@ -474,6 +580,8 @@ export class WorkflowEngineService {
toState,
action,
actionByUserId: userId,
// ADR-019 FR-003: UUID ของ User สำหรับ API Response (INT PK ไม่ expose)
actionByUserUuid: userUuid,
comment,
metadata: {
events: evaluation.events,
@@ -516,6 +624,27 @@ export class WorkflowEngineService {
}
}
// ADR-001 v1.1 FR-002: CAS version increment หลัง commit ใน DB transaction
// UPDATE จะล้มเหลว (affected=0) ถ้า version_no ถูกเปลี่ยนระหว่างนี้ (TOCTOU edge case)
const casResult = await queryRunner.manager
.createQueryBuilder()
.update(WorkflowInstance)
.set({ versionNo: () => 'version_no + 1' })
.where('id = :id AND version_no = :expected', {
id: instanceId,
expected: instance.versionNo,
})
.execute();
if ((casResult.affected ?? 0) === 0) {
throw new ConflictException(
'WORKFLOW_VERSION_CONFLICT',
'version_no changed between Redlock acquisition and CAS update (TOCTOU edge case)',
'เกิด Conflict กรุณารีเฟรชและลองใหม่',
['รีเฟรชหน้า', 'ลองดำเนินการอีกครั้ง']
);
}
await queryRunner.commitTransaction();
// ADR-021 T043: Invalidate Workflow History cache หลัง transition สำเร็จ
@@ -536,23 +665,85 @@ export class WorkflowEngineService {
void this.eventService.dispatchEvents(
instance.id,
evaluation.events,
context
context,
workflowCode // FR-005: DLQ notification \u0e43\u0e0a\u0e49 workflowCode \u0e23\u0e30\u0e1a\u0e38\u0e1a\u0e23\u0e34\u0e1a\u0e17\u0e18\u0e34\u0e4c Ops
);
}
outcome = 'success';
// FR-014 T014: คืน versionNo ที่ increment แล้ว ให้ Client เก็บไว้สำหรับ request ถัดไป
const newVersionNo = instance.versionNo + 1;
return {
success: true,
previousState: fromState,
nextState: toState,
events: evaluation.events,
isCompleted: instance.status === WorkflowStatus.COMPLETED,
versionNo: newVersionNo,
};
} catch (err) {
await queryRunner.rollbackTransaction();
// FR-019: Rollback file attachments กลับเป็น temporary เมื่อ DB transaction ล้มเหลว
// ไฟล์บน disk ยังคงอยู่ที่ permanent storage; cleanup job จะจัดการหลัง 24h TTL
if (
hasAttachments &&
attachmentPublicIds &&
attachmentPublicIds.length > 0
) {
await this.attachmentRepo
.update(
{ publicId: In(attachmentPublicIds), uploadedByUserId: userId },
{
isTemporary: true,
expiresAt: new Date(Date.now() + 24 * 60 * 60 * 1000),
}
)
.catch((rollbackErr: unknown) =>
this.logger.error(
`FR-019 Attachment rollback failed for ${instanceId}: ${rollbackErr instanceof Error ? rollbackErr.message : String(rollbackErr)}`
)
);
this.logger.warn(
`FR-019: Reverted ${attachmentPublicIds.length} attachment(s) to temp for instance ${instanceId} after DB failure`
);
}
// จำแนก outcome สำหรับ metric label
if (err instanceof ConflictException) outcome = 'conflict';
else if ((err as { status?: number }).status === 403)
outcome = 'forbidden';
else if (err instanceof WorkflowException) outcome = 'validation_error';
this.logger.error(
`Transition Failed for ${instanceId}: ${(err as Error).message}`
);
throw err;
} finally {
const durationMs = Date.now() - startMs;
// FR-023: บันทึก transition duration histogram
this.transitionDuration
.labels({ workflow_code: workflowCode })
.observe(durationMs);
// FR-023: บันทึก transition counter ตาม outcome
this.transitionsTotal
.labels({ workflow_code: workflowCode, action, outcome })
.inc();
// FR-022: Structured log entry ทุก transition (success/failure/conflict)
this.logger.log(
JSON.stringify({
instanceId,
action,
fromState,
toState,
userUuid,
durationMs,
outcome,
workflowCode,
})
);
await queryRunner.release();
// ADR-021 C1: ปล่อย Redlock เสมอ (non-blocking หาก release ผิดพลาด)
lock.release().catch((e: unknown) => {
@@ -0,0 +1,165 @@
// File: src/modules/workflow-engine/workflow-event.processor.spec.ts
// T026: Unit tests for WorkflowEventProcessor DLQ + n8n webhook (FR-005, FR-006)
import { Test, TestingModule } from '@nestjs/testing';
import { getQueueToken } from '@nestjs/bullmq';
import { WorkflowEventProcessor } from './workflow-event.processor';
import type { WorkflowEventJobData } from './workflow-event.processor';
import type { Job } from 'bullmq';
// Mock global fetch สำหรับ n8n webhook
const mockFetch = jest.fn();
global.fetch = mockFetch;
describe('WorkflowEventProcessor', () => {
let processor: WorkflowEventProcessor;
let failedQueue: { add: jest.Mock };
const makeJob = (
overrides: Partial<{
id: string;
attemptsMade: number;
opts: { attempts: number };
data: Record<string, unknown>;
}> = {}
) =>
({
id: 'job-001',
attemptsMade: 3,
opts: { attempts: 3 },
data: {
instanceId: 'inst-wf-1',
events: [{ type: 'notify', target: 'admin', template: 'APPROVED' }],
context: {},
workflowCode: 'RFA_V1',
},
...overrides,
}) as unknown as Job<WorkflowEventJobData>;
beforeEach(async () => {
failedQueue = { add: jest.fn().mockResolvedValue(undefined) };
mockFetch.mockReset();
const module: TestingModule = await Test.createTestingModule({
providers: [
WorkflowEventProcessor,
{
provide: getQueueToken('workflow-events-failed'),
useValue: failedQueue,
},
],
}).compile();
processor = module.get<WorkflowEventProcessor>(WorkflowEventProcessor);
});
afterEach(() => {
delete process.env['N8N_WEBHOOK_URL'];
});
describe('onJobFailed()', () => {
it('T026a: should add dead-letter job to workflow-events-failed queue when attempts exhausted', async () => {
// Arrange: job.attemptsMade === job.opts.attempts (หมด retry)
const job = makeJob({ attemptsMade: 3, opts: { attempts: 3 } });
const error = new Error('Notification service timeout');
// Act
await processor.onJobFailed(job, error);
// Assert: ส่งไปยัง DLQ
expect(failedQueue.add).toHaveBeenCalledWith(
'dead-letter',
expect.objectContaining({
originalJobId: 'job-001',
queue: 'workflow-events',
error: 'Notification service timeout',
data: expect.objectContaining({ instanceId: 'inst-wf-1' }),
})
);
});
it('T026b: should NOT add to DLQ when job still has retry attempts remaining', async () => {
// Arrange: attempt 1 of 3 — ยังมี retry เหลือ
const job = makeJob({ attemptsMade: 1, opts: { attempts: 3 } });
const error = new Error('Temporary error');
// Act
await processor.onJobFailed(job, error);
// Assert: ไม่ส่ง DLQ
expect(failedQueue.add).not.toHaveBeenCalled();
expect(mockFetch).not.toHaveBeenCalled();
});
it('T026c: should POST to n8n webhook when N8N_WEBHOOK_URL is configured', async () => {
// Arrange: ตั้งค่า webhook URL
process.env['N8N_WEBHOOK_URL'] = 'https://n8n.example.com/webhook/dlq';
mockFetch.mockResolvedValue({ ok: true });
const job = makeJob({ attemptsMade: 3, opts: { attempts: 3 } });
const error = new Error('Service down');
// Act
await processor.onJobFailed(job, error);
// Assert: เรียก n8n webhook
expect(mockFetch).toHaveBeenCalledWith(
'https://n8n.example.com/webhook/dlq',
expect.objectContaining({
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: expect.stringContaining('"event":"workflow_event_failed"'),
})
);
// Body ต้องมี workflowCode + instanceId
const callArgs = mockFetch.mock.calls[0] as [string, RequestInit];
const body = JSON.parse(callArgs[1].body as string) as Record<
string,
unknown
>;
expect(body).toMatchObject({
event: 'workflow_event_failed',
jobId: 'job-001',
workflowCode: 'RFA_V1',
instanceId: 'inst-wf-1',
error: 'Service down',
});
});
it('T026d: should warn (not throw) when N8N_WEBHOOK_URL is not set', async () => {
// Arrange: ไม่ตั้ง env var
delete process.env['N8N_WEBHOOK_URL'];
const job = makeJob({ attemptsMade: 3, opts: { attempts: 3 } });
const error = new Error('Error');
// Act — ต้องไม่ throw
await expect(processor.onJobFailed(job, error)).resolves.toBeUndefined();
// DLQ ยังต้องถูกเรียก — แค่ไม่ call webhook
expect(failedQueue.add).toHaveBeenCalled();
expect(mockFetch).not.toHaveBeenCalled();
});
it('T026e: should continue without throwing when DLQ add fails', async () => {
// Arrange: DLQ queue ล้มเหลว — ไม่ควร throw ออกมา
failedQueue.add.mockRejectedValueOnce(new Error('Redis DLQ down'));
const job = makeJob({ attemptsMade: 3, opts: { attempts: 3 } });
const error = new Error('Original error');
// Act: ต้อง resolve ปกติ ไม่ throw
await expect(processor.onJobFailed(job, error)).resolves.toBeUndefined();
});
});
describe('process()', () => {
it('T026f: should process notify event without error', async () => {
const job = makeJob();
// Act — ต้อง resolve ปกติ
await expect(processor.process(job)).resolves.toBeUndefined();
});
});
});
@@ -0,0 +1,133 @@
// File: src/modules/workflow-engine/workflow-event.processor.ts
// FR-005/FR-006: BullMQ Processor สำหรับ workflow-events queue พร้อม Dead-Letter Queue
import {
Processor,
WorkerHost,
OnWorkerEvent,
InjectQueue,
} from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job, Queue } from 'bullmq';
import { RawEvent } from './workflow-dsl.service';
export interface WorkflowEventJobData {
instanceId: string;
events: RawEvent[];
context: Record<string, unknown>;
workflowCode?: string;
}
@Processor('workflow-events', {
concurrency: 5,
limiter: { max: 100, duration: 60_000 },
})
export class WorkflowEventProcessor extends WorkerHost {
private readonly logger = new Logger(WorkflowEventProcessor.name);
constructor(
// FR-006: Queue สำหรับ Dead-Letter (jobs ที่หมด retry)
@InjectQueue('workflow-events-failed')
private readonly failedQueue: Queue
) {
super();
}
// ADR-008: ประมวลผล workflow event job
process(job: Job<WorkflowEventJobData>): Promise<void> {
const { instanceId, events } = job.data;
this.logger.log(
`Processing ${events.length} event(s) for Instance ${instanceId} (Job: ${job.id})`
);
// ประมวลผลแต่ละ event (throw เพื่อให้ BullMQ retry อัตโนมัติ)
for (const event of events) {
this.processSingleEvent(instanceId, event, job.data.context);
}
return Promise.resolve();
}
// FR-006: Dead-Letter Queue handler — เรียกเมื่อ job หมด retry ทั้งหมด
@OnWorkerEvent('failed')
async onJobFailed(
job: Job<WorkflowEventJobData>,
error: Error
): Promise<void> {
const maxAttempts = job.opts.attempts ?? 3;
if ((job.attemptsMade ?? 0) < maxAttempts) {
// ยังมี retry เหลือ — ไม่ต้องส่ง DLQ
return;
}
this.logger.error(
`Job ${job.id} exhausted all ${maxAttempts} retries for Instance ${job.data.instanceId}: ${error.message}`
);
// ส่งไปยัง Dead-Letter Queue
await this.failedQueue
.add('dead-letter', {
originalJobId: job.id,
queue: 'workflow-events',
data: job.data,
failedAt: new Date().toISOString(),
error: error.message,
})
.catch((dlqErr: unknown) =>
this.logger.error(
`Failed to add job ${job.id} to DLQ: ${dlqErr instanceof Error ? dlqErr.message : String(dlqErr)}`
)
);
// แจ้ง Ops ผ่าน n8n webhook (ถ้าตั้งค่าไว้)
const webhookUrl = process.env['N8N_WEBHOOK_URL'];
if (webhookUrl) {
await fetch(webhookUrl, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
event: 'workflow_event_failed',
jobId: job.id,
workflowCode: job.data.workflowCode,
instanceId: job.data.instanceId,
error: error.message,
timestamp: new Date().toISOString(),
}),
}).catch((webhookErr: unknown) => {
// Warning เท่านั้น — ไม่ throw เพื่อไม่กระทบ DLQ add ที่สำเร็จแล้ว
this.logger.warn(
`n8n webhook failed for job ${job.id}: ${webhookErr instanceof Error ? webhookErr.message : String(webhookErr)}`
);
});
} else {
this.logger.warn(
`N8N_WEBHOOK_URL not configured — DLQ job created without ops notification (job: ${job.id})`
);
}
}
// --- Private Handlers ---
private processSingleEvent(
instanceId: string,
event: RawEvent,
_context: Record<string, unknown>
): void {
switch (event.type) {
case 'notify':
this.logger.log(
`[NOTIFY] Instance ${instanceId} → target: "${event.target}" | template: "${event.template}"`
);
break;
case 'webhook':
this.logger.log(
`[WEBHOOK] Instance ${instanceId} → url: "${event.target}"`
);
break;
case 'auto_action':
this.logger.log(`[AUTO_ACTION] Instance ${instanceId}`);
break;
default:
this.logger.warn(`Unknown event type: ${event.type} for ${instanceId}`);
}
}
}
@@ -1,6 +1,8 @@
// File: src/modules/workflow-engine/workflow-event.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { RawEvent } from './workflow-dsl.service';
// Interface สำหรับ External Services ที่จะมารับ Event ต่อ
@@ -19,81 +21,47 @@ export interface WorkflowEventHandler {
export class WorkflowEventService {
private readonly logger = new Logger(WorkflowEventService.name);
// สามารถ Inject NotificationService หรือ HttpService เข้ามาได้ตรงนี้
// constructor(private readonly notificationService: NotificationService) {}
constructor(
// ADR-008: ใช้ BullMQ queue แทน inline processing เพื่อ Retry + DLQ (FR-005)
@InjectQueue('workflow-events')
private readonly workflowEventQueue: Queue
) {}
/**
* ประมวลผลรายการ Events ที่เกิดจากการเปลี่ยนสถานะ
* เพิ่ม Job ลงใน workflow-events queue (ADR-008: Async ไม่ Block Response)
* Processor: WorkflowEventProcessor (workflow-event.processor.ts)
*/
dispatchEvents(
instanceId: string,
events: RawEvent[],
context: Record<string, unknown>
) {
context: Record<string, unknown>,
workflowCode?: string
): void {
if (!events || events.length === 0) return;
this.logger.log(
`Dispatching ${events.length} events for Instance ${instanceId}`
`Enqueuing ${events.length} event(s) for Instance ${instanceId} → workflow-events queue`
);
// ทำแบบ Async ไม่รอผล (Fire-and-forget) เพื่อไม่ให้กระทบ Response Time ของ User
void Promise.allSettled(
events.map((event) => this.processSingleEvent(instanceId, event, context))
).then((results) => {
// Log errors if any
results.forEach((res, idx) => {
if (res.status === 'rejected') {
this.logger.error(
`Failed to process event [${idx}]: ${String(res.reason)}`
);
// ADR-008: Fire-and-forget — ไม่ await เพื่อไม่กระทบ Response Time
// WorkflowEventProcessor จะประมวลผลและ retry อัตโนมัติ (3 retries, exponential backoff)
void this.workflowEventQueue
.add(
'process-events',
{ instanceId, events, context, workflowCode },
{
attempts: 3,
backoff: { type: 'exponential', delay: 500 },
removeOnComplete: { age: 86_400 }, // เก็บ 24h
removeOnFail: false, // เก็บไว้ใน Bull Board + DLQ
}
});
});
}
private async processSingleEvent(
instanceId: string,
event: RawEvent,
context: Record<string, unknown>
) {
await Promise.resolve();
try {
switch (event.type) {
case 'notify':
this.handleNotify(event, context);
break;
case 'webhook':
this.handleWebhook(event, context);
break;
case 'auto_action':
// Logic สำหรับ Auto Transition (เช่น ถ้าผ่านเงื่อนไข ให้ไปต่อเลย)
this.logger.log(`Auto Action triggered for ${instanceId}`);
break;
default:
this.logger.warn(`Unknown event type: ${event.type}`);
}
} catch (error) {
this.logger.error(
`Error processing event ${event.type}: ${String(error)}`
)
.catch((err: unknown) =>
this.logger.error(
`Failed to enqueue workflow events for ${instanceId}: ${
err instanceof Error ? err.message : String(err)
}`
)
);
throw error;
}
}
// --- Handlers ---
private handleNotify(event: RawEvent, _context: Record<string, unknown>) {
// Mockup: ในของจริงจะเรียก NotificationService.send()
// const recipients = this.resolveRecipients(event.target, context);
this.logger.log(
`[EVENT] Notify target: "${event.target}" | Template: "${event.template}"`
);
}
private handleWebhook(event: RawEvent, _context: Record<string, unknown>) {
// Mockup: เรียก HttpService.post()
this.logger.log(
`[EVENT] Webhook to: "${event.target}" | Payload: ${JSON.stringify(event.payload)}`
);
}
}