# Asynchronous Workflow Implementation Update ## Overview This document provides an update to our asynchronous workflow implementation plan based on an analysis of the current codebase. While many of the foundational components exist, there are critical integration gaps that need to be addressed to achieve a fully asynchronous workflow system. ## Current Implementation Status ### Components Already Implemented 1. **Redis Stream Client (`redisStreamClient.ts`)** - Full implementation for publishing/consuming events via Redis Streams - Support for consumer groups and pending message claiming - Message acknowledgment and error handling 2. **Distributed Lock Mechanism (`distributedLock.ts`)** - Complete Redis-based distributed lock implementation - Functions for acquiring, releasing, and extending locks - Helper functions for executing code with locks 3. **Distributed Transaction Support (`distributedTransaction.ts`)** - Integration of database transactions with distributed locks - Error handling and retry mechanisms 4. **Worker Framework (`workflowWorker.ts`)** - Robust implementation with health monitoring and metrics - Support for concurrent event processing and pending message claiming - Graceful shutdown handling ### Critical Integration Gaps 1. **Missing Workflow Runtime Integration** - The `TypeScriptWorkflowRuntime` class in `workflowRuntime.ts` still uses in-memory state tracking - No `enqueueEvent` method for "fire and forget" event submission - No `processQueuedEvent` method referenced by the worker - No proper event sourcing pattern for state derivation 2. **Incomplete Database Integration** - The event processing record handling only partially implemented - No transaction management for ensuring atomic operations 3. **Missing Runtime-Worker Connection** - The worker is implemented but not properly connected to the workflow runtime - No service startup/initialization for workers ## Implementation Steps ### 1. Enhance TypeScriptWorkflowRuntime Update `server/src/lib/workflow/core/workflowRuntime.ts` to add support for asynchronous processing: ```typescript interface ProcessQueuedEventParams { eventId: string; executionId: string; processingId: string; workerId: string; tenant: string; } interface ProcessQueuedEventResult { success: boolean; errorMessage?: string; previousState?: string; currentState?: string; actionsExecuted: Array<{ actionName: string; success: boolean; error?: string; }>; } export class TypeScriptWorkflowRuntime { // ... existing code ... /** * Enqueues an event for asynchronous processing * Returns quickly after persisting the event and publishing to Redis */ async enqueueEvent(options: EventSubmissionOptions): Promise<{ eventId: string }> { const { execution_id, event_name, payload, user_id, tenant } = options; // Generate a unique event ID const eventId = `evt-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`; // Get Redis stream client const redisStreamClient = getRedisStreamClient(); // Verify execution exists and is valid const executionState = this.executionStates.get(execution_id); if (!executionState) { throw new Error(`Workflow execution "${execution_id}" not found`); } // Verify tenant if (executionState.tenant !== tenant) { throw new Error(`Tenant mismatch for workflow execution "${execution_id}"`); } // Create event const event: WorkflowEvent = { id: eventId, name: event_name, payload: payload || {}, user_id, timestamp: new Date().toISOString() }; // Use distributed transaction to persist event and publish to Redis await executeDistributedTransaction(knex, `workflow:${execution_id}`, async (trx) => { // Persist event to database await this.persistEventToDatabase(execution_id, tenant, event, trx); // Create processing record await this.createEventProcessingRecord(eventId, tenant, trx); // Publish to Redis stream await redisStreamClient.publishEvent({ id: eventId, executionId: execution_id, eventName: event_name, tenant, userId: user_id, payload: payload || {}, timestamp: event.timestamp }); }); return { eventId }; } /** * Process a queued event from Redis stream * Called by worker processes to handle events asynchronously */ async processQueuedEvent( params: ProcessQueuedEventParams ): Promise { const { eventId, executionId, processingId, workerId, tenant } = params; // Acquire distributed lock to ensure only one worker processes this event const lockKey = `event:${eventId}:processing`; const lockOwner = `worker:${workerId}`; const lockAcquired = await acquireDistributedLock(lockKey, lockOwner, { waitTimeMs: 5000, ttlMs: 60000 }); if (!lockAcquired) { return { success: false, errorMessage: 'Failed to acquire lock for event processing', actionsExecuted: [] }; } try { // Mark event as processing await this.updateEventProcessingStatus(processingId, 'processing', { workerId, startedAt: new Date().toISOString() }); // Load execution data and events const workflowDef = this.workflowDefinitions.get(executionId); if (!workflowDef) { throw new Error(`Workflow "${executionId}" not found`); } // Load all events from database const eventLog = await this.loadEventsFromDatabase(executionId, tenant); // Find the event being processed const currentEvent = eventLog.find(e => e.id === eventId); if (!currentEvent) { throw new Error(`Event ${eventId} not found`); } // Derive state by replaying events const replayState = this.replayEvents(workflowDef, eventLog.filter(e => e.id !== eventId)); const previousState = replayState.currentState; // Execute actions const actionResults = await this.executeActionsForEvent( workflowDef, executionId, currentEvent, tenant ); // Update workflow state const currentState = this.getExecutionState(executionId, tenant).currentState; // Mark event as completed await this.updateEventProcessingStatus(processingId, 'completed', { workerId, completedAt: new Date().toISOString(), result: { previousState, currentState, actionsExecuted: actionResults.map(a => a.actionName) } }); return { success: true, previousState, currentState, actionsExecuted: actionResults }; } catch (error) { // Mark event as failed await this.updateEventProcessingStatus(processingId, 'failed', { workerId, completedAt: new Date().toISOString(), error: error instanceof Error ? error.message : String(error) }); return { success: false, errorMessage: error instanceof Error ? error.message : String(error), actionsExecuted: [] }; } finally { // Release lock await releaseDistributedLock(lockKey, lockOwner); } } /** * Helper methods */ private async persistEventToDatabase( executionId: string, tenant: string, event: WorkflowEvent, trx: Knex.Transaction ): Promise { // Implementation to persist event to database } private async createEventProcessingRecord( eventId: string, tenant: string, trx: Knex.Transaction ): Promise { // Implementation to create processing record } private async updateEventProcessingStatus( processingId: string, status: 'pending' | 'processing' | 'completed' | 'failed', metadata: Record ): Promise { // Implementation to update processing status } private async loadEventsFromDatabase( executionId: string, tenant: string ): Promise { // Implementation to load events from database } private replayEvents( workflowDef: WorkflowDefinition, events: WorkflowEvent[] ): { currentState: string; data: Record } { // Implementation to replay events and derive state } private async executeActionsForEvent( workflowDef: WorkflowDefinition, executionId: string, event: WorkflowEvent, tenant: string ): Promise> { // Implementation to execute actions for an event } } ``` ### 2. Implement Event Persistence Model Create or enhance `server/src/lib/workflow/persistence/workflowEventModel.ts`: ```typescript import { Knex } from 'knex'; import { WorkflowEvent } from '../core/workflowContext'; export class WorkflowEventModel { static async persistEvent( event: WorkflowEvent, executionId: string, tenant: string, fromState: string, trx?: Knex.Transaction ): Promise { const db = trx || knex; await db('workflow_events').insert({ event_id: event.id, execution_id: executionId, event_name: event.name, from_state: fromState, to_state: '', // Will be determined during processing payload: JSON.stringify(event.payload), user_id: event.user_id, tenant, timestamp: event.timestamp || new Date().toISOString() }); } static async loadEventLog( executionId: string, tenant: string ): Promise { const events = await knex('workflow_events') .where({ execution_id: executionId, tenant }) .orderBy('timestamp', 'asc'); return events.map(e => ({ id: e.event_id, name: e.event_name, payload: JSON.parse(e.payload), user_id: e.user_id, timestamp: e.timestamp, fromState: e.from_state, toState: e.to_state })); } static async updateEventStateTransition( eventId: string, tenant: string, toState: string, trx?: Knex.Transaction ): Promise { const db = trx || knex; await db('workflow_events') .where({ event_id: eventId, tenant }) .update({ to_state: toState, updated_at: new Date().toISOString() }); } } ``` ### 3. Integrate Worker with Workflow Runtime Enhance the existing `WorkflowWorker` class to properly integrate with the `TypeScriptWorkflowRuntime`: ```typescript export class WorkflowWorker { // ... existing code ... constructor(private workflowRuntime: TypeScriptWorkflowRuntime) { // Generate a unique worker ID this.workerId = `${os.hostname()}-${process.pid}-${generateUniqueId()}`; // Create Redis Stream client this.redisStreamClient = getRedisStreamClient(); } // ... modify processEvent method ... private async processEvent( eventId: string, executionId: string, processingId: string, tenant: string ): Promise { const startTime = Date.now(); this.eventsProcessed++; try { // Create parameters for processing the queued event const params: ProcessQueuedEventParams = { eventId, executionId, processingId, workerId: this.workerId, tenant }; // Use the workflow runtime to process the event const result = await this.workflowRuntime.processQueuedEvent(params); if (!result.success) { logger.error(`[WorkflowWorker] Failed to process event ${eventId}: ${result.errorMessage}`, { workerId: this.workerId, eventId, executionId, error: result.errorMessage }); this.eventsFailed++; } else { logger.info(`[WorkflowWorker] Successfully processed event ${eventId}`, { workerId: this.workerId, eventId, executionId, previousState: result.previousState, currentState: result.currentState, actionsExecuted: result.actionsExecuted.map(a => a.actionName) }); this.eventsSucceeded++; } // Record processing time const processingTime = Date.now() - startTime; this.processingTimes.push(processingTime); } catch (error) { // ... existing error handling ... } } } ``` ### 4. Create Worker Service for Application Startup Create `server/src/services/workflowWorkerService.ts` to initialize and manage worker instances: ```typescript import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime'; import { getActionRegistry } from '@shared/workflow/core/actionRegistry'; import { startWorkflowWorker, WorkflowWorker } from '../lib/workflow/workers/workflowWorker'; import logger from '../utils/logger'; export class WorkflowWorkerService { private workers: WorkflowWorker[] = []; private workerCount: number; constructor(workerCount: number = 2) { this.workerCount = workerCount; } async start(): Promise { logger.info(`[WorkflowWorkerService] Starting ${this.workerCount} workflow workers`); try { // Initialize action registry and workflow runtime const actionRegistry = getActionRegistry(); const workflowRuntime = getWorkflowRuntime(actionRegistry); // Start worker instances for (let i = 0; i < this.workerCount; i++) { const worker = await startWorkflowWorker(workflowRuntime); this.workers.push(worker); logger.info(`[WorkflowWorkerService] Started worker ${i + 1} of ${this.workerCount}`); } logger.info('[WorkflowWorkerService] All workflow workers started successfully'); } catch (error) { logger.error('[WorkflowWorkerService] Failed to start workflow workers:', error); throw error; } } async stop(): Promise { logger.info(`[WorkflowWorkerService] Stopping ${this.workers.length} workflow workers`); // Stop all workers await Promise.all(this.workers.map(async (worker) => { try { await worker.stop(); } catch (error) { logger.error('[WorkflowWorkerService] Error stopping worker:', error); } })); this.workers = []; logger.info('[WorkflowWorkerService] All workflow workers stopped'); } } // Singleton instance let workerService: WorkflowWorkerService | null = null; export function getWorkflowWorkerService(workerCount?: number): WorkflowWorkerService { if (!workerService) { workerService = new WorkflowWorkerService(workerCount); } return workerService; } ``` ### 5. Update Configuration In `server/src/config/index.ts`, update configuration for workflow system: ```typescript export const config = { // ... existing config ... workflow: { // Number of worker instances to run workerCount: parseInt(process.env.WORKFLOW_WORKER_COUNT || '2', 10), // Redis stream configuration redis: { streamPrefix: process.env.WORKFLOW_REDIS_STREAM_PREFIX || 'workflow:events:', consumerGroup: process.env.WORKFLOW_REDIS_CONSUMER_GROUP || 'workflow-workers', batchSize: parseInt(process.env.WORKFLOW_REDIS_BATCH_SIZE || '10', 10), idleTimeoutMs: parseInt(process.env.WORKFLOW_REDIS_IDLE_TIMEOUT_MS || '60000', 10), } } }; ``` ### 6. Update API Controllers to Use Enqueue Update `server/src/controllers/workflowController.ts`: ```typescript import { config } from '../config'; import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime'; import { Request, Response } from 'express'; export class WorkflowController { async submitEvent(req: Request, res: Response): Promise { const { execution_id, event_name, payload, tenant } = req.body; const user_id = req.user?.id; try { const runtime = getWorkflowRuntime(); // Enqueue event for asynchronous processing const result = await runtime.enqueueEvent({ execution_id, event_name, payload, user_id, tenant }); res.status(202).json({ message: 'Event accepted for processing', eventId: result.eventId }); } catch (error) { // Handle errors res.status(400).json({ error: error instanceof Error ? error.message : 'Unknown error' }); } } } ``` ## Testing Strategy 1. **Unit Tests for New Components** - Test `enqueueEvent` with Redis Stream publishing - Test `processQueuedEvent` for event replay and processing - Test asynchronous event flow from submission to processing 2. **Integration Tests with Worker** - Test worker processing of Redis stream messages - Test worker coordination with distributed locks - Test failure recovery and message reclaiming 3. **End-to-End Asynchronous Workflow Tests** - Test complete workflows running with asynchronous processing - Test system resilience to worker restarts - Test workflow resumption after application restart ## Implementation Roadmap 1. **Phase 1: Core Implementation (2 weeks)** - Implement `enqueueEvent` method in `TypeScriptWorkflowRuntime` - Implement `processQueuedEvent` method in `TypeScriptWorkflowRuntime` - Implement event persistence and loading from database 2. **Phase 2: Integration (1 week)** - Connect `WorkflowWorker` with `TypeScriptWorkflowRuntime` - Create `WorkflowWorkerService` for application startup - Update configuration for asynchronous processing 3. **Phase 3: Testing and Validation (2 weeks)** - Develop unit and integration tests - Test with multiple workers - Validate worker failover and recovery 4. **Phase 4: Deployment and Monitoring (1 week)** - Deploy to staging environment - Implement monitoring for worker health - Document deployment requirements ## Conclusion The implementation plan described here will transform our current workflow system into a fully asynchronous execution platform. By focusing on the specific integration points required between existing components, we can leverage much of the code that already exists while filling in the critical gaps. Key success factors for this implementation: 1. **Proper event sourcing implementation**: Ensure that workflow state is derived by replaying events 2. **Robust error handling**: Implement thorough error handling and recovery mechanisms 3. **Idempotent processing**: Guarantee that event processing is idempotent for reliability 4. **Comprehensive testing**: Thoroughly test worker coordination and failure scenarios With these enhancements, our workflow system will gain the resilience, scalability, and fault tolerance necessary for handling mission-critical business processes in an asynchronous environment.