PSA/shared/workflow/utils/distributedTransaction.ts
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

189 lines
6.1 KiB
TypeScript

import { v4 as uuidv4 } from 'uuid';
import { Knex } from 'knex';
import { logger } from '@alga-psa/core';
import { acquireDistributedLock, releaseDistributedLock, LockError, LockErrorType } from './distributedLock';
/**
* Error types for distributed transaction operations
*/
export enum TransactionErrorType {
LOCK_ACQUISITION_FAILED = 'lock_acquisition_failed',
TRANSACTION_FAILED = 'transaction_failed',
TIMEOUT = 'timeout',
INTERNAL_ERROR = 'internal_error'
}
/**
* Error class for distributed transaction operations
*/
export class TransactionError extends Error {
type: TransactionErrorType;
cause?: Error;
constructor(message: string, type: TransactionErrorType, cause?: Error) {
super(message);
this.name = 'TransactionError';
this.type = type;
this.cause = cause;
}
}
/**
* Options for executing a distributed transaction
*/
export interface DistributedTransactionOptions {
/**
* Maximum time to wait for lock acquisition in milliseconds
* Default: 10000 (10 seconds)
*/
lockWaitTimeMs?: number;
/**
* Time-to-live for the lock in milliseconds
* Default: 30000 (30 seconds)
*/
lockTtlMs?: number;
/**
* Transaction isolation level
* Default: 'repeatable read'
*/
isolationLevel?: Knex.IsolationLevels;
/**
* Whether to throw an error if lock acquisition fails
* Default: true
*/
throwOnLockFailure?: boolean;
}
/**
* Default transaction options
*/
const DEFAULT_TRANSACTION_OPTIONS: Required<DistributedTransactionOptions> = {
lockWaitTimeMs: 10000,
lockTtlMs: 30000,
isolationLevel: 'repeatable read',
throwOnLockFailure: true
};
/**
* A distributed transaction manager that uses distributed locks to ensure
* that only one process can execute a transaction on a specific resource at a time.
*
* This is useful for cross-process coordination in a distributed system.
*/
export class DistributedTransactionManager {
/**
* Execute a function within a distributed transaction
*
* @param knex Knex instance
* @param resourceKey Key identifying the resource being accessed
* @param fn Function to execute within the transaction
* @param options Transaction options
* @returns Result of the function
* @throws TransactionError if the transaction fails
*/
static async executeTransaction<T>(
knex: Knex,
resourceKey: string,
fn: (trx: Knex.Transaction) => Promise<T>,
options: DistributedTransactionOptions = {}
): Promise<T> {
const opts: Required<DistributedTransactionOptions> = {
...DEFAULT_TRANSACTION_OPTIONS,
...options
};
// Generate a unique owner ID for this transaction
const ownerId = `transaction-${uuidv4()}`;
// Format the lock key to include the resource key
const lockKey = `transaction:${resourceKey}`;
try {
// Acquire a distributed lock for the resource
const lockAcquired = await acquireDistributedLock(lockKey, ownerId, {
waitTimeMs: opts.lockWaitTimeMs,
ttlMs: opts.lockTtlMs,
throwOnFailure: opts.throwOnLockFailure
});
if (!lockAcquired) {
const errorMessage = `Failed to acquire lock for resource ${resourceKey}`;
logger.warn(`[DistributedTransactionManager] ${errorMessage}`);
if (opts.throwOnLockFailure) {
throw new TransactionError(errorMessage, TransactionErrorType.LOCK_ACQUISITION_FAILED);
}
// Return a default value if we don't throw
return undefined as unknown as T;
}
logger.debug(`[DistributedTransactionManager] Acquired lock for resource ${resourceKey}`);
try {
// Execute the function within a database transaction
return await knex.transaction(async (trx) => {
return await fn(trx);
}, {
isolationLevel: opts.isolationLevel
});
} catch (error) {
const errorMessage = `Transaction failed for resource ${resourceKey}: ${error instanceof Error ? error.message : String(error)}`;
logger.error(`[DistributedTransactionManager] ${errorMessage}`);
throw new TransactionError(
errorMessage,
TransactionErrorType.TRANSACTION_FAILED,
error instanceof Error ? error : undefined
);
} finally {
// Release the distributed lock
try {
await releaseDistributedLock(lockKey, ownerId, false);
logger.debug(`[DistributedTransactionManager] Released lock for resource ${resourceKey}`);
} catch (error) {
// Just log the error, don't throw
logger.warn(`[DistributedTransactionManager] Failed to release lock for resource ${resourceKey}: ${error instanceof Error ? error.message : String(error)}`);
}
}
} catch (error) {
// Handle lock errors
if (error instanceof LockError) {
const errorType = error.type === LockErrorType.TIMEOUT
? TransactionErrorType.TIMEOUT
: TransactionErrorType.LOCK_ACQUISITION_FAILED;
throw new TransactionError(error.message, errorType, error);
}
// Handle other errors
const errorMessage = `Error in distributed transaction for resource ${resourceKey}: ${error instanceof Error ? error.message : String(error)}`;
logger.error(`[DistributedTransactionManager] ${errorMessage}`);
throw new TransactionError(errorMessage, TransactionErrorType.INTERNAL_ERROR, error instanceof Error ? error : undefined);
}
}
}
/**
* Execute a function within a distributed transaction
*
* @param knex Knex instance
* @param resourceKey Key identifying the resource being accessed
* @param fn Function to execute within the transaction
* @param options Transaction options
* @returns Result of the function
* @throws TransactionError if the transaction fails
*/
export async function executeDistributedTransaction<T>(
knex: Knex,
resourceKey: string,
fn: (trx: Knex.Transaction) => Promise<T>,
options: DistributedTransactionOptions = {}
): Promise<T> {
return DistributedTransactionManager.executeTransaction(knex, resourceKey, fn, options);
}