Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz Source: /opt/alga-psa on psa.joliet.tech
612 lines
19 KiB
Markdown
612 lines
19 KiB
Markdown
# Asynchronous Workflow Implementation Update
|
|
|
|
## Overview
|
|
|
|
This document provides an update to our asynchronous workflow implementation plan based on an analysis of the current codebase. While many of the foundational components exist, there are critical integration gaps that need to be addressed to achieve a fully asynchronous workflow system.
|
|
|
|
## Current Implementation Status
|
|
|
|
### Components Already Implemented
|
|
|
|
1. **Redis Stream Client (`redisStreamClient.ts`)**
|
|
- Full implementation for publishing/consuming events via Redis Streams
|
|
- Support for consumer groups and pending message claiming
|
|
- Message acknowledgment and error handling
|
|
|
|
2. **Distributed Lock Mechanism (`distributedLock.ts`)**
|
|
- Complete Redis-based distributed lock implementation
|
|
- Functions for acquiring, releasing, and extending locks
|
|
- Helper functions for executing code with locks
|
|
|
|
3. **Distributed Transaction Support (`distributedTransaction.ts`)**
|
|
- Integration of database transactions with distributed locks
|
|
- Error handling and retry mechanisms
|
|
|
|
4. **Worker Framework (`workflowWorker.ts`)**
|
|
- Robust implementation with health monitoring and metrics
|
|
- Support for concurrent event processing and pending message claiming
|
|
- Graceful shutdown handling
|
|
|
|
### Critical Integration Gaps
|
|
|
|
1. **Missing Workflow Runtime Integration**
|
|
- The `TypeScriptWorkflowRuntime` class in `workflowRuntime.ts` still uses in-memory state tracking
|
|
- No `enqueueEvent` method for "fire and forget" event submission
|
|
- No `processQueuedEvent` method referenced by the worker
|
|
- No proper event sourcing pattern for state derivation
|
|
|
|
2. **Incomplete Database Integration**
|
|
- The event processing record handling only partially implemented
|
|
- No transaction management for ensuring atomic operations
|
|
|
|
3. **Missing Runtime-Worker Connection**
|
|
- The worker is implemented but not properly connected to the workflow runtime
|
|
- No service startup/initialization for workers
|
|
|
|
## Implementation Steps
|
|
|
|
### 1. Enhance TypeScriptWorkflowRuntime
|
|
|
|
Update `server/src/lib/workflow/core/workflowRuntime.ts` to add support for asynchronous processing:
|
|
|
|
```typescript
|
|
interface ProcessQueuedEventParams {
|
|
eventId: string;
|
|
executionId: string;
|
|
processingId: string;
|
|
workerId: string;
|
|
tenant: string;
|
|
}
|
|
|
|
interface ProcessQueuedEventResult {
|
|
success: boolean;
|
|
errorMessage?: string;
|
|
previousState?: string;
|
|
currentState?: string;
|
|
actionsExecuted: Array<{
|
|
actionName: string;
|
|
success: boolean;
|
|
error?: string;
|
|
}>;
|
|
}
|
|
|
|
export class TypeScriptWorkflowRuntime {
|
|
// ... existing code ...
|
|
|
|
/**
|
|
* Enqueues an event for asynchronous processing
|
|
* Returns quickly after persisting the event and publishing to Redis
|
|
*/
|
|
async enqueueEvent(options: EventSubmissionOptions): Promise<{ eventId: string }> {
|
|
const { execution_id, event_name, payload, user_id, tenant } = options;
|
|
|
|
// Generate a unique event ID
|
|
const eventId = `evt-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;
|
|
|
|
// Get Redis stream client
|
|
const redisStreamClient = getRedisStreamClient();
|
|
|
|
// Verify execution exists and is valid
|
|
const executionState = this.executionStates.get(execution_id);
|
|
if (!executionState) {
|
|
throw new Error(`Workflow execution "${execution_id}" not found`);
|
|
}
|
|
|
|
// Verify tenant
|
|
if (executionState.tenant !== tenant) {
|
|
throw new Error(`Tenant mismatch for workflow execution "${execution_id}"`);
|
|
}
|
|
|
|
// Create event
|
|
const event: WorkflowEvent = {
|
|
id: eventId,
|
|
name: event_name,
|
|
payload: payload || {},
|
|
user_id,
|
|
timestamp: new Date().toISOString()
|
|
};
|
|
|
|
// Use distributed transaction to persist event and publish to Redis
|
|
await executeDistributedTransaction(knex, `workflow:${execution_id}`, async (trx) => {
|
|
// Persist event to database
|
|
await this.persistEventToDatabase(execution_id, tenant, event, trx);
|
|
|
|
// Create processing record
|
|
await this.createEventProcessingRecord(eventId, tenant, trx);
|
|
|
|
// Publish to Redis stream
|
|
await redisStreamClient.publishEvent({
|
|
id: eventId,
|
|
executionId: execution_id,
|
|
eventName: event_name,
|
|
tenant,
|
|
userId: user_id,
|
|
payload: payload || {},
|
|
timestamp: event.timestamp
|
|
});
|
|
});
|
|
|
|
return { eventId };
|
|
}
|
|
|
|
/**
|
|
* Process a queued event from Redis stream
|
|
* Called by worker processes to handle events asynchronously
|
|
*/
|
|
async processQueuedEvent(
|
|
params: ProcessQueuedEventParams
|
|
): Promise<ProcessQueuedEventResult> {
|
|
const { eventId, executionId, processingId, workerId, tenant } = params;
|
|
|
|
// Acquire distributed lock to ensure only one worker processes this event
|
|
const lockKey = `event:${eventId}:processing`;
|
|
const lockOwner = `worker:${workerId}`;
|
|
|
|
const lockAcquired = await acquireDistributedLock(lockKey, lockOwner, {
|
|
waitTimeMs: 5000,
|
|
ttlMs: 60000
|
|
});
|
|
|
|
if (!lockAcquired) {
|
|
return {
|
|
success: false,
|
|
errorMessage: 'Failed to acquire lock for event processing',
|
|
actionsExecuted: []
|
|
};
|
|
}
|
|
|
|
try {
|
|
// Mark event as processing
|
|
await this.updateEventProcessingStatus(processingId, 'processing', {
|
|
workerId,
|
|
startedAt: new Date().toISOString()
|
|
});
|
|
|
|
// Load execution data and events
|
|
const workflowDef = this.workflowDefinitions.get(executionId);
|
|
if (!workflowDef) {
|
|
throw new Error(`Workflow "${executionId}" not found`);
|
|
}
|
|
|
|
// Load all events from database
|
|
const eventLog = await this.loadEventsFromDatabase(executionId, tenant);
|
|
|
|
// Find the event being processed
|
|
const currentEvent = eventLog.find(e => e.id === eventId);
|
|
if (!currentEvent) {
|
|
throw new Error(`Event ${eventId} not found`);
|
|
}
|
|
|
|
// Derive state by replaying events
|
|
const replayState = this.replayEvents(workflowDef, eventLog.filter(e => e.id !== eventId));
|
|
const previousState = replayState.currentState;
|
|
|
|
// Execute actions
|
|
const actionResults = await this.executeActionsForEvent(
|
|
workflowDef,
|
|
executionId,
|
|
currentEvent,
|
|
tenant
|
|
);
|
|
|
|
// Update workflow state
|
|
const currentState = this.getExecutionState(executionId, tenant).currentState;
|
|
|
|
// Mark event as completed
|
|
await this.updateEventProcessingStatus(processingId, 'completed', {
|
|
workerId,
|
|
completedAt: new Date().toISOString(),
|
|
result: {
|
|
previousState,
|
|
currentState,
|
|
actionsExecuted: actionResults.map(a => a.actionName)
|
|
}
|
|
});
|
|
|
|
return {
|
|
success: true,
|
|
previousState,
|
|
currentState,
|
|
actionsExecuted: actionResults
|
|
};
|
|
} catch (error) {
|
|
// Mark event as failed
|
|
await this.updateEventProcessingStatus(processingId, 'failed', {
|
|
workerId,
|
|
completedAt: new Date().toISOString(),
|
|
error: error instanceof Error ? error.message : String(error)
|
|
});
|
|
|
|
return {
|
|
success: false,
|
|
errorMessage: error instanceof Error ? error.message : String(error),
|
|
actionsExecuted: []
|
|
};
|
|
} finally {
|
|
// Release lock
|
|
await releaseDistributedLock(lockKey, lockOwner);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Helper methods
|
|
*/
|
|
private async persistEventToDatabase(
|
|
executionId: string,
|
|
tenant: string,
|
|
event: WorkflowEvent,
|
|
trx: Knex.Transaction
|
|
): Promise<void> {
|
|
// Implementation to persist event to database
|
|
}
|
|
|
|
private async createEventProcessingRecord(
|
|
eventId: string,
|
|
tenant: string,
|
|
trx: Knex.Transaction
|
|
): Promise<void> {
|
|
// Implementation to create processing record
|
|
}
|
|
|
|
private async updateEventProcessingStatus(
|
|
processingId: string,
|
|
status: 'pending' | 'processing' | 'completed' | 'failed',
|
|
metadata: Record<string, any>
|
|
): Promise<void> {
|
|
// Implementation to update processing status
|
|
}
|
|
|
|
private async loadEventsFromDatabase(
|
|
executionId: string,
|
|
tenant: string
|
|
): Promise<WorkflowEvent[]> {
|
|
// Implementation to load events from database
|
|
}
|
|
|
|
private replayEvents(
|
|
workflowDef: WorkflowDefinition,
|
|
events: WorkflowEvent[]
|
|
): { currentState: string; data: Record<string, any> } {
|
|
// Implementation to replay events and derive state
|
|
}
|
|
|
|
private async executeActionsForEvent(
|
|
workflowDef: WorkflowDefinition,
|
|
executionId: string,
|
|
event: WorkflowEvent,
|
|
tenant: string
|
|
): Promise<Array<{ actionName: string; success: boolean; error?: string }>> {
|
|
// Implementation to execute actions for an event
|
|
}
|
|
}
|
|
```
|
|
|
|
### 2. Implement Event Persistence Model
|
|
|
|
Create or enhance `server/src/lib/workflow/persistence/workflowEventModel.ts`:
|
|
|
|
```typescript
|
|
import { Knex } from 'knex';
|
|
import { WorkflowEvent } from '../core/workflowContext';
|
|
|
|
export class WorkflowEventModel {
|
|
static async persistEvent(
|
|
event: WorkflowEvent,
|
|
executionId: string,
|
|
tenant: string,
|
|
fromState: string,
|
|
trx?: Knex.Transaction
|
|
): Promise<void> {
|
|
const db = trx || knex;
|
|
|
|
await db('workflow_events').insert({
|
|
event_id: event.id,
|
|
execution_id: executionId,
|
|
event_name: event.name,
|
|
from_state: fromState,
|
|
to_state: '', // Will be determined during processing
|
|
payload: JSON.stringify(event.payload),
|
|
user_id: event.user_id,
|
|
tenant,
|
|
timestamp: event.timestamp || new Date().toISOString()
|
|
});
|
|
}
|
|
|
|
static async loadEventLog(
|
|
executionId: string,
|
|
tenant: string
|
|
): Promise<WorkflowEvent[]> {
|
|
const events = await knex('workflow_events')
|
|
.where({
|
|
execution_id: executionId,
|
|
tenant
|
|
})
|
|
.orderBy('timestamp', 'asc');
|
|
|
|
return events.map(e => ({
|
|
id: e.event_id,
|
|
name: e.event_name,
|
|
payload: JSON.parse(e.payload),
|
|
user_id: e.user_id,
|
|
timestamp: e.timestamp,
|
|
fromState: e.from_state,
|
|
toState: e.to_state
|
|
}));
|
|
}
|
|
|
|
static async updateEventStateTransition(
|
|
eventId: string,
|
|
tenant: string,
|
|
toState: string,
|
|
trx?: Knex.Transaction
|
|
): Promise<void> {
|
|
const db = trx || knex;
|
|
|
|
await db('workflow_events')
|
|
.where({
|
|
event_id: eventId,
|
|
tenant
|
|
})
|
|
.update({
|
|
to_state: toState,
|
|
updated_at: new Date().toISOString()
|
|
});
|
|
}
|
|
}
|
|
```
|
|
|
|
### 3. Integrate Worker with Workflow Runtime
|
|
|
|
Enhance the existing `WorkflowWorker` class to properly integrate with the `TypeScriptWorkflowRuntime`:
|
|
|
|
```typescript
|
|
export class WorkflowWorker {
|
|
// ... existing code ...
|
|
|
|
constructor(private workflowRuntime: TypeScriptWorkflowRuntime) {
|
|
// Generate a unique worker ID
|
|
this.workerId = `${os.hostname()}-${process.pid}-${generateUniqueId()}`;
|
|
|
|
// Create Redis Stream client
|
|
this.redisStreamClient = getRedisStreamClient();
|
|
}
|
|
|
|
// ... modify processEvent method ...
|
|
|
|
private async processEvent(
|
|
eventId: string,
|
|
executionId: string,
|
|
processingId: string,
|
|
tenant: string
|
|
): Promise<void> {
|
|
const startTime = Date.now();
|
|
this.eventsProcessed++;
|
|
|
|
try {
|
|
// Create parameters for processing the queued event
|
|
const params: ProcessQueuedEventParams = {
|
|
eventId,
|
|
executionId,
|
|
processingId,
|
|
workerId: this.workerId,
|
|
tenant
|
|
};
|
|
|
|
// Use the workflow runtime to process the event
|
|
const result = await this.workflowRuntime.processQueuedEvent(params);
|
|
|
|
if (!result.success) {
|
|
logger.error(`[WorkflowWorker] Failed to process event ${eventId}: ${result.errorMessage}`, {
|
|
workerId: this.workerId,
|
|
eventId,
|
|
executionId,
|
|
error: result.errorMessage
|
|
});
|
|
this.eventsFailed++;
|
|
} else {
|
|
logger.info(`[WorkflowWorker] Successfully processed event ${eventId}`, {
|
|
workerId: this.workerId,
|
|
eventId,
|
|
executionId,
|
|
previousState: result.previousState,
|
|
currentState: result.currentState,
|
|
actionsExecuted: result.actionsExecuted.map(a => a.actionName)
|
|
});
|
|
|
|
this.eventsSucceeded++;
|
|
}
|
|
|
|
// Record processing time
|
|
const processingTime = Date.now() - startTime;
|
|
this.processingTimes.push(processingTime);
|
|
|
|
} catch (error) {
|
|
// ... existing error handling ...
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
### 4. Create Worker Service for Application Startup
|
|
|
|
Create `server/src/services/workflowWorkerService.ts` to initialize and manage worker instances:
|
|
|
|
```typescript
|
|
import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';
|
|
import { getActionRegistry } from '@shared/workflow/core/actionRegistry';
|
|
import { startWorkflowWorker, WorkflowWorker } from '../lib/workflow/workers/workflowWorker';
|
|
import logger from '../utils/logger';
|
|
|
|
export class WorkflowWorkerService {
|
|
private workers: WorkflowWorker[] = [];
|
|
private workerCount: number;
|
|
|
|
constructor(workerCount: number = 2) {
|
|
this.workerCount = workerCount;
|
|
}
|
|
|
|
async start(): Promise<void> {
|
|
logger.info(`[WorkflowWorkerService] Starting ${this.workerCount} workflow workers`);
|
|
|
|
try {
|
|
// Initialize action registry and workflow runtime
|
|
const actionRegistry = getActionRegistry();
|
|
const workflowRuntime = getWorkflowRuntime(actionRegistry);
|
|
|
|
// Start worker instances
|
|
for (let i = 0; i < this.workerCount; i++) {
|
|
const worker = await startWorkflowWorker(workflowRuntime);
|
|
this.workers.push(worker);
|
|
logger.info(`[WorkflowWorkerService] Started worker ${i + 1} of ${this.workerCount}`);
|
|
}
|
|
|
|
logger.info('[WorkflowWorkerService] All workflow workers started successfully');
|
|
} catch (error) {
|
|
logger.error('[WorkflowWorkerService] Failed to start workflow workers:', error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
async stop(): Promise<void> {
|
|
logger.info(`[WorkflowWorkerService] Stopping ${this.workers.length} workflow workers`);
|
|
|
|
// Stop all workers
|
|
await Promise.all(this.workers.map(async (worker) => {
|
|
try {
|
|
await worker.stop();
|
|
} catch (error) {
|
|
logger.error('[WorkflowWorkerService] Error stopping worker:', error);
|
|
}
|
|
}));
|
|
|
|
this.workers = [];
|
|
logger.info('[WorkflowWorkerService] All workflow workers stopped');
|
|
}
|
|
}
|
|
|
|
// Singleton instance
|
|
let workerService: WorkflowWorkerService | null = null;
|
|
|
|
export function getWorkflowWorkerService(workerCount?: number): WorkflowWorkerService {
|
|
if (!workerService) {
|
|
workerService = new WorkflowWorkerService(workerCount);
|
|
}
|
|
return workerService;
|
|
}
|
|
```
|
|
|
|
### 5. Update Configuration
|
|
|
|
In `server/src/config/index.ts`, update configuration for workflow system:
|
|
|
|
```typescript
|
|
export const config = {
|
|
// ... existing config ...
|
|
|
|
workflow: {
|
|
// Number of worker instances to run
|
|
workerCount: parseInt(process.env.WORKFLOW_WORKER_COUNT || '2', 10),
|
|
|
|
// Redis stream configuration
|
|
redis: {
|
|
streamPrefix: process.env.WORKFLOW_REDIS_STREAM_PREFIX || 'workflow:events:',
|
|
consumerGroup: process.env.WORKFLOW_REDIS_CONSUMER_GROUP || 'workflow-workers',
|
|
batchSize: parseInt(process.env.WORKFLOW_REDIS_BATCH_SIZE || '10', 10),
|
|
idleTimeoutMs: parseInt(process.env.WORKFLOW_REDIS_IDLE_TIMEOUT_MS || '60000', 10),
|
|
}
|
|
}
|
|
};
|
|
```
|
|
|
|
### 6. Update API Controllers to Use Enqueue
|
|
|
|
Update `server/src/controllers/workflowController.ts`:
|
|
|
|
```typescript
|
|
import { config } from '../config';
|
|
import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';
|
|
import { Request, Response } from 'express';
|
|
|
|
export class WorkflowController {
|
|
async submitEvent(req: Request, res: Response): Promise<void> {
|
|
const { execution_id, event_name, payload, tenant } = req.body;
|
|
const user_id = req.user?.id;
|
|
|
|
try {
|
|
const runtime = getWorkflowRuntime();
|
|
|
|
// Enqueue event for asynchronous processing
|
|
const result = await runtime.enqueueEvent({
|
|
execution_id,
|
|
event_name,
|
|
payload,
|
|
user_id,
|
|
tenant
|
|
});
|
|
|
|
res.status(202).json({
|
|
message: 'Event accepted for processing',
|
|
eventId: result.eventId
|
|
});
|
|
} catch (error) {
|
|
// Handle errors
|
|
res.status(400).json({
|
|
error: error instanceof Error ? error.message : 'Unknown error'
|
|
});
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## Testing Strategy
|
|
|
|
1. **Unit Tests for New Components**
|
|
- Test `enqueueEvent` with Redis Stream publishing
|
|
- Test `processQueuedEvent` for event replay and processing
|
|
- Test asynchronous event flow from submission to processing
|
|
|
|
2. **Integration Tests with Worker**
|
|
- Test worker processing of Redis stream messages
|
|
- Test worker coordination with distributed locks
|
|
- Test failure recovery and message reclaiming
|
|
|
|
3. **End-to-End Asynchronous Workflow Tests**
|
|
- Test complete workflows running with asynchronous processing
|
|
- Test system resilience to worker restarts
|
|
- Test workflow resumption after application restart
|
|
|
|
## Implementation Roadmap
|
|
|
|
1. **Phase 1: Core Implementation (2 weeks)**
|
|
- Implement `enqueueEvent` method in `TypeScriptWorkflowRuntime`
|
|
- Implement `processQueuedEvent` method in `TypeScriptWorkflowRuntime`
|
|
- Implement event persistence and loading from database
|
|
|
|
2. **Phase 2: Integration (1 week)**
|
|
- Connect `WorkflowWorker` with `TypeScriptWorkflowRuntime`
|
|
- Create `WorkflowWorkerService` for application startup
|
|
- Update configuration for asynchronous processing
|
|
|
|
3. **Phase 3: Testing and Validation (2 weeks)**
|
|
- Develop unit and integration tests
|
|
- Test with multiple workers
|
|
- Validate worker failover and recovery
|
|
|
|
4. **Phase 4: Deployment and Monitoring (1 week)**
|
|
- Deploy to staging environment
|
|
- Implement monitoring for worker health
|
|
- Document deployment requirements
|
|
|
|
## Conclusion
|
|
|
|
The implementation plan described here will transform our current workflow system into a fully asynchronous execution platform. By focusing on the specific integration points required between existing components, we can leverage much of the code that already exists while filling in the critical gaps.
|
|
|
|
Key success factors for this implementation:
|
|
|
|
1. **Proper event sourcing implementation**: Ensure that workflow state is derived by replaying events
|
|
2. **Robust error handling**: Implement thorough error handling and recovery mechanisms
|
|
3. **Idempotent processing**: Guarantee that event processing is idempotent for reliability
|
|
4. **Comprehensive testing**: Thoroughly test worker coordination and failure scenarios
|
|
|
|
With these enhancements, our workflow system will gain the resilience, scalability, and fault tolerance necessary for handling mission-critical business processes in an asynchronous environment.
|