PSA/docs/workflow/workflow-system.md
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

853 lines
43 KiB
Markdown

# Workflow System
## 1. Introduction and Philosophy
> **Important Note on Workflow Execution**: Workflows are executed in response to events. When a workflow is triggered, the event that triggered it is passed as input to the workflow. The workflow does not wait for the initial event - the fact that the workflow is executing means the event has already occurred. This is reflected in the workflow examples where each workflow receives its triggering event via `context.input.triggerEvent`.
### Purpose and Vision
The workflow system is a robust, flexible engine designed to model, execute, and manage complex business processes. It provides a structured way to define business logic, execute actions, and respond to events while maintaining a complete audit trail of all activities. The system is built on modern architectural principles to ensure reliability, scalability, and maintainability.
The system uses a TypeScript-based approach to workflow definition, allowing developers to create workflows using familiar programming constructs while leveraging the type safety and tooling of TypeScript.
### Core Principles
The workflow system is built on several key principles:
1. **Event Sourcing**: The system uses event sourcing as its foundational pattern, where the state of a workflow is derived by replaying a sequence of events rather than storing the current state directly. This provides a complete audit trail and enables powerful time-travel debugging capabilities.
2. **Domain-Driven Design**: The workflow engine is designed to be a flexible tool that can model complex domain processes while keeping domain logic separate from the workflow infrastructure.
3. **Programmatic Workflows**: Workflows are defined as TypeScript functions, allowing developers to use familiar programming constructs while maintaining the benefits of a structured workflow system.
4. **Idempotent Execution**: Actions are executed idempotently, ensuring that even if the same event is processed multiple times (e.g., due to retries), the outcome remains consistent.
5. **Parallel Execution**: The system supports executing actions in parallel based on explicit dependencies, enabling efficient processing of complex workflows.
6. **Asynchronous Event Processing**: The system processes all events asynchronously through a message queue, providing improved scalability and fault tolerance.
### Key Benefits
- **Auditability**: Complete history of all events and workflow execution
- **Flexibility**: Customizable workflows defined in TypeScript
- **Scalability**: Distributed processing with Redis Streams
- **Reliability**: Idempotent processing and robust error handling
- **Visibility**: Comprehensive monitoring and observability
- **Maintainability**: Clean separation of concerns and modular architecture
- **Developer Experience**: Familiar TypeScript syntax with IDE support, type checking, and refactoring tools
## 2. Architecture Overview
### System Components
The workflow system consists of the following major components:
1. **TypeScript Workflow Runtime**: Executes workflows defined as TypeScript functions, managing their state and event handling.
2. **Action Executor**: Executes actions in parallel based on their dependencies, with support for various error handling strategies.
3. **Action Registry**: Manages the registration and execution of actions, ensuring idempotent execution.
4. **Persistence Layer**: Stores workflow executions, events, action results, and other workflow-related data.
5. **Workflow Context**: Provides the execution context for workflows, including access to actions, data, events, and logging.
6. **Redis Streams Integration**: Enables asynchronous event distribution. Currently, all workflow-related events are published to a single global Redis stream named `workflow:events:global`. (See Section 5 for more details on event stream usage).
7. **Worker Service**: Processes events asynchronously from Redis Streams.
* **Current Architecture**: The `WorkflowWorker` currently operates as a singleton instance. This means all events from the global Redis stream are processed by this single worker. This is important for understanding how features like `context.events.waitFor` function reliably with in-memory listeners.
* **Future Scalability**: Future plans for scaling may involve distributing workers, potentially by sharding streams by tenant ID (e.g., `workflow:events:tenant_hash_X`). This approach would aim to maintain worker affinity for workflow executions within a tenant, preserving the effectiveness of in-memory event listeners. If a different distribution model is adopted where affinity is not guaranteed, the in-memory listener mechanism for `context.events.waitFor` would require re-evaluation.
8. **Distributed Coordination**: Ensures reliable processing in a distributed environment.
### Data Flow
1. **Event Submission**:
- A client publishes an event to the event bus (e.g., "INVOICE_CREATED")
- The event is persisted to the database
- The system identifies workflows attached to this event type
- For each attached workflow, a new execution is created
- In a distributed setup, the event is also published to Redis Streams
2. **Event Processing**:
- The workflow runtime processes the event within the context of the workflow function
- The workflow function determines how to handle the event based on its current state
- The workflow function may execute actions, update data, or change state
3. **Action Execution**:
- The action executor builds a dependency graph of actions
- It executes actions in parallel based on their dependencies
- Results are stored in the database
- Any errors are handled according to the defined strategy
4. **State Update**:
- The workflow function updates its state based on the event and action results
- The complete event and its processing results are available for querying
### System Workflows and Tenant-Specific Triggering
System workflows represent shared, reusable workflow definitions or templates that are available to all tenants within the platform. Examples might include standard processes like a generic `invoiceLifecycleWorkflow` or other common business operations.
A key aspect to understand is how these system workflows are invoked. Contrary to a potential misunderstanding that they might be triggered by global, non-tenant-specific \"system events\" (e.g., via a hypothetical `system_workflow_event_attachments` table for such global triggers), system workflows such as the invoice lifecycle example are typically triggered in the context of a *specific tenant*.
The triggering mechanism relies on tenant-specific event attachments. When an event relevant to a system workflow (e.g., `INVOICE_UPDATED` for an invoice sync workflow) occurs for a particular tenant, the system consults the tenant-specific `workflow_event_attachments` table. An entry in this table links the `tenant_id`, the `event_type` (e.g., `INVOICE_UPDATED`), and the `workflow_id`. For a system workflow, this `workflow_id` corresponds to its `registration_id`, effectively associating the tenant-specific event with the shared system workflow definition.
This ensures that while the workflow *definition* is shared, its *execution* is always tied to a specific tenant and triggered by events occurring within that tenant's scope. If a table named `system_workflow_event_attachments` exists, its purpose would be distinct from this tenant-specific triggering mechanism. For tenant-aware workflows, the attachment and subsequent execution remain tenant-specific.
### Persistence Model
The workflow system uses several database tables to store its data:
1. **workflow_executions**: Stores metadata about workflow instances
2. **workflow_events**: Stores the event log for each workflow execution
3. **workflow_action_results**: Tracks the results of action executions
4. **workflow_timers**: Manages workflow timers
5. **workflow_action_dependencies**: Stores dependencies between actions
6. **workflow_sync_points**: Manages synchronization points for parallel execution
7. **workflow_event_processing**: Tracks the detailed lifecycle and processing status of individual events intended for asynchronous handling. When an event is enqueued (e.g., via `runtime.enqueueEvent()`), a record is created here. Its status transitions typically from `pending` -> `published` (to Redis) -> `processing` (by a worker) -> `completed` or `failed`. This table is crucial for ensuring event processing reliability, enabling retries, and providing visibility into the asynchronous event pipeline, especially in distributed environments. It links to `workflow_events` via `event_id`.
8. **system_workflow_task_definitions**: Stores definitions for system-wide, reusable task types (e.g., common error handling tasks). These definitions are not tenant-specific and are identified by their `task_type` string. They link to form definitions (often system forms).
9. **workflow_task_definitions**: Stores definitions for task types that are specific to a tenant. These are identified by a UUID (`task_definition_id`) and are scoped to a `tenant`.
10. **workflow_tasks**: Stores instances of human tasks. It links to its definition using:
- `task_definition_type` ('system' or 'tenant'): Indicates whether the task uses a system or tenant-specific definition.
- `tenant_task_definition_id` (UUID): Foreign key to `workflow_task_definitions.task_definition_id` if type is 'tenant'.
- `system_task_definition_task_type` (TEXT): Foreign key to `system_workflow_task_definitions.task_type` if type is 'system'.
11. **system_workflow_form_definitions**: Stores definitions for system-wide, reusable UI forms. These are referenced by task definitions. Key fields include `name` (unique identifier for the form), `json_schema` (defines the data structure and properties of the form), and `ui_schema` (provides hints for rendering the form).
12. **workflow_form_definitions**: Stores tenant-specific UI form definitions, mirroring the structure of `system_workflow_form_definitions`.
13. **workflow_data_store**: A cross-run key-value store scoped by `(tenant, namespace, key)`. Values are stored as JSON, support an optional TTL (lazily expired on read and periodically swept), and carry a revision counter for optimistic concurrency. Accessed via `store.*` workflow actions (`store.get`, `store.set`, `store.delete`, `store.increment`, `store.list`, `store.list_namespaces`).
14. **workflow_entity_links**: Stores typed bidirectional links between two entities (identified by entity type + ID pairs). Supports forward, reverse, and bidirectional traversal. Accessed via `links.*` workflow actions (`links.upsert`, `links.lookup`, `links.delete`, `links.list`, `links.list_namespaces`).
### Human Task Forms and Dynamic Data Display
Human tasks often require displaying dynamic information to the user based on the workflow's current context and the specifics of the task (e.g., error details, entity identifiers). The workflow system facilitates this through the interaction of form definitions (`system_workflow_form_definitions` or `workflow_form_definitions`) and the `contextData` provided when a human task is created via `actions.createHumanTask`.
There are two primary ways dynamic data is presented in forms:
1. **Direct Field Population**:
* The form's `json_schema` defines distinct properties corresponding to individual pieces of data (e.g., `resolutionNotes`, `retryCheckbox`).
* The workflow provides values for these properties directly within the `contextData` object when creating the task. For example:
```typescript
// In the workflow:
await actions.createHumanTask({
taskType: 'some_review_task',
// ... other task parameters
contextData: {
resolutionNotes: "Initial assessment complete.",
isUrgent: true
}
});
```
* The UI rendering layer then uses these values to populate the corresponding input fields (e.g., a textarea for `resolutionNotes`, a checkbox for `isUrgent`).
2. **Template String Substitution**:
* This method is particularly useful for displaying formatted, read-only information composed of multiple data points, or for providing descriptive text that includes dynamic values.
* In the form's `json_schema` (stored in `system_workflow_form_definitions`), a property (often read-only) can have its `default` value set to a template string. This template string includes placeholders that reference keys within the `contextData` object, using the syntax `${contextData.keyName}`.
* Example `json_schema` property for a form definition:
```json
{
// ... other properties ...
"errorReport": {
"type": "string",
"title": "Error Details",
"readOnly": true,
"default": "Workflow Instance ID: ${contextData.workflowInstanceId}\nError Code: ${contextData.errorCode}\nMessage: ${contextData.errorMessageText}\nEntity: ${contextData.entityType} (ID: ${contextData.entityId})"
}
// ...
}
```
* The workflow, when creating the human task, populates the `contextData` with the individual data elements whose keys match the placeholders in the template.
```typescript
// In the workflow, when an error occurs:
await actions.createHumanTask({
taskType: 'accounting_export_error', // This task type would link to the form definition above
title: 'QuickBooks Sync Error',
// ... other task parameters
contextData: {
workflowInstanceId: executionId,
errorCode: 'QBO-123',
errorMessageText: 'Customer not found in QBO.',
entityType: 'Customer',
entityId: algaCompanyId,
// ... any other data needed by the template or for other purposes
}
});
```
* The UI rendering layer is then responsible for:
1. Retrieving the form definition (including the `json_schema` with the template string in the `default` value of the `errorReport` property).
2. Accessing the `contextData` associated with the specific task instance.
3. Performing string substitution on the template string, replacing placeholders like `${contextData.errorCode}` with their corresponding values from the `contextData` (e.g., "QBO-123").
4. Displaying the resulting formatted string to the user (e.g., in a read-only textarea as specified by the `ui_schema`).
* This pattern allows for flexible and descriptive presentation of dynamic information without requiring a separate form field for every individual piece of data if they are only for display. The `accounting-mapping-error-form` example's `productDetails` field demonstrates this approach.
**Key Considerations for Template Substitution:**
* The workflow must ensure that all keys referenced in the form's template string are present in the `contextData` it provides when creating the task.
* The UI component responsible for rendering the form must implement the logic to perform this substitution.
* This method is best suited for read-only display of information. If user input is required for these individual pieces of data, direct field population (Method 1) is more appropriate.
#### Enhanced Templating for Dynamic Content
To provide more flexibility while maintaining security for user-influenced template strings (e.g., in form `default` values or dynamic UI text), the templating mechanism described above is being enhanced. This enhancement will utilize the existing Parsimmon dependency to parse and evaluate a controlled, limited set of JavaScript-like expressions within the `${...}` syntax.
**Key features of the enhanced templating:**
* **Parser:** Implemented using Parsimmon.
* **Supported Expressions (Initial Scope):**
* Variable access: `variableName` or `contextData.variableName`
* String literals: `'some string'`
* Logical OR: `expression1 || expression2`
* Date formatting: `new Date(variableOrString).toLocaleDateString()` and `new Date(variableOrString).toLocaleString()`
* **Security:** The custom parser and evaluator are designed to be secure by only allowing the explicitly defined expressions and operations, preventing arbitrary code execution.
* **Integration:** This will affect how `default` values in JSON schemas and other templated strings are processed by `server/src/utils/templateUtils.ts`, impacting components like `DynamicForm.tsx` and `ActivityDetailViewerDrawer.tsx`.
For a detailed technical design of this Parsimmon-based templating engine, please refer to "[`docs/technical/parsimmon_templating_engine.md`](../technical/parsimmon_templating_engine.md)".
By understanding these data flow patterns, developers can effectively design workflows that create informative and actionable human tasks.
## 3. TypeScript-Based Workflows
The workflow system uses TypeScript functions for defining workflows, providing a programmatic approach with full access to TypeScript's features.
#### Basic Structure
```typescript
async function workflow(context: WorkflowContext): Promise<void> {
const { actions, data, events, logger } = context;
// Initial state
context.setState('initial');
// Wait for events
const startEvent = await events.waitFor('Start');
// Execute actions, including the new composite action
// await actions.doSomething({ param1: 'value1' });
// Example of the new action:
// const taskOutcome = await actions.createTaskAndWaitForResult({
// taskType: 'user_review',
// title: 'Review Item',
// contextData: { itemId: '123' }
// });
// if (taskOutcome.success) {
// logger.info(`Task resolved with: ${JSON.stringify(taskOutcome.resolutionData)}`);
// }
// Update state
context.setState('completed');
}
```
#### Workflow Context
The `WorkflowContext` provides access to:
- **actions**: Proxy object for executing registered actions. This now includes a powerful composite action `createTaskAndWaitForResult` which simplifies creating a human task and pausing the workflow until it's resolved.
- **data**: Data manager for storing and retrieving workflow data
- **events**: Event manager for waiting for and emitting events.
- `events.waitFor()`: Pauses workflow execution until a specified external event (or one of several specified events) occurs. It relies on in-memory listeners within the `TypeScriptWorkflowRuntime` instance processing the workflow. Given the current singleton `WorkflowWorker` architecture (see Section 2), these in-memory listeners are effective as the same worker instance that initiates the wait will process the resolving event. This mechanism is crucial for the `actions.createTaskAndWaitForResult` composite action.
- `events.emit()`: Asynchronously enqueues an event, which will be persisted and published to the global Redis stream for processing by the worker.
- **logger**: Logger for workflow execution
- **setState/getCurrentState**: Methods for managing workflow state
#### Control Flow
TypeScript-based workflows use native language constructs for control flow:
```typescript
// Conditional logic
if (data.get<InvoiceData>('invoice').amount > 1000) {
await actions.requireAdditionalApproval();
}
// Switch statements
switch (event.payload.status) {
case 'approved':
await actions.processApproval();
break;
case 'rejected':
await actions.processRejection();
break;
default:
throw new Error('Invalid status');
}
// Loops
for (const item of data.get<Order>('order').items) {
await actions.processItem(item);
}
```
#### Parallel Execution
```typescript
// Execute actions in parallel
await Promise.all([
actions.validateInvoice(),
actions.checkBudget(),
actions.verifyApprover()
]);
```
#### Error Handling
```typescript
try {
await actions.riskyOperation();
} catch (error) {
logger.error('Operation failed', error);
await actions.handleFailure();
}
// For errors that are recoverable with human intervention, a more interactive pattern involves creating a human task and then pausing the workflow until the task is resolved, allowing the operation to be retried. The `actions.createTaskAndWaitForResult` method greatly simplifies this. See the "Interactive Error Resolution and Retries with Human Tasks" section for a detailed example.
```
#### Interactive Error Resolution and Retries with Human Tasks
A common and powerful pattern for handling operations that might fail due to issues requiring manual intervention (e.g., missing data, incorrect configuration, external system errors) is to:
1. Attempt the operation.
2. If it fails, use `actions.createTaskAndWaitForResult` to create a human task and automatically pause the workflow. This action encapsulates creating the task, determining the correct event to wait for (typically `TaskEventNames.taskCompleted(taskId)`), and then calling `context.events.waitFor()`.
3. The workflow resumes when `createTaskAndWaitForResult` returns, providing the outcome of the human task (e.g., data submitted by the user, or a timeout indication).
4. Based on the task's outcome, the workflow can retry the operation or take other actions.
This allows the workflow to remain in context and continue from where it left off, rather than terminating and requiring a full restart.
**Conceptual Flow (Simplified with `createTaskAndWaitForResult`):**
```
loop until operation_succeeds or max_retries_reached:
try:
perform_operation()
operation_succeeds = true
catch error_requiring_human_intervention:
task_outcome = await actions.createTaskAndWaitForResult(
taskType: 'resolve_error_X',
title: 'Resolve Error for Operation Y',
contextData: { error_details, relevant_ids },
waitForEventTimeoutMilliseconds: 3_600_000 // Optional: 1 hour timeout
)
if task_outcome.success and task_outcome.resolutionData?.userFixedTheProblem:
// Loop continues, operation will be retried
logger.info("User indicated problem resolved. Retrying operation.")
else:
logger.warn("Task did not result in a successful resolution. Aborting retry.")
// Handle non-resolution (e.g., log, escalate, break loop)
break
```
**Example: Processing an item that requires a valid mapping (using `createTaskAndWaitForResult`)**
```typescript
async function processItemWithMappingWorkflow(context: WorkflowContext): Promise<void> {
const { actions, data, logger, setState, executionId } = context; // `events` is not directly needed here
const itemId = data.get('itemIdToProcess');
let itemProcessed = false;
let attempts = 0;
const MAX_ATTEMPTS = 3; // Define a retry limit
setState('VALIDATING_ITEM_MAPPING');
while (!itemProcessed && attempts < MAX_ATTEMPTS) {
attempts++;
logger.info(`Attempt ${attempts} to process item ${itemId}.`);
try {
// Action that might fail if mapping is missing or invalid
const mappingDetails = await actions.getMappingForItem({ itemId });
if (!mappingDetails || !mappingDetails.isValid) {
throw new Error(`Mapping for item ${itemId} is missing or invalid.`);
}
// Proceed with processing using mappingDetails
await actions.processItemUsingMapping({ itemId, mappingDetails });
itemProcessed = true;
logger.info(`Item ${itemId} processed successfully.`);
setState('ITEM_PROCESSING_COMPLETE');
} catch (error: any) {
logger.error(`Error processing item ${itemId} on attempt ${attempts}: ${error.message}`);
setState(`AWAITING_MAPPING_RESOLUTION_FOR_ITEM_${itemId}_ATTEMPT_${attempts}`);
// Use the new composite action to create a task and wait for its resolution
const taskResolution = await actions.createTaskAndWaitForResult({
taskType: 'resolve_item_mapping_error', // Links to a pre-defined task and form
title: `Resolve Mapping for Item ${itemId} (Attempt ${attempts})`,
description: `Error: ${error.message}. Please review and correct the mapping for item ID: ${itemId}.`,
contextData: { // Data for the form and task context
itemId: itemId,
errorMessage: error.message,
currentAttempt: attempts,
maxAttempts: MAX_ATTEMPTS,
workflowInstanceId: executionId
},
// Optional: Timeout for waiting for the task resolution event
// waitForEventTimeoutMilliseconds: 60 * 60 * 1000 // 1 hour
});
if (taskResolution.success) {
// Task was completed, event was received.
// The payload of the event (taskResolution.resolutionData) might contain user input.
logger.info(`Task ${taskResolution.taskId} for item ${itemId} resolved. Resolution data: ${JSON.stringify(taskResolution.resolutionData)}. Retrying processing.`);
// The loop will continue, and getMappingForItem will be re-attempted.
} else {
// Task creation might have failed, or waitFor event might have timed out or errored.
logger.error(`Failed to get resolution for task for item ${itemId}. Error: ${taskResolution.error}. Details: ${JSON.stringify(taskResolution.details)}. Halting retries for this item.`);
setState('CRITICAL_ERROR_TASK_RESOLUTION_FAILED');
// Depending on policy, you might throw an error here or return
return;
}
}
} // End while loop
if (!itemProcessed) {
logger.error(`Failed to process item ${itemId} after ${MAX_ATTEMPTS} attempts.`);
setState('ITEM_PROCESSING_FAILED_MAX_ATTEMPTS');
// Optionally, create a final escalation task or take other compensatory actions.
// Can still use createHumanTask directly if no waiting is needed, or createTaskAndWaitForResult if it is.
await actions.createHumanTask({
taskType: 'item_processing_escalation',
title: `Escalation: Item ${itemId} processing failed after max attempts.`,
contextData: { itemId, attempts, workflowInstanceId: executionId }
});
}
}
```
**Key Considerations for `actions.createTaskAndWaitForResult`:**
* **Simplicity**: This action significantly simplifies the workflow code by encapsulating the `createHumanTask` call, event name construction (it uses `TaskEventNames.taskCompleted(taskId)` internally), and the `events.waitFor()` call.
* **Return Value**: The action returns an object (`CreateTaskAndWaitForResultReturn`) indicating `success` (boolean), the `taskId` (string | null), `resolutionData` (any, the payload of the task completion event), and an optional `error` message or `details` object.
* **Task Resolution Event**: The system relies on an event named according to `TaskEventNames.taskCompleted(taskId)` being emitted when the human task is resolved. This is a standard convention.
* **Retry Limits**: Always include a mechanism (like a maximum attempt counter) to prevent infinite loops.
* **Idempotency**: Ensure retried operations are idempotent.
* **`setState` for Observability**: Continue to use `context.setState()` for visibility.
* **Timeout**: The `waitForEventTimeoutMilliseconds` parameter in `createTaskAndWaitForResultParams` allows specifying a timeout for how long the workflow will wait for the task resolution event. If a timeout occurs, `taskResolution.success` will be `false`, and `taskResolution.error` will indicate a timeout.
* **Detailed Event Flow**:
1. `createTaskAndWaitForResult` internally calls the registered `actions.create_human_task` action.
2. This action (or its underlying logic) creates the human task record in the database.
3. `createTaskAndWaitForResult` then determines the expected resolution event name (typically `TaskEventNames.taskCompleted(taskId)`) and calls `context.events.waitFor()` with this event name, causing the workflow to pause.
4. When the human task is completed (e.g., through a UI interaction that calls an API like `submitTaskForm`):
a. The API endpoint handling task completion (e.g., `submitTaskForm`) is responsible for emitting the task resolution event (e.g., `TASK_COMPLETED` with the `taskId` and resolution data in its payload).
b. This event is enqueued using `runtime.enqueueEvent()`.
c. `enqueueEvent()` persists the event to the database (e.g., `workflow_events` table) and publishes it to the global Redis stream (`workflow:events:global`).
d. The singleton `WorkflowWorker` consumes this event from the Redis stream.
e. The worker invokes `runtime.processQueuedEvent()` for the consumed event.
f. `processQueuedEvent()` loads the relevant workflow execution state, applies the event, and then calls `notifyEventListeners()`.
g. `notifyEventListeners()` finds the specific in-memory listener registered by the earlier `context.events.waitFor()` call (within the same worker process) and resolves the associated promise.
h. This resolution allows `createTaskAndWaitForResult` to resume, and it then returns the task outcome (including `resolutionData` from the event payload) to the workflow.
## 4. Core Components
### TypeScript Workflow Runtime
The TypeScript Workflow Runtime is responsible for executing workflows defined as TypeScript functions. It manages workflow state, processes events, and provides the execution context for workflows.
Key features:
- Workflow registration and execution
- Event handling and processing
- State management
- Action execution coordination
- Data persistence
Example usage:
```typescript
import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';
import { invoiceApprovalWorkflow } from '../workflows/invoiceApprovalWorkflow';
// Initialize the workflow runtime
const runtime = getWorkflowRuntime();
// Register a workflow
runtime.registerWorkflow(invoiceApprovalWorkflow);
// Start a new workflow execution
const executionId = await runtime.startWorkflow(
'InvoiceApproval',
{
tenant: 'acme',
initialData: {
invoice: {
id: 'inv-123',
amount: 500,
status: 'draft'
}
}
}
);
// Option 1: Enqueue an event for a specific workflow execution
await runtime.enqueueEvent({
execution_id: executionId,
event_name: 'Submit',
payload: { submittedBy: 'user-456' },
user_id: 'user-456',
tenant: 'acme'
});
// Option 2: Publish an event to the event bus to trigger attached workflows
await submitWorkflowEventAction({
event_name: 'INVOICE_SUBMITTED',
event_type: 'INVOICE_SUBMITTED',
payload: {
invoiceId: 'inv-123',
submittedBy: 'user-456'
}
});
```
### Workflow Context
The Workflow Context provides the execution environment for workflows, giving them access to actions, data, events, and logging capabilities.
Key features:
- Action execution through a proxy object
- Data storage and retrieval
- Event waiting and emission
- Logging and debugging
- State management
Example usage within a workflow function:
```typescript
async function invoiceApprovalWorkflow(context: WorkflowContext) {
const { actions, data, events, logger, setState } = context;
// Set initial state
setState('draft');
// Store workflow data
data.set('invoice', { id: 'inv-123', amount: 500, status: 'draft' });
// Wait for an event
const submitEvent = await events.waitFor('Submit');
logger.info(`Invoice submitted by ${submitEvent.user_id}`);
// Execute an action
await actions.sendNotification({
recipient: 'manager',
message: 'Invoice submitted for approval'
});
// Update state
setState('submitted');
// Wait for approval or rejection
const decisionEvent = await events.waitFor(['Approve', 'Reject']);
if (decisionEvent.name === 'Approve') {
await actions.updateInvoiceStatus({ status: 'approved' });
setState('approved');
} else {
await actions.updateInvoiceStatus({ status: 'rejected' });
setState('rejected');
}
}
```
### Action Executor
The action executor is responsible for executing actions in parallel based on their dependencies. It supports various error handling strategies and ensures that actions are executed in the correct order.
Key features:
- Parallel execution based on dependencies
- Support for synchronization points (join operations)
- Error handling strategies (stop, continue, retry, compensate)
- Transaction management
Example usage:
```typescript
const actionExecutor = createActionExecutor();
const results = await actionExecutor.executeActions(
actionsToExecute,
event,
tenant
);
console.log(`Executed ${results.length} actions`);
results.forEach(result => {
if (result.success) {
console.log(`Action ${result.actionName} succeeded: ${JSON.stringify(result.result)}`);
} else {
console.error(`Action ${result.actionName} failed: ${result.error}`);
}
});
```
### Action Registry
The action registry manages the registration and execution of actions. It ensures that actions are executed idempotently and provides a way to define and validate action parameters.
Key features:
- Action registration with parameter validation
- Idempotent action execution
- Transaction isolation level support
- Built-in actions for common operations
Example usage:
```typescript
const registry = getActionRegistry();
// Register a custom action
registry.registerSimpleAction(
'SendInvoiceEmail',
'Send an email with invoice details',
[
{ name: 'recipient', type: 'string', required: true },
{ name: 'invoiceId', type: 'string', required: true },
{ name: 'template', type: 'string', required: false, defaultValue: 'default' }
],
async (params) => {
// Implementation
return { sent: true, timestamp: new Date().toISOString() };
}
);
// Execute an action
const result = await registry.executeAction('SendInvoiceEmail', {
tenant: 'acme',
executionId: 'wf-123',
eventId: 'evt-456',
idempotencyKey: 'send-invoice-email-789',
parameters: {
recipient: 'customer@example.com',
invoiceId: 'inv-123'
}
});
```
## 5. Asynchronous Event Processing
The workflow system processes all events asynchronously through a message queue. This provides higher throughput, better fault tolerance, and improved scalability.
### Redis Streams Integration
Redis Streams is used as the message queue for distributing workflow events.
**Global Event Stream**: Currently, all workflow-related events—including initial trigger events, events emitted by workflows via `context.events.emit()`, and task completion events—are published to a single global Redis stream named `workflow:events:global`. The `WorkflowWorker` (currently a singleton instance) consumes events from this global stream.
**Event Publication Flow**: When a workflow event is submitted (e.g., via `runtime.enqueueEvent()`):
1. The event is validated and persisted to the database (e.g., into `workflow_events` and `workflow_event_processing` tables).
2. It is then published as a message to the `workflow:events:global` Redis stream for asynchronous processing.
3. Worker processes (currently one) consume messages from this stream.
**Key Features Leveraged**:
- **Consumer Groups**: Can be used to ensure each event message is processed by one consumer in a group (relevant for future multi-worker scenarios).
- **Message Acknowledgment**: Helps in handling worker failures and ensuring events are not lost.
- **Error Handling**: The system includes mechanisms for retrying event processing in case of transient failures.
### Two-Phase Event Processing
Event processing is split into two phases:
1. **Enqueue Phase**: Validate the event, persist it to the database, and publish it to Redis Streams
2. **Process Phase**: Consume the event from Redis Streams, process it using the workflow runtime, and execute the resulting actions
This separation allows for immediate response to clients while ensuring reliable processing of events.
### Worker Service
The worker service consumes events from Redis Streams and processes them using the workflow runtime. It provides:
- Worker lifecycle management (startup, shutdown, health checks)
- Error handling with proper classification and retry logic
- Telemetry and logging for monitoring
For more details, see the [Worker Service Documentation](worker-service.md).
### Reliable Processing
To ensure reliable event processing, the system uses:
- Redis-based locks for critical sections (e.g., during event processing by a worker).
- Transaction management for data consistency (e.g., when persisting events and updating workflow state).
- Error classification and recovery strategies for handling failures during event processing.
- **Standardized Identifiers**: Key identifiers such as `event_id` in the `workflow_events` table and `processing_id` in the `workflow_event_processing` table are expected to be standard Universally Unique Identifiers (UUIDs). If idempotency keys are provided and used as `event_id`, they should also conform to the UUID format or be handled by a system layer that ensures compatibility with database constraints (e.g., by hashing or mapping if the underlying column type is strictly UUID). This ensures data integrity and compatibility with database functions expecting UUIDs.
## 6. Usage Examples
### Basic TypeScript Workflow Definition
```typescript
async function approvalWorkflow(context: WorkflowContext): Promise<void> {
const { actions, events, logger } = context;
// Initial state - Processing
context.setState('processing');
logger.info('Workflow started in processing state');
// The workflow is triggered by a Submit event, which is passed as input
const { triggerEvent } = context.input;
logger.info(`Processing submission from ${triggerEvent.user_id}`);
await actions.log_event({ message: "Item submitted for approval" });
context.setState('submitted');
// Wait for Approve or Reject event
const decisionEvent = await events.waitFor(['Approve', 'Reject']);
if (decisionEvent.name === 'Approve') {
await actions.log_event({ message: "Item approved" });
context.setState('approved');
} else {
await actions.log_event({ message: "Item rejected" });
context.setState('rejected');
}
logger.info('Workflow completed');
}
```
### Asynchronous Event Processing Example
```typescript
import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';
// Initialize the workflow runtime
const runtime = getWorkflowRuntime();
// Option 1: Enqueue an event for a specific workflow execution
await runtime.enqueueEvent({
execution_id: 'wf-123',
event_name: 'Approve',
payload: { approvedBy: 'user-789' },
user_id: 'user-789',
tenant: 'acme'
});
// Option 2: Publish an event to the event bus to trigger attached workflows
await submitWorkflowEventAction({
event_type: 'INVOICE_APPROVED',
payload: {
invoiceId: 'inv-123',
approvedBy: 'user-789'
}
});
// The event will be processed asynchronously by a worker
console.log('Event enqueued for processing');
```
## 7. Operational Considerations
### Monitoring
The workflow system provides several monitoring capabilities:
1. **Workflow Telemetry**: Counts, rates, and durations of workflow events and actions
2. **Event Tracing**: Correlation IDs for tracking events across the system
3. **Health Checks**: API endpoints for checking the health of the workflow system
4. **Logging**: Comprehensive logging of workflow activities
### Scaling
The workflow system can be scaled in several ways:
1. **Vertical Scaling**: Increase resources (CPU, memory) for the workflow runtime and worker processes
2. **Horizontal Scaling**: Add more worker processes across multiple servers
3. **Database Scaling**: Optimize database queries and indexes for efficient event loading and replay
4. **Redis Scaling**: Configure Redis for high availability and performance
### Troubleshooting
Common issues and their solutions:
1. **Events not being processed**:
- Check Redis connection
- Verify that events are being published to Redis Streams
- Check for errors in the worker logs
2. **High event processing latency**:
- Increase the number of workers
- Optimize database queries
- Check for resource bottlenecks
3. **Inconsistent workflow state**:
- Verify that actions are idempotent
- Check for distributed lock failures
- Ensure that the event log is complete and ordered correctly
### Integration with Domain Logic
```typescript
import { getWorkflowRuntime } from '@shared/workflow/core/workflowRuntime';
import { getActionRegistry } from '@shared/workflow/core/actionRegistry';
import { invoiceApprovalWorkflow } from '../workflows/invoiceApprovalWorkflow';
// Initialize the action registry
const registry = getActionRegistry();
// Register domain-specific actions
registry.registerDatabaseAction(
'UpdateInvoiceStatus',
'Update the status of an invoice',
[
{ name: 'invoiceId', type: 'string', required: true },
{ name: 'status', type: 'string', required: true }
],
TransactionIsolationLevel.REPEATABLE_READ,
async (params, context) => {
const result = await context.transaction('invoices')
.where('id', params.invoiceId)
.update({ status: params.status });
return { updated: result === 1 };
}
);
// Initialize the workflow runtime
const runtime = getWorkflowRuntime(registry);
// Register the TypeScript workflow
runtime.registerWorkflow(invoiceApprovalWorkflow);
// Start a new workflow execution
const executionId = await runtime.startWorkflow(
'InvoiceApproval',
{
tenant: 'acme',
initialData: {
invoice: {
id: 'inv-123',
amount: 500,
status: 'draft'
}
}
}
);
// Enqueue an event
await runtime.enqueueEvent({
execution_id: executionId,
event_name: 'Submit',
payload: { submittedBy: 'user-456' },
user_id: 'user-456',
tenant: 'acme'
});
```
## 8. Future Enhancements
Planned enhancements for the workflow system include:
1. **Workflow Designer UI**: A visual tool for designing and testing TypeScript workflows
2. **Enhanced Monitoring**: More detailed metrics and visualizations
3. **Advanced Error Recovery**: Automated recovery strategies for different error types
4. **Performance Optimizations**: Improved event processing and action execution
5. **TypeScript Workflow Analyzer**: Improved static analysis of TypeScript workflows for visualization and validation
6. **Workflow Templates**: Reusable workflow patterns and templates
7. **Enhanced Testing Tools**: Specialized tools for testing and debugging workflows
## 9. Conclusion
The workflow system provides a powerful, flexible foundation for modeling and executing complex business processes. Its event-sourced architecture, parallel execution capabilities, and distributed processing make it suitable for a wide range of applications, from simple approval flows to complex multi-stage processes.
The TypeScript-based workflow approach offers several key advantages:
1. **Familiar Programming Model**: Developers can use the full power of TypeScript, including its type system, control flow constructs, and error handling.
2. **IDE Support**: Full IDE support including code completion, refactoring, and navigation.
3. **Testability**: Workflows can be unit tested like any other TypeScript code.
4. **Flexibility**: Complex business logic can be expressed naturally using programming constructs.
5. **Maintainability**: Standard software engineering practices can be applied to workflow code.
By separating the workflow infrastructure from domain logic, the system enables clean, maintainable code while providing the reliability and auditability required for critical business processes.