PSA/shared/workflow/runtime/services/workflowStepQuotaService.ts
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

395 lines
13 KiB
TypeScript

import type { Knex } from 'knex';
import { resolveTier, type TenantTier } from '@alga-psa/types';
import logger from '@alga-psa/core/logger';
const WORKFLOW_STEP_LIMIT_METADATA_KEY = 'workflow_step_limit';
const ACTIVE_STATUSES = ['trialing', 'active', 'past_due', 'unpaid'] as const;
const STATUS_PRIORITY: Record<string, number> = {
trialing: 0,
active: 1,
past_due: 2,
unpaid: 3,
};
const TIER_DEFAULT_LIMITS: Record<TenantTier, number> = {
essentials: 150,
solo: 150,
pro: 750,
premium: 10000,
};
export type WorkflowStepQuotaPeriodSource = 'stripe_subscription' | 'fallback_calendar';
export type WorkflowStepQuotaLimitSource =
| 'stripe_price_metadata'
| 'stripe_product_metadata'
| 'tier_default'
| 'unlimited_metadata';
export type WorkflowStepQuotaSummary = {
tenant: string;
periodStart: string;
periodEnd: string;
periodSource: WorkflowStepQuotaPeriodSource;
stripeSubscriptionId: string | null;
effectiveLimit: number | null;
usedCount: number;
remaining: number | null;
tier: TenantTier;
limitSource: WorkflowStepQuotaLimitSource;
};
export type WorkflowStepQuotaReservationResult =
| {
allowed: true;
summary: WorkflowStepQuotaSummary;
usedCountAfter: number;
}
| {
allowed: false;
summary: WorkflowStepQuotaSummary;
};
export type WorkflowStepQuotaReconciliation = {
tenant: string;
periodStart: string;
periodEnd: string;
counterUsedCount: number;
ledgerStepCount: number;
drift: number;
};
type StripeSubscriptionRow = {
tenant: string;
stripe_subscription_id: string;
stripe_price_id: string;
status: string;
current_period_start: string | Date | null;
current_period_end: string | Date | null;
};
type MetadataLimit = {
effectiveLimit: number | null;
limitSource: WorkflowStepQuotaLimitSource;
};
type UsageRow = {
tenant: string;
period_start: string | Date;
period_end: string | Date;
period_source: WorkflowStepQuotaPeriodSource;
stripe_subscription_id: string | null;
effective_limit: number | null;
used_count: number;
limit_source: WorkflowStepQuotaLimitSource;
tier: TenantTier;
updated_at?: string | Date;
};
function toIso(value: string | Date): string {
return typeof value === 'string' ? new Date(value).toISOString() : value.toISOString();
}
function currentUtcMonthPeriod(now = new Date()): { periodStart: string; periodEnd: string } {
const year = now.getUTCFullYear();
const month = now.getUTCMonth();
const periodStart = new Date(Date.UTC(year, month, 1, 0, 0, 0, 0)).toISOString();
const periodEnd = new Date(Date.UTC(year, month + 1, 1, 0, 0, 0, 0)).toISOString();
return { periodStart, periodEnd };
}
function parseLimitMetadata(raw: unknown, source: 'stripe_price_metadata' | 'stripe_product_metadata'): MetadataLimit | null {
if (raw == null) return null;
const normalized = String(raw).trim().toLowerCase();
if (normalized === 'unlimited') {
return { effectiveLimit: null, limitSource: 'unlimited_metadata' };
}
const parsed = Number(normalized);
if (Number.isInteger(parsed) && parsed > 0) {
return { effectiveLimit: parsed, limitSource: source };
}
return null;
}
function isMetadataValuePresent(raw: unknown): boolean {
if (raw == null) return false;
if (typeof raw === 'string' && raw.trim() === '') return false;
return true;
}
async function hasStripeTables(knex: Knex): Promise<boolean> {
const [subs, prices, products] = await Promise.all([
knex.schema.hasTable('stripe_subscriptions'),
knex.schema.hasTable('stripe_prices'),
knex.schema.hasTable('stripe_products'),
]);
return subs && prices && products;
}
async function getTenantTier(knex: Knex, tenant: string): Promise<TenantTier> {
const row = await knex('tenants').where({ tenant }).select('plan').first<{ plan?: string | null }>();
return resolveTier(row?.plan ?? null).tier;
}
async function findPreferredSubscription(knex: Knex, tenant: string, now: Date): Promise<StripeSubscriptionRow | null> {
const nowIso = now.toISOString();
const subscriptions = await knex<StripeSubscriptionRow>('stripe_subscriptions')
.where({ tenant })
.whereIn('status', ACTIVE_STATUSES as unknown as string[])
.whereNotNull('current_period_start')
.whereNotNull('current_period_end')
.andWhere('current_period_start', '<=', nowIso)
.andWhere('current_period_end', '>', nowIso)
.select('tenant', 'stripe_subscription_id', 'stripe_price_id', 'status', 'current_period_start', 'current_period_end');
const sorted = subscriptions
.filter((row) => row.current_period_start && row.current_period_end)
.sort((a, b) => {
const left = STATUS_PRIORITY[a.status] ?? Number.MAX_SAFE_INTEGER;
const right = STATUS_PRIORITY[b.status] ?? Number.MAX_SAFE_INTEGER;
if (left !== right) return left - right;
return new Date(b.current_period_start as string).getTime() - new Date(a.current_period_start as string).getTime();
});
return sorted[0] ?? null;
}
async function resolveMetadataLimit(knex: Knex, tenant: string, priceId: string): Promise<MetadataLimit | null> {
const price = await knex('stripe_prices')
.where({ tenant, stripe_price_id: priceId })
.first<{ metadata?: Record<string, unknown> | null; stripe_product_id?: string | null }>();
const priceRaw = price?.metadata?.[WORKFLOW_STEP_LIMIT_METADATA_KEY];
const priceValue = parseLimitMetadata(priceRaw, 'stripe_price_metadata');
if (priceValue) return priceValue;
if (isMetadataValuePresent(priceRaw)) {
logger.warn('[WorkflowStepQuotaService] Invalid workflow_step_limit metadata on Stripe price; falling back', {
tenant,
stripePriceId: priceId,
metadataKey: WORKFLOW_STEP_LIMIT_METADATA_KEY,
metadataValue: priceRaw,
});
}
if (!price?.stripe_product_id) return null;
const product = await knex('stripe_products')
.where({ tenant, stripe_product_id: price.stripe_product_id })
.first<{ metadata?: Record<string, unknown> | null }>();
const productRaw = product?.metadata?.[WORKFLOW_STEP_LIMIT_METADATA_KEY];
const productValue = parseLimitMetadata(productRaw, 'stripe_product_metadata');
if (productValue) return productValue;
if (isMetadataValuePresent(productRaw)) {
logger.warn('[WorkflowStepQuotaService] Invalid workflow_step_limit metadata on Stripe product; falling back to tier default', {
tenant,
stripePriceId: priceId,
stripeProductId: price.stripe_product_id,
metadataKey: WORKFLOW_STEP_LIMIT_METADATA_KEY,
metadataValue: productRaw,
});
}
return null;
}
export class WorkflowStepQuotaService {
async resolveQuotaSummary(knex: Knex, tenant: string, now = new Date()): Promise<WorkflowStepQuotaSummary> {
const tier = await getTenantTier(knex, tenant);
const defaultLimit = TIER_DEFAULT_LIMITS[tier];
let periodStart: string;
let periodEnd: string;
let periodSource: WorkflowStepQuotaPeriodSource = 'fallback_calendar';
let stripeSubscriptionId: string | null = null;
let effectiveLimit: number | null = defaultLimit;
let limitSource: WorkflowStepQuotaLimitSource = 'tier_default';
if (await hasStripeTables(knex)) {
const subscription = await findPreferredSubscription(knex, tenant, now);
if (subscription?.current_period_start && subscription.current_period_end) {
periodStart = toIso(subscription.current_period_start);
periodEnd = toIso(subscription.current_period_end);
periodSource = 'stripe_subscription';
stripeSubscriptionId = subscription.stripe_subscription_id;
const metadataLimit = await resolveMetadataLimit(knex, tenant, subscription.stripe_price_id);
if (metadataLimit) {
effectiveLimit = metadataLimit.effectiveLimit;
limitSource = metadataLimit.limitSource;
}
} else {
const fallback = currentUtcMonthPeriod(now);
periodStart = fallback.periodStart;
periodEnd = fallback.periodEnd;
logger.info('[WorkflowStepQuotaService] Using fallback calendar period (no valid active Stripe subscription period)', {
tenant,
periodStart,
periodEnd,
tier,
});
}
} else {
const fallback = currentUtcMonthPeriod(now);
periodStart = fallback.periodStart;
periodEnd = fallback.periodEnd;
logger.info('[WorkflowStepQuotaService] Using fallback calendar period (Stripe tables unavailable)', {
tenant,
periodStart,
periodEnd,
tier,
});
}
const usage = await knex<UsageRow>('workflow_step_usage_periods')
.where({ tenant, period_start: periodStart, period_end: periodEnd })
.first();
const usedCount = usage?.used_count ?? 0;
const remaining = effectiveLimit == null ? null : Math.max(effectiveLimit - usedCount, 0);
return {
tenant,
periodStart,
periodEnd,
periodSource,
stripeSubscriptionId,
effectiveLimit,
usedCount,
remaining,
tier,
limitSource,
};
}
async reserveStepStart(knex: Knex, tenant: string, now = new Date()): Promise<WorkflowStepQuotaReservationResult> {
return knex.transaction(async (trx) => {
const summary = await this.resolveQuotaSummary(trx, tenant, now);
const metadataJson = {
reservedAt: now.toISOString(),
};
await trx('workflow_step_usage_periods')
.insert({
tenant: summary.tenant,
period_start: summary.periodStart,
period_end: summary.periodEnd,
period_source: summary.periodSource,
stripe_subscription_id: summary.stripeSubscriptionId,
effective_limit: summary.effectiveLimit,
used_count: 0,
limit_source: summary.limitSource,
tier: summary.tier,
metadata_json: metadataJson,
created_at: trx.fn.now(),
updated_at: trx.fn.now(),
})
.onConflict(['tenant', 'period_start', 'period_end'])
.merge({
period_source: summary.periodSource,
stripe_subscription_id: summary.stripeSubscriptionId,
effective_limit: summary.effectiveLimit,
limit_source: summary.limitSource,
tier: summary.tier,
metadata_json: metadataJson,
updated_at: new Date().toISOString(),
});
const usage = await trx<UsageRow>('workflow_step_usage_periods')
.where({ tenant: summary.tenant, period_start: summary.periodStart, period_end: summary.periodEnd })
.forUpdate()
.first();
if (!usage) {
throw new Error('workflow_step_usage_periods row missing after upsert');
}
if (usage.effective_limit != null && usage.used_count >= usage.effective_limit) {
logger.warn('[WorkflowStepQuotaService] Workflow step quota exceeded at reservation', {
tenant: summary.tenant,
periodStart: summary.periodStart,
periodEnd: summary.periodEnd,
periodSource: summary.periodSource,
limitSource: summary.limitSource,
effectiveLimit: usage.effective_limit,
usedCount: usage.used_count,
});
return {
allowed: false,
summary: {
...summary,
usedCount: usage.used_count,
remaining: 0,
},
};
}
const [updated] = await trx<UsageRow>('workflow_step_usage_periods')
.where({ tenant: summary.tenant, period_start: summary.periodStart, period_end: summary.periodEnd })
.update({
used_count: trx.raw('used_count + 1'),
updated_at: trx.fn.now(),
})
.returning('*');
const usedCountAfter = updated.used_count;
logger.debug('[WorkflowStepQuotaService] Reserved workflow step quota', {
tenant: summary.tenant,
periodStart: summary.periodStart,
periodEnd: summary.periodEnd,
periodSource: summary.periodSource,
limitSource: summary.limitSource,
effectiveLimit: updated.effective_limit,
usedCountAfter,
});
return {
allowed: true,
usedCountAfter,
summary: {
...summary,
usedCount: usedCountAfter,
remaining: updated.effective_limit == null ? null : Math.max(updated.effective_limit - usedCountAfter, 0),
},
};
});
}
async reconcileUsagePeriod(
knex: Knex,
tenant: string,
periodStart: string,
periodEnd: string
): Promise<WorkflowStepQuotaReconciliation> {
const usage = await knex<UsageRow>('workflow_step_usage_periods')
.where({
tenant,
period_start: periodStart,
period_end: periodEnd,
})
.first();
const ledgerRow = await knex('workflow_run_steps as s')
.join('workflow_runs as r', 'r.run_id', 's.run_id')
.where('r.tenant', tenant)
.andWhere('s.started_at', '>=', periodStart)
.andWhere('s.started_at', '<', periodEnd)
.count<{ count: string }>('s.step_id as count')
.first();
const counterUsedCount = usage?.used_count ?? 0;
const ledgerStepCount = Number(ledgerRow?.count ?? 0);
const drift = counterUsedCount - ledgerStepCount;
return {
tenant,
periodStart,
periodEnd,
counterUsedCount,
ledgerStepCount,
drift,
};
}
}
export const workflowStepQuotaService = new WorkflowStepQuotaService();