PSA/docs/workflow/workflow-system.md
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

43 KiB

Workflow System

1. Introduction and Philosophy

Important Note on Workflow Execution: Workflows are executed in response to events. When a workflow is triggered, the event that triggered it is passed as input to the workflow. The workflow does not wait for the initial event - the fact that the workflow is executing means the event has already occurred. This is reflected in the workflow examples where each workflow receives its triggering event via context.input.triggerEvent.

Purpose and Vision

The workflow system is a robust, flexible engine designed to model, execute, and manage complex business processes. It provides a structured way to define business logic, execute actions, and respond to events while maintaining a complete audit trail of all activities. The system is built on modern architectural principles to ensure reliability, scalability, and maintainability.

The system uses a TypeScript-based approach to workflow definition, allowing developers to create workflows using familiar programming constructs while leveraging the type safety and tooling of TypeScript.

Core Principles

The workflow system is built on several key principles:

  1. Event Sourcing: The system uses event sourcing as its foundational pattern, where the state of a workflow is derived by replaying a sequence of events rather than storing the current state directly. This provides a complete audit trail and enables powerful time-travel debugging capabilities.

  2. Domain-Driven Design: The workflow engine is designed to be a flexible tool that can model complex domain processes while keeping domain logic separate from the workflow infrastructure.

  3. Programmatic Workflows: Workflows are defined as TypeScript functions, allowing developers to use familiar programming constructs while maintaining the benefits of a structured workflow system.

  4. Idempotent Execution: Actions are executed idempotently, ensuring that even if the same event is processed multiple times (e.g., due to retries), the outcome remains consistent.

  5. Parallel Execution: The system supports executing actions in parallel based on explicit dependencies, enabling efficient processing of complex workflows.

  6. Asynchronous Event Processing: The system processes all events asynchronously through a message queue, providing improved scalability and fault tolerance.

Key Benefits

  • Auditability: Complete history of all events and workflow execution
  • Flexibility: Customizable workflows defined in TypeScript
  • Scalability: Distributed processing with Redis Streams
  • Reliability: Idempotent processing and robust error handling
  • Visibility: Comprehensive monitoring and observability
  • Maintainability: Clean separation of concerns and modular architecture
  • Developer Experience: Familiar TypeScript syntax with IDE support, type checking, and refactoring tools

2. Architecture Overview

System Components

The workflow system consists of the following major components:

  1. TypeScript Workflow Runtime: Executes workflows defined as TypeScript functions, managing their state and event handling.

  2. Action Executor: Executes actions in parallel based on their dependencies, with support for various error handling strategies.

  3. Action Registry: Manages the registration and execution of actions, ensuring idempotent execution.

  4. Persistence Layer: Stores workflow executions, events, action results, and other workflow-related data.

  5. Workflow Context: Provides the execution context for workflows, including access to actions, data, events, and logging.

  6. Redis Streams Integration: Enables asynchronous event distribution. Currently, all workflow-related events are published to a single global Redis stream named workflow:events:global. (See Section 5 for more details on event stream usage).

  7. Worker Service: Processes events asynchronously from Redis Streams.

    • Current Architecture: The WorkflowWorker currently operates as a singleton instance. This means all events from the global Redis stream are processed by this single worker. This is important for understanding how features like context.events.waitFor function reliably with in-memory listeners.
    • Future Scalability: Future plans for scaling may involve distributing workers, potentially by sharding streams by tenant ID (e.g., workflow:events:tenant_hash_X). This approach would aim to maintain worker affinity for workflow executions within a tenant, preserving the effectiveness of in-memory event listeners. If a different distribution model is adopted where affinity is not guaranteed, the in-memory listener mechanism for context.events.waitFor would require re-evaluation.
  8. Distributed Coordination: Ensures reliable processing in a distributed environment.

Data Flow

  1. Event Submission:

    • A client publishes an event to the event bus (e.g., "INVOICE_CREATED")
    • The event is persisted to the database
    • The system identifies workflows attached to this event type
    • For each attached workflow, a new execution is created
    • In a distributed setup, the event is also published to Redis Streams
  2. Event Processing:

    • The workflow runtime processes the event within the context of the workflow function
    • The workflow function determines how to handle the event based on its current state
    • The workflow function may execute actions, update data, or change state
  3. Action Execution:

    • The action executor builds a dependency graph of actions
    • It executes actions in parallel based on their dependencies
    • Results are stored in the database
    • Any errors are handled according to the defined strategy
  4. State Update:

    • The workflow function updates its state based on the event and action results
    • The complete event and its processing results are available for querying

System Workflows and Tenant-Specific Triggering

System workflows represent shared, reusable workflow definitions or templates that are available to all tenants within the platform. Examples might include standard processes like a generic invoiceLifecycleWorkflow or other common business operations.

A key aspect to understand is how these system workflows are invoked. Contrary to a potential misunderstanding that they might be triggered by global, non-tenant-specific "system events" (e.g., via a hypothetical system_workflow_event_attachments table for such global triggers), system workflows such as the invoice lifecycle example are typically triggered in the context of a specific tenant.

The triggering mechanism relies on tenant-specific event attachments. When an event relevant to a system workflow (e.g., INVOICE_UPDATED for an invoice sync workflow) occurs for a particular tenant, the system consults the tenant-specific workflow_event_attachments table. An entry in this table links the tenant_id, the event_type (e.g., INVOICE_UPDATED), and the workflow_id. For a system workflow, this workflow_id corresponds to its registration_id, effectively associating the tenant-specific event with the shared system workflow definition.

This ensures that while the workflow definition is shared, its execution is always tied to a specific tenant and triggered by events occurring within that tenant's scope. If a table named system_workflow_event_attachments exists, its purpose would be distinct from this tenant-specific triggering mechanism. For tenant-aware workflows, the attachment and subsequent execution remain tenant-specific.

Persistence Model

The workflow system uses several database tables to store its data:

  1. workflow_executions: Stores metadata about workflow instances

  2. workflow_events: Stores the event log for each workflow execution

  3. workflow_action_results: Tracks the results of action executions

  4. workflow_timers: Manages workflow timers

  5. workflow_action_dependencies: Stores dependencies between actions

  6. workflow_sync_points: Manages synchronization points for parallel execution

  7. workflow_event_processing: Tracks the detailed lifecycle and processing status of individual events intended for asynchronous handling. When an event is enqueued (e.g., via runtime.enqueueEvent()), a record is created here. Its status transitions typically from pending -> published (to Redis) -> processing (by a worker) -> completed or failed. This table is crucial for ensuring event processing reliability, enabling retries, and providing visibility into the asynchronous event pipeline, especially in distributed environments. It links to workflow_events via event_id.

  8. system_workflow_task_definitions: Stores definitions for system-wide, reusable task types (e.g., common error handling tasks). These definitions are not tenant-specific and are identified by their task_type string. They link to form definitions (often system forms).

  9. workflow_task_definitions: Stores definitions for task types that are specific to a tenant. These are identified by a UUID (task_definition_id) and are scoped to a tenant.

  10. workflow_tasks: Stores instances of human tasks. It links to its definition using:

    • task_definition_type ('system' or 'tenant'): Indicates whether the task uses a system or tenant-specific definition.
    • tenant_task_definition_id (UUID): Foreign key to workflow_task_definitions.task_definition_id if type is 'tenant'.
    • system_task_definition_task_type (TEXT): Foreign key to system_workflow_task_definitions.task_type if type is 'system'.
  11. system_workflow_form_definitions: Stores definitions for system-wide, reusable UI forms. These are referenced by task definitions. Key fields include name (unique identifier for the form), json_schema (defines the data structure and properties of the form), and ui_schema (provides hints for rendering the form).

  12. workflow_form_definitions: Stores tenant-specific UI form definitions, mirroring the structure of system_workflow_form_definitions.

  13. workflow_data_store: A cross-run key-value store scoped by (tenant, namespace, key). Values are stored as JSON, support an optional TTL (lazily expired on read and periodically swept), and carry a revision counter for optimistic concurrency. Accessed via store.* workflow actions (store.get, store.set, store.delete, store.increment, store.list, store.list_namespaces).

  14. workflow_entity_links: Stores typed bidirectional links between two entities (identified by entity type + ID pairs). Supports forward, reverse, and bidirectional traversal. Accessed via links.* workflow actions (links.upsert, links.lookup, links.delete, links.list, links.list_namespaces).

Human Task Forms and Dynamic Data Display

Human tasks often require displaying dynamic information to the user based on the workflow's current context and the specifics of the task (e.g., error details, entity identifiers). The workflow system facilitates this through the interaction of form definitions (system_workflow_form_definitions or workflow_form_definitions) and the contextData provided when a human task is created via actions.createHumanTask.

There are two primary ways dynamic data is presented in forms:

  1. Direct Field Population:

    • The form's json_schema defines distinct properties corresponding to individual pieces of data (e.g., resolutionNotes, retryCheckbox).
    • The workflow provides values for these properties directly within the contextData object when creating the task. For example:
      // In the workflow:
      await actions.createHumanTask({
        taskType: 'some_review_task',
        // ... other task parameters
        contextData: {
          resolutionNotes: "Initial assessment complete.",
          isUrgent: true
        }
      });
      
    • The UI rendering layer then uses these values to populate the corresponding input fields (e.g., a textarea for resolutionNotes, a checkbox for isUrgent).
  2. Template String Substitution:

    • This method is particularly useful for displaying formatted, read-only information composed of multiple data points, or for providing descriptive text that includes dynamic values.
    • In the form's json_schema (stored in system_workflow_form_definitions), a property (often read-only) can have its default value set to a template string. This template string includes placeholders that reference keys within the contextData object, using the syntax ${contextData.keyName}.
    • Example json_schema property for a form definition:
      {
        // ... other properties ...
        "errorReport": {
          "type": "string",
          "title": "Error Details",
          "readOnly": true,
          "default": "Workflow Instance ID: ${contextData.workflowInstanceId}\nError Code: ${contextData.errorCode}\nMessage: ${contextData.errorMessageText}\nEntity: ${contextData.entityType} (ID: ${contextData.entityId})"
        }
        // ...
      }
      
    • The workflow, when creating the human task, populates the contextData with the individual data elements whose keys match the placeholders in the template.
      // In the workflow, when an error occurs:
      await actions.createHumanTask({
        taskType: 'accounting_export_error', // This task type would link to the form definition above
        title: 'QuickBooks Sync Error',
        // ... other task parameters
        contextData: {
          workflowInstanceId: executionId,
          errorCode: 'QBO-123',
          errorMessageText: 'Customer not found in QBO.',
          entityType: 'Customer',
          entityId: algaCompanyId,
          // ... any other data needed by the template or for other purposes
        }
      });
      
    • The UI rendering layer is then responsible for:
      1. Retrieving the form definition (including the json_schema with the template string in the default value of the errorReport property).
      2. Accessing the contextData associated with the specific task instance.
      3. Performing string substitution on the template string, replacing placeholders like ${contextData.errorCode} with their corresponding values from the contextData (e.g., "QBO-123").
      4. Displaying the resulting formatted string to the user (e.g., in a read-only textarea as specified by the ui_schema).
    • This pattern allows for flexible and descriptive presentation of dynamic information without requiring a separate form field for every individual piece of data if they are only for display. The accounting-mapping-error-form example's productDetails field demonstrates this approach.

Key Considerations for Template Substitution:

  • The workflow must ensure that all keys referenced in the form's template string are present in the contextData it provides when creating the task.
  • The UI component responsible for rendering the form must implement the logic to perform this substitution.
  • This method is best suited for read-only display of information. If user input is required for these individual pieces of data, direct field population (Method 1) is more appropriate.

Enhanced Templating for Dynamic Content

To provide more flexibility while maintaining security for user-influenced template strings (e.g., in form default values or dynamic UI text), the templating mechanism described above is being enhanced. This enhancement will utilize the existing Parsimmon dependency to parse and evaluate a controlled, limited set of JavaScript-like expressions within the ${...} syntax.

Key features of the enhanced templating:

  • Parser: Implemented using Parsimmon.
  • Supported Expressions (Initial Scope):
    • Variable access: variableName or contextData.variableName
    • String literals: 'some string'
    • Logical OR: expression1 || expression2
    • Date formatting: new Date(variableOrString).toLocaleDateString() and new Date(variableOrString).toLocaleString()
  • Security: The custom parser and evaluator are designed to be secure by only allowing the explicitly defined expressions and operations, preventing arbitrary code execution.
  • Integration: This will affect how default values in JSON schemas and other templated strings are processed by server/src/utils/templateUtils.ts, impacting components like DynamicForm.tsx and ActivityDetailViewerDrawer.tsx.

For a detailed technical design of this Parsimmon-based templating engine, please refer to "docs/technical/parsimmon_templating_engine.md".

By understanding these data flow patterns, developers can effectively design workflows that create informative and actionable human tasks.

3. TypeScript-Based Workflows

The workflow system uses TypeScript functions for defining workflows, providing a programmatic approach with full access to TypeScript's features.

Basic Structure

async function workflow(context: WorkflowContext): Promise<void> {
  const { actions, data, events, logger } = context;
  
  // Initial state
  context.setState('initial');
  
  // Wait for events
  const startEvent = await events.waitFor('Start');
  
  // Execute actions, including the new composite action
  // await actions.doSomething({ param1: 'value1' });
  // Example of the new action:
  // const taskOutcome = await actions.createTaskAndWaitForResult({
  //   taskType: 'user_review',
  //   title: 'Review Item',
  //   contextData: { itemId: '123' }
  // });
  // if (taskOutcome.success) {
  //   logger.info(`Task resolved with: ${JSON.stringify(taskOutcome.resolutionData)}`);
  // }
  
  // Update state
  context.setState('completed');
}

Workflow Context

The WorkflowContext provides access to:

  • actions: Proxy object for executing registered actions. This now includes a powerful composite action createTaskAndWaitForResult which simplifies creating a human task and pausing the workflow until it's resolved.
  • data: Data manager for storing and retrieving workflow data
  • events: Event manager for waiting for and emitting events.
    • events.waitFor(): Pauses workflow execution until a specified external event (or one of several specified events) occurs. It relies on in-memory listeners within the TypeScriptWorkflowRuntime instance processing the workflow. Given the current singleton WorkflowWorker architecture (see Section 2), these in-memory listeners are effective as the same worker instance that initiates the wait will process the resolving event. This mechanism is crucial for the actions.createTaskAndWaitForResult composite action.
    • events.emit(): Asynchronously enqueues an event, which will be persisted and published to the global Redis stream for processing by the worker.
  • logger: Logger for workflow execution
  • setState/getCurrentState: Methods for managing workflow state

Control Flow

TypeScript-based workflows use native language constructs for control flow:

// Conditional logic
if (data.get<InvoiceData>('invoice').amount > 1000) {
  await actions.requireAdditionalApproval();
}

// Switch statements
switch (event.payload.status) {
  case 'approved':
    await actions.processApproval();
    break;
  case 'rejected':
    await actions.processRejection();
    break;
  default:
    throw new Error('Invalid status');
}

// Loops
for (const item of data.get<Order>('order').items) {
  await actions.processItem(item);
}

Parallel Execution

// Execute actions in parallel
await Promise.all([
  actions.validateInvoice(),
  actions.checkBudget(),
  actions.verifyApprover()
]);

Error Handling

try {
  await actions.riskyOperation();
} catch (error) {
  logger.error('Operation failed', error);
  await actions.handleFailure();
}
// For errors that are recoverable with human intervention, a more interactive pattern involves creating a human task and then pausing the workflow until the task is resolved, allowing the operation to be retried. The `actions.createTaskAndWaitForResult` method greatly simplifies this. See the "Interactive Error Resolution and Retries with Human Tasks" section for a detailed example.

Interactive Error Resolution and Retries with Human Tasks

A common and powerful pattern for handling operations that might fail due to issues requiring manual intervention (e.g., missing data, incorrect configuration, external system errors) is to:

  1. Attempt the operation.
  2. If it fails, use actions.createTaskAndWaitForResult to create a human task and automatically pause the workflow. This action encapsulates creating the task, determining the correct event to wait for (typically TaskEventNames.taskCompleted(taskId)), and then calling context.events.waitFor().
  3. The workflow resumes when createTaskAndWaitForResult returns, providing the outcome of the human task (e.g., data submitted by the user, or a timeout indication).
  4. Based on the task's outcome, the workflow can retry the operation or take other actions.

This allows the workflow to remain in context and continue from where it left off, rather than terminating and requiring a full restart.

Conceptual Flow (Simplified with createTaskAndWaitForResult):

loop until operation_succeeds or max_retries_reached:
    try:
        perform_operation()
        operation_succeeds = true
    catch error_requiring_human_intervention:
        task_outcome = await actions.createTaskAndWaitForResult(
            taskType: 'resolve_error_X',
            title: 'Resolve Error for Operation Y',
            contextData: { error_details, relevant_ids },
            waitForEventTimeoutMilliseconds: 3_600_000 // Optional: 1 hour timeout
        )
        if task_outcome.success and task_outcome.resolutionData?.userFixedTheProblem:
            // Loop continues, operation will be retried
            logger.info("User indicated problem resolved. Retrying operation.")
        else:
            logger.warn("Task did not result in a successful resolution. Aborting retry.")
            // Handle non-resolution (e.g., log, escalate, break loop)
            break

Example: Processing an item that requires a valid mapping (using createTaskAndWaitForResult)

async function processItemWithMappingWorkflow(context: WorkflowContext): Promise<void> {
  const { actions, data, logger, setState, executionId } = context; // `events` is not directly needed here
  const itemId = data.get('itemIdToProcess');
  let itemProcessed = false;
  let attempts = 0;
  const MAX_ATTEMPTS = 3; // Define a retry limit

  setState('VALIDATING_ITEM_MAPPING');

  while (!itemProcessed && attempts < MAX_ATTEMPTS) {
    attempts++;
    logger.info(`Attempt ${attempts} to process item ${itemId}.`);

    try {
      // Action that might fail if mapping is missing or invalid
      const mappingDetails = await actions.getMappingForItem({ itemId }); 
      if (!mappingDetails || !mappingDetails.isValid) {
        throw new Error(`Mapping for item ${itemId} is missing or invalid.`);
      }

      // Proceed with processing using mappingDetails
      await actions.processItemUsingMapping({ itemId, mappingDetails });
      itemProcessed = true;
      logger.info(`Item ${itemId} processed successfully.`);
      setState('ITEM_PROCESSING_COMPLETE');

    } catch (error: any) {
      logger.error(`Error processing item ${itemId} on attempt ${attempts}: ${error.message}`);
      setState(`AWAITING_MAPPING_RESOLUTION_FOR_ITEM_${itemId}_ATTEMPT_${attempts}`);

      // Use the new composite action to create a task and wait for its resolution
      const taskResolution = await actions.createTaskAndWaitForResult({
        taskType: 'resolve_item_mapping_error', // Links to a pre-defined task and form
        title: `Resolve Mapping for Item ${itemId} (Attempt ${attempts})`,
        description: `Error: ${error.message}. Please review and correct the mapping for item ID: ${itemId}.`,
        contextData: { // Data for the form and task context
          itemId: itemId,
          errorMessage: error.message,
          currentAttempt: attempts,
          maxAttempts: MAX_ATTEMPTS,
          workflowInstanceId: executionId
        },
        // Optional: Timeout for waiting for the task resolution event
        // waitForEventTimeoutMilliseconds: 60 * 60 * 1000 // 1 hour
      });

      if (taskResolution.success) {
        // Task was completed, event was received.
        // The payload of the event (taskResolution.resolutionData) might contain user input.
        logger.info(`Task ${taskResolution.taskId} for item ${itemId} resolved. Resolution data: ${JSON.stringify(taskResolution.resolutionData)}. Retrying processing.`);
        // The loop will continue, and getMappingForItem will be re-attempted.
      } else {
        // Task creation might have failed, or waitFor event might have timed out or errored.
        logger.error(`Failed to get resolution for task for item ${itemId}. Error: ${taskResolution.error}. Details: ${JSON.stringify(taskResolution.details)}. Halting retries for this item.`);
        setState('CRITICAL_ERROR_TASK_RESOLUTION_FAILED');
        // Depending on policy, you might throw an error here or return
        return; 
      }
    }
  } // End while loop

  if (!itemProcessed) {
    logger.error(`Failed to process item ${itemId} after ${MAX_ATTEMPTS} attempts.`);
    setState('ITEM_PROCESSING_FAILED_MAX_ATTEMPTS');
    // Optionally, create a final escalation task or take other compensatory actions.
    // Can still use createHumanTask directly if no waiting is needed, or createTaskAndWaitForResult if it is.
    await actions.createHumanTask({ 
        taskType: 'item_processing_escalation',
        title: `Escalation: Item ${itemId} processing failed after max attempts.`,
        contextData: { itemId, attempts, workflowInstanceId: executionId }
    });
  }
}

Key Considerations for actions.createTaskAndWaitForResult:

  • Simplicity: This action significantly simplifies the workflow code by encapsulating the createHumanTask call, event name construction (it uses TaskEventNames.taskCompleted(taskId) internally), and the events.waitFor() call.
  • Return Value: The action returns an object (CreateTaskAndWaitForResultReturn) indicating success (boolean), the taskId (string | null), resolutionData (any, the payload of the task completion event), and an optional error message or details object.
  • Task Resolution Event: The system relies on an event named according to TaskEventNames.taskCompleted(taskId) being emitted when the human task is resolved. This is a standard convention.
  • Retry Limits: Always include a mechanism (like a maximum attempt counter) to prevent infinite loops.
  • Idempotency: Ensure retried operations are idempotent.
  • setState for Observability: Continue to use context.setState() for visibility.
  • Timeout: The waitForEventTimeoutMilliseconds parameter in createTaskAndWaitForResultParams allows specifying a timeout for how long the workflow will wait for the task resolution event. If a timeout occurs, taskResolution.success will be false, and taskResolution.error will indicate a timeout.
  • Detailed Event Flow:
    1. createTaskAndWaitForResult internally calls the registered actions.create_human_task action.
    2. This action (or its underlying logic) creates the human task record in the database.
    3. createTaskAndWaitForResult then determines the expected resolution event name (typically TaskEventNames.taskCompleted(taskId)) and calls context.events.waitFor() with this event name, causing the workflow to pause.
    4. When the human task is completed (e.g., through a UI interaction that calls an API like submitTaskForm): a. The API endpoint handling task completion (e.g., submitTaskForm) is responsible for emitting the task resolution event (e.g., TASK_COMPLETED with the taskId and resolution data in its payload). b. This event is enqueued using runtime.enqueueEvent(). c. enqueueEvent() persists the event to the database (e.g., workflow_events table) and publishes it to the global Redis stream (workflow:events:global). d. The singleton WorkflowWorker consumes this event from the Redis stream. e. The worker invokes runtime.processQueuedEvent() for the consumed event. f. processQueuedEvent() loads the relevant workflow execution state, applies the event, and then calls notifyEventListeners(). g. notifyEventListeners() finds the specific in-memory listener registered by the earlier context.events.waitFor() call (within the same worker process) and resolves the associated promise. h. This resolution allows createTaskAndWaitForResult to resume, and it then returns the task outcome (including resolutionData from the event payload) to the workflow.

4. Core Components

TypeScript Workflow Runtime

The TypeScript Workflow Runtime is responsible for executing workflows defined as TypeScript functions. It manages workflow state, processes events, and provides the execution context for workflows.

Key features:

  • Workflow registration and execution
  • Event handling and processing
  • State management
  • Action execution coordination
  • Data persistence

Example usage:

import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';
import { invoiceApprovalWorkflow } from '../workflows/invoiceApprovalWorkflow';

// Initialize the workflow runtime
const runtime = getWorkflowRuntime();

// Register a workflow
runtime.registerWorkflow(invoiceApprovalWorkflow);

// Start a new workflow execution
const executionId = await runtime.startWorkflow(
  'InvoiceApproval',
  {
    tenant: 'acme',
    initialData: {
      invoice: {
        id: 'inv-123',
        amount: 500,
        status: 'draft'
      }
    }
  }
);

// Option 1: Enqueue an event for a specific workflow execution
await runtime.enqueueEvent({
  execution_id: executionId,
  event_name: 'Submit',
  payload: { submittedBy: 'user-456' },
  user_id: 'user-456',
  tenant: 'acme'
});

// Option 2: Publish an event to the event bus to trigger attached workflows
await submitWorkflowEventAction({
  event_name: 'INVOICE_SUBMITTED',
  event_type: 'INVOICE_SUBMITTED',
  payload: {
    invoiceId: 'inv-123',
    submittedBy: 'user-456'
  }
});

Workflow Context

The Workflow Context provides the execution environment for workflows, giving them access to actions, data, events, and logging capabilities.

Key features:

  • Action execution through a proxy object
  • Data storage and retrieval
  • Event waiting and emission
  • Logging and debugging
  • State management

Example usage within a workflow function:

async function invoiceApprovalWorkflow(context: WorkflowContext) {
  const { actions, data, events, logger, setState } = context;
  
  // Set initial state
  setState('draft');
  
  // Store workflow data
  data.set('invoice', { id: 'inv-123', amount: 500, status: 'draft' });
  
  // Wait for an event
  const submitEvent = await events.waitFor('Submit');
  logger.info(`Invoice submitted by ${submitEvent.user_id}`);
  
  // Execute an action
  await actions.sendNotification({
    recipient: 'manager',
    message: 'Invoice submitted for approval'
  });
  
  // Update state
  setState('submitted');
  
  // Wait for approval or rejection
  const decisionEvent = await events.waitFor(['Approve', 'Reject']);
  
  if (decisionEvent.name === 'Approve') {
    await actions.updateInvoiceStatus({ status: 'approved' });
    setState('approved');
  } else {
    await actions.updateInvoiceStatus({ status: 'rejected' });
    setState('rejected');
  }
}

Action Executor

The action executor is responsible for executing actions in parallel based on their dependencies. It supports various error handling strategies and ensures that actions are executed in the correct order.

Key features:

  • Parallel execution based on dependencies
  • Support for synchronization points (join operations)
  • Error handling strategies (stop, continue, retry, compensate)
  • Transaction management

Example usage:

const actionExecutor = createActionExecutor();
const results = await actionExecutor.executeActions(
  actionsToExecute,
  event,
  tenant
);

console.log(`Executed ${results.length} actions`);
results.forEach(result => {
  if (result.success) {
    console.log(`Action ${result.actionName} succeeded: ${JSON.stringify(result.result)}`);
  } else {
    console.error(`Action ${result.actionName} failed: ${result.error}`);
  }
});

Action Registry

The action registry manages the registration and execution of actions. It ensures that actions are executed idempotently and provides a way to define and validate action parameters.

Key features:

  • Action registration with parameter validation
  • Idempotent action execution
  • Transaction isolation level support
  • Built-in actions for common operations

Example usage:

const registry = getActionRegistry();

// Register a custom action
registry.registerSimpleAction(
  'SendInvoiceEmail',
  'Send an email with invoice details',
  [
    { name: 'recipient', type: 'string', required: true },
    { name: 'invoiceId', type: 'string', required: true },
    { name: 'template', type: 'string', required: false, defaultValue: 'default' }
  ],
  async (params) => {
    // Implementation
    return { sent: true, timestamp: new Date().toISOString() };
  }
);

// Execute an action
const result = await registry.executeAction('SendInvoiceEmail', {
  tenant: 'acme',
  executionId: 'wf-123',
  eventId: 'evt-456',
  idempotencyKey: 'send-invoice-email-789',
  parameters: {
    recipient: 'customer@example.com',
    invoiceId: 'inv-123'
  }
});

5. Asynchronous Event Processing

The workflow system processes all events asynchronously through a message queue. This provides higher throughput, better fault tolerance, and improved scalability.

Redis Streams Integration

Redis Streams is used as the message queue for distributing workflow events.

Global Event Stream: Currently, all workflow-related events—including initial trigger events, events emitted by workflows via context.events.emit(), and task completion events—are published to a single global Redis stream named workflow:events:global. The WorkflowWorker (currently a singleton instance) consumes events from this global stream.

Event Publication Flow: When a workflow event is submitted (e.g., via runtime.enqueueEvent()):

  1. The event is validated and persisted to the database (e.g., into workflow_events and workflow_event_processing tables).
  2. It is then published as a message to the workflow:events:global Redis stream for asynchronous processing.
  3. Worker processes (currently one) consume messages from this stream.

Key Features Leveraged:

  • Consumer Groups: Can be used to ensure each event message is processed by one consumer in a group (relevant for future multi-worker scenarios).
  • Message Acknowledgment: Helps in handling worker failures and ensuring events are not lost.
  • Error Handling: The system includes mechanisms for retrying event processing in case of transient failures.

Two-Phase Event Processing

Event processing is split into two phases:

  1. Enqueue Phase: Validate the event, persist it to the database, and publish it to Redis Streams
  2. Process Phase: Consume the event from Redis Streams, process it using the workflow runtime, and execute the resulting actions

This separation allows for immediate response to clients while ensuring reliable processing of events.

Worker Service

The worker service consumes events from Redis Streams and processes them using the workflow runtime. It provides:

  • Worker lifecycle management (startup, shutdown, health checks)
  • Error handling with proper classification and retry logic
  • Telemetry and logging for monitoring

For more details, see the Worker Service Documentation.

Reliable Processing

To ensure reliable event processing, the system uses:

  • Redis-based locks for critical sections (e.g., during event processing by a worker).
  • Transaction management for data consistency (e.g., when persisting events and updating workflow state).
  • Error classification and recovery strategies for handling failures during event processing.
  • Standardized Identifiers: Key identifiers such as event_id in the workflow_events table and processing_id in the workflow_event_processing table are expected to be standard Universally Unique Identifiers (UUIDs). If idempotency keys are provided and used as event_id, they should also conform to the UUID format or be handled by a system layer that ensures compatibility with database constraints (e.g., by hashing or mapping if the underlying column type is strictly UUID). This ensures data integrity and compatibility with database functions expecting UUIDs.

6. Usage Examples

Basic TypeScript Workflow Definition

async function approvalWorkflow(context: WorkflowContext): Promise<void> {
  const { actions, events, logger } = context;
  
  // Initial state - Processing
  context.setState('processing');
  logger.info('Workflow started in processing state');
  
  // The workflow is triggered by a Submit event, which is passed as input
  const { triggerEvent } = context.input;
  logger.info(`Processing submission from ${triggerEvent.user_id}`);
  
  await actions.log_event({ message: "Item submitted for approval" });
  context.setState('submitted');
  
  // Wait for Approve or Reject event
  const decisionEvent = await events.waitFor(['Approve', 'Reject']);
  
  if (decisionEvent.name === 'Approve') {
    await actions.log_event({ message: "Item approved" });
    context.setState('approved');
  } else {
    await actions.log_event({ message: "Item rejected" });
    context.setState('rejected');
  }
  
  logger.info('Workflow completed');
}

Asynchronous Event Processing Example

import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';

// Initialize the workflow runtime
const runtime = getWorkflowRuntime();

// Option 1: Enqueue an event for a specific workflow execution
await runtime.enqueueEvent({
  execution_id: 'wf-123',
  event_name: 'Approve',
  payload: { approvedBy: 'user-789' },
  user_id: 'user-789',
  tenant: 'acme'
});

// Option 2: Publish an event to the event bus to trigger attached workflows
await submitWorkflowEventAction({
  event_type: 'INVOICE_APPROVED',
  payload: {
    invoiceId: 'inv-123',
    approvedBy: 'user-789'
  }
});

// The event will be processed asynchronously by a worker
console.log('Event enqueued for processing');

7. Operational Considerations

Monitoring

The workflow system provides several monitoring capabilities:

  1. Workflow Telemetry: Counts, rates, and durations of workflow events and actions
  2. Event Tracing: Correlation IDs for tracking events across the system
  3. Health Checks: API endpoints for checking the health of the workflow system
  4. Logging: Comprehensive logging of workflow activities

Scaling

The workflow system can be scaled in several ways:

  1. Vertical Scaling: Increase resources (CPU, memory) for the workflow runtime and worker processes
  2. Horizontal Scaling: Add more worker processes across multiple servers
  3. Database Scaling: Optimize database queries and indexes for efficient event loading and replay
  4. Redis Scaling: Configure Redis for high availability and performance

Troubleshooting

Common issues and their solutions:

  1. Events not being processed:

    • Check Redis connection
    • Verify that events are being published to Redis Streams
    • Check for errors in the worker logs
  2. High event processing latency:

    • Increase the number of workers
    • Optimize database queries
    • Check for resource bottlenecks
  3. Inconsistent workflow state:

    • Verify that actions are idempotent
    • Check for distributed lock failures
    • Ensure that the event log is complete and ordered correctly

Integration with Domain Logic

import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';
import { getActionRegistry } from '@shared/workflow/core/actionRegistry';
import { invoiceApprovalWorkflow } from '../workflows/invoiceApprovalWorkflow';

// Initialize the action registry
const registry = getActionRegistry();

// Register domain-specific actions
registry.registerDatabaseAction(
  'UpdateInvoiceStatus',
  'Update the status of an invoice',
  [
    { name: 'invoiceId', type: 'string', required: true },
    { name: 'status', type: 'string', required: true }
  ],
  TransactionIsolationLevel.REPEATABLE_READ,
  async (params, context) => {
    const result = await context.transaction('invoices')
      .where('id', params.invoiceId)
      .update({ status: params.status });
    
    return { updated: result === 1 };
  }
);

// Initialize the workflow runtime
const runtime = getWorkflowRuntime(registry);

// Register the TypeScript workflow
runtime.registerWorkflow(invoiceApprovalWorkflow);

// Start a new workflow execution
const executionId = await runtime.startWorkflow(
  'InvoiceApproval',
  {
    tenant: 'acme',
    initialData: {
      invoice: {
        id: 'inv-123',
        amount: 500,
        status: 'draft'
      }
    }
  }
);

// Enqueue an event
await runtime.enqueueEvent({
  execution_id: executionId,
  event_name: 'Submit',
  payload: { submittedBy: 'user-456' },
  user_id: 'user-456',
  tenant: 'acme'
});

8. Future Enhancements

Planned enhancements for the workflow system include:

  1. Workflow Designer UI: A visual tool for designing and testing TypeScript workflows
  2. Enhanced Monitoring: More detailed metrics and visualizations
  3. Advanced Error Recovery: Automated recovery strategies for different error types
  4. Performance Optimizations: Improved event processing and action execution
  5. TypeScript Workflow Analyzer: Improved static analysis of TypeScript workflows for visualization and validation
  6. Workflow Templates: Reusable workflow patterns and templates
  7. Enhanced Testing Tools: Specialized tools for testing and debugging workflows

9. Conclusion

The workflow system provides a powerful, flexible foundation for modeling and executing complex business processes. Its event-sourced architecture, parallel execution capabilities, and distributed processing make it suitable for a wide range of applications, from simple approval flows to complex multi-stage processes.

The TypeScript-based workflow approach offers several key advantages:

  1. Familiar Programming Model: Developers can use the full power of TypeScript, including its type system, control flow constructs, and error handling.

  2. IDE Support: Full IDE support including code completion, refactoring, and navigation.

  3. Testability: Workflows can be unit tested like any other TypeScript code.

  4. Flexibility: Complex business logic can be expressed naturally using programming constructs.

  5. Maintainability: Standard software engineering practices can be applied to workflow code.

By separating the workflow infrastructure from domain logic, the system enables clean, maintainable code while providing the reliability and auditability required for critical business processes.