PSA/ee/server/migrations/20260529130000_distribute_workflow_v2_tables.cjs
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

350 lines
15 KiB
JavaScript

// Migration B (DISTRIBUTE) of the Workflow Runtime V2 Citus colocation work.
// Runs AFTER Deploy 1 (code writes `tenant` only). Distributes the v2 tables into
// colocation group 41 (the uuid group of the v1 workflow tables) on `tenant`.
//
// Per-table sequence: drop FKs/uniques/PK -> recreate PK as (tenant, <id>) ->
// create_distributed_table(..., 'tenant', colocate_with => 'workflow_tasks')
// -> truncate_local_data_after_distributing_table (Citus leaves the original rows
// as LOCAL coordinator data after distributing a non-empty table, and that
// leftover data blocks the FK/constraint re-adds) -> re-add tenant-scoped uniques
// and FKs. `tenant_id` columns remain (vestigial) until the cleanup migration C.
//
// MUST be validated on a Citus staging clone before production: the exact set of
// pre-existing constraints/indexes varies, and this migration drops/recreates
// them defensively. See .ai/workflow-v2-citus-colocation-plan.md.
// Run without knex's wrapping transaction. `up` is written to be re-runnable
// after a partial failure (already-distributed tables skip straight to the
// idempotent constraint re-add), which presumes earlier steps stay committed.
// NOTE(review): confirm whether Citus additionally requires this for
// create_distributed_table / truncate_local_data_after_distributing_table.
exports.config = { transaction: false };
// Canonical 8-4-4-4-12 hex UUID pattern, already wrapped in single quotes so it
// can be spliced into SQL as the right-hand literal of Postgres' `~` operator.
const UUID_REGEX = `'^${[8, 4, 4, 4, 12].map((len) => `[0-9a-fA-F]{${len}}`).join('-')}$'`;
// Existing distributed v1 workflow table the v2 tables are colocated with. It is
// distributed on `tenant` in the uuid colocation group (group 41 in production).
const COLOCATE_WITH = 'workflow_tasks';
// Every workflow v2 table this migration distributes, listed parents-first so FK
// targets exist and are colocated before the tables that reference them. The
// second element is the table's single natural-id column; the rebuilt primary
// key is (tenant, <that column>).
const TABLE_SPECS = [
  ['workflow_definitions', 'workflow_id'],
  ['workflow_runs', 'run_id'],
  ['workflow_definition_versions', 'version_id'],
  ['workflow_run_steps', 'step_id'],
  ['workflow_run_waits', 'wait_id'],
  ['workflow_run_snapshots', 'snapshot_id'],
  ['workflow_action_invocations', 'invocation_id'],
  ['workflow_run_logs', 'log_id'],
  ['workflow_runtime_events', 'event_id'],
  ['tenant_workflow_schedule', 'id'],
];
// table -> natural-id PK column (tenant is prepended when the PK is rebuilt).
const PK_ID = Object.fromEntries(TABLE_SPECS);
// Table names alone, in distribution order (parents before children).
const DISTRIBUTE_ORDER = TABLE_SPECS.map(([table]) => table);
// Tables that hang off workflow_runs via run_id: their NULL tenants are
// backfilled from the parent run, and each gets a tenant-scoped FK to it.
const RUN_CHILDREN = [
  'workflow_run_steps',
  'workflow_run_waits',
  'workflow_run_snapshots',
  'workflow_action_invocations',
  'workflow_run_logs',
];
// Tenant-scoped uniques to re-add after distribution. Every one is prefixed with
// `tenant` (the distribution column) ahead of the listed natural columns.
const UNIQUES = [
  ['workflow_definition_versions', 'workflow_definition_versions_tenant_workflow_version_unique', ['workflow_id', 'version']],
  ['workflow_action_invocations', 'workflow_action_invocations_tenant_idempotency_unique', ['action_id', 'action_version', 'idempotency_key']],
  ['tenant_workflow_schedule', 'tenant_workflow_schedule_tenant_workflow_unique', ['workflow_id']],
].map(([table, name, cols]) => ({ table, name, cols: ['tenant', ...cols] }));
// Tenant-scoped FKs to re-add (all ON DELETE CASCADE, matching the originals):
// the two children of workflow_definitions first, then one FK per run child.
const FKS = [
  ...['workflow_runs', 'workflow_definition_versions'].map((table) => ({
    name: `${table}_tenant_workflow_fk`,
    table,
    cols: ['tenant', 'workflow_id'],
    ref: 'workflow_definitions',
    refCols: ['tenant', 'workflow_id'],
  })),
  ...RUN_CHILDREN.map((table) => ({
    name: `${table}_tenant_run_fk`,
    table,
    cols: ['tenant', 'run_id'],
    ref: 'workflow_runs',
    refCols: ['tenant', 'run_id'],
  })),
];
/**
 * Reports whether the `citus` extension is installed in the current database.
 * @param {object} knex - Knex instance.
 * @returns {Promise<boolean>} false when the query yields no row.
 */
const isCitusEnabled = async (knex) => {
  const result = await knex.raw("SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'citus') AS enabled");
  const firstRow = (result.rows ?? [])[0];
  return firstRow != null && Boolean(firstRow.enabled);
};
/**
 * Sets `citus.multi_shard_modify_mode` to 'sequential' for the current session.
 * Per the Citus docs this runs multi-shard commands one shard at a time over a
 * single connection per node instead of in parallel. Presumably that is to keep
 * the DDL and distribution steps in this migration clear of Citus's
 * parallel-execution restrictions; TODO confirm.
 *
 * NOTE(review): `SET` is session-scoped. With `transaction: false`, later
 * knex.raw calls are not guaranteed to reuse this pooled connection. Confirm the
 * pool size, or pin the migration to one connection, if the mode matters.
 *
 * @param {object} knex - Knex instance.
 * @returns {Promise<void>}
 */
async function ensureSequentialMode(knex) {
  await knex.raw("SET citus.multi_shard_modify_mode TO 'sequential'");
}
/**
 * Reports whether `table` is registered in Citus's pg_dist_partition.
 *
 * Any Citus-managed relation appears there, including hash-distributed,
 * reference and citus-local tables. Use a partmethod check when the kind
 * matters. The `::regclass` cast errors if the relation does not exist.
 *
 * @param {object} knex - Knex instance.
 * @param {string} table - Relation name, resolved via search_path.
 * @returns {Promise<boolean>}
 */
const isDistributed = async (knex, table) => {
  const { rows } = await knex.raw(
    `SELECT EXISTS (SELECT 1 FROM pg_dist_partition WHERE logicalrelid = ?::regclass) AS d`,
    [table]
  );
  if (!rows || rows.length === 0) return false;
  return Boolean(rows[0]?.d);
};
/**
 * Drops every FOREIGN KEY constraint declared on `table` (outgoing FKs only).
 * The DROPs use CASCADE.
 * @param {object} knex - Knex instance.
 * @param {string} table - Table whose FKs are dropped.
 */
const dropForeignKeys = async (knex, table) => {
  const { rows: foreignKeys } = await knex.raw(
    `SELECT conname FROM pg_constraint WHERE conrelid = ?::regclass AND contype = 'f'`,
    [table]
  );
  for (const { conname } of foreignKeys) {
    await knex.raw('ALTER TABLE ?? DROP CONSTRAINT IF EXISTS ?? CASCADE', [table, conname]);
  }
};
/**
 * Drops every UNIQUE constraint declared on `table`. The DROPs use CASCADE.
 * Unique indexes that are not constraints are handled by dropUniqueIndexes.
 * @param {object} knex - Knex instance.
 * @param {string} table - Table whose unique constraints are dropped.
 */
const dropUniqueConstraints = async (knex, table) => {
  const lookup = await knex.raw(
    `SELECT conname FROM pg_constraint WHERE conrelid = ?::regclass AND contype = 'u'`,
    [table]
  );
  for (const constraintRow of lookup.rows) {
    const constraintName = constraintRow.conname;
    await knex.raw('ALTER TABLE ?? DROP CONSTRAINT IF EXISTS ?? CASCADE', [table, constraintName]);
  }
};
/**
 * Drops UNIQUE indexes on `table` that are not backed by a constraint (e.g.
 * partial unique indexes). Non-unique indexes are left alone, because Citus only
 * requires the distribution column in UNIQUE/PK/exclusion constraints.
 *
 * The table is resolved with `?::regclass`, like the sibling helpers, so only
 * indexes on that exact relation match. A bare relname match would also hit
 * same-named tables in other schemas. Each index is dropped schema-qualified.
 * Indexes referenced by any pg_constraint row (including FKs that point at them)
 * are skipped.
 *
 * @param {object} knex - Knex instance.
 * @param {string} table - Table whose standalone unique indexes are dropped.
 */
const dropUniqueIndexes = async (knex, table) => {
  const { rows } = await knex.raw(
    `SELECT n.nspname AS schemaname, i.relname AS idxname
       FROM pg_index x
       JOIN pg_class i ON i.oid = x.indexrelid
       JOIN pg_namespace n ON n.oid = i.relnamespace
      WHERE x.indrelid = ?::regclass AND x.indisunique AND NOT x.indisprimary
        AND NOT EXISTS (SELECT 1 FROM pg_constraint c WHERE c.conindid = x.indexrelid)`,
    [table]
  );
  for (const { schemaname, idxname } of rows) {
    await knex.raw('DROP INDEX IF EXISTS ??.??', [schemaname, idxname]);
  }
};
/**
 * Drops every user-defined (non-internal) trigger on `table`.
 *
 * Citus rejects create_distributed_table on a table with triggers unless
 * citus.enable_unsafe_triggers is set. Per the original author the v2 tables
 * have none (updated_at is set in app code), so this is expected to be a
 * defensive no-op.
 *
 * @param {object} knex - Knex instance.
 * @param {string} table - Table whose triggers are dropped.
 */
const dropTriggers = async (knex, table) => {
  const { rows: userTriggers } = await knex.raw(
    `SELECT tgname FROM pg_trigger WHERE tgrelid = ?::regclass AND NOT tgisinternal`,
    [table]
  );
  for (const { tgname } of userTriggers) {
    await knex.raw('DROP TRIGGER IF EXISTS ?? ON ??', [tgname, table]);
  }
};
/**
 * Looks up the name of `table`'s PRIMARY KEY constraint.
 * @param {object} knex - Knex instance.
 * @param {string} table - Table to inspect.
 * @returns {Promise<string|null>} the constraint name, or null when there is none.
 */
const getPrimaryKey = async (knex, table) => {
  const sql = [
    'SELECT c.conname AS constraint_name',
    'FROM pg_constraint c',
    "WHERE c.conrelid = ?::regclass AND c.contype = 'p'",
  ].join('\n');
  const { rows } = await knex.raw(sql, [table]);
  const pkRow = rows?.[0];
  return pkRow?.constraint_name ?? null;
};
/**
 * Rebuilds `table`'s primary key as (tenant, <natural id>), where the natural-id
 * column comes from PK_ID. Citus needs the distribution column (`tenant`) in the
 * primary key.
 *
 * The id column is validated BEFORE any DDL runs. Previously an unmapped table
 * had its existing PK dropped and only then failed on the ADD, which left it with
 * no primary key at all.
 *
 * NOTE(review): the existing PK is dropped with CASCADE, which also drops any
 * FKs from other tables that reference it, including tables outside this
 * migration's set.
 *
 * @param {object} knex - Knex instance.
 * @param {string} table - Table whose PK is rebuilt; must be a key of PK_ID.
 * @throws {Error} if `table` has no natural-id column configured in PK_ID.
 */
const recreateTenantPrimaryKey = async (knex, table) => {
  const idColumn = Object.prototype.hasOwnProperty.call(PK_ID, table) ? PK_ID[table] : undefined;
  if (!idColumn) {
    throw new Error(`Cannot rebuild primary key for ${table}: no natural-id column configured in PK_ID`);
  }
  const existing = await getPrimaryKey(knex, table);
  if (existing) {
    await knex.raw('ALTER TABLE ?? DROP CONSTRAINT ?? CASCADE', [table, existing]);
  }
  await knex.raw('ALTER TABLE ?? ADD PRIMARY KEY (tenant, ??)', [table, idColumn]);
};
/**
 * Makes sure COLOCATE_WITH (workflow_tasks) is distributed so the v2 tables have
 * a colocation target. This is a no-op when it is already in pg_dist_partition,
 * which is the production case.
 *
 * Fresh Citus chains never distributed the v1 workflow tables:
 *  - The tenant-less system catalogs become reference tables.
 *  - workflow_task_definitions and workflow_tasks are distributed on `tenant`.
 *    No colocate_with is passed, so Citus's default colocation applies.
 *
 * Converting the catalogs while workflow_tasks still references them would drag
 * workflow_tasks into the conversion. Per the original note, workflow_tasks also
 * has FKs to the distributed users table. So its FKs into the two task-definition
 * tables are captured, dropped, and re-added verbatim afterwards. Constraint
 * names are re-quoted properly, so names containing `"` survive.
 *
 * Reference-table checks use partmethod/repmodel rather than pg_dist_partition
 * presence. The first conversion auto-adds FK-connected local tables as
 * citus-local tables, and those also appear in pg_dist_partition.
 *
 * NOTE(review): the captured FKs live only in memory. If a run fails after they
 * are dropped but before they are re-added, a re-run finds none to capture, or
 * returns early once workflow_tasks is distributed. Either way they are not
 * restored. Verify they exist after any partial failure.
 *
 * @param {object} knex - Knex instance.
 */
const ensureColocationTargetDistributed = async (knex) => {
  if (await isDistributed(knex, COLOCATE_WITH)) return;

  // Render a catalog-supplied name as a quoted Postgres identifier, doubling any
  // embedded double quote so the re-added constraint keeps its exact name.
  const quoteIdent = (name) => `"${String(name).replace(/"/g, '""')}"`;

  // True only for a reference table (partmethod 'n' with repmodel 't').
  const isReference = async (table) => {
    const r = await knex.raw(
      `SELECT 1 FROM pg_dist_partition WHERE logicalrelid = ?::regclass AND partmethod = 'n' AND repmodel = 't'`,
      [table]
    );
    return r.rows.length > 0;
  };

  // Capture workflow_tasks' FKs into the task-definition tables (name plus full
  // definition) so they can be dropped now and recreated afterwards.
  const { rows: wtFks } = await knex.raw(
    `SELECT conname, pg_get_constraintdef(oid) AS def
       FROM pg_constraint
      WHERE conrelid = 'workflow_tasks'::regclass AND contype = 'f'
        AND confrelid IN ('system_workflow_task_definitions'::regclass,
                          'workflow_task_definitions'::regclass)`
  );
  for (const fk of wtFks) {
    await knex.raw('ALTER TABLE workflow_tasks DROP CONSTRAINT IF EXISTS ??', [fk.conname]);
  }

  for (const ref of ['system_workflow_form_definitions', 'system_workflow_task_definitions']) {
    if (!(await isReference(ref))) {
      await knex.raw('SELECT create_reference_table(?::regclass)', [ref]);
    }
  }
  if (!(await isDistributed(knex, 'workflow_task_definitions'))) {
    await knex.raw(`SELECT create_distributed_table('workflow_task_definitions', 'tenant')`);
  }
  await knex.raw(`SELECT create_distributed_table('workflow_tasks', 'tenant')`);

  for (const fk of wtFks) {
    await knex.raw(`ALTER TABLE workflow_tasks ADD CONSTRAINT ${quoteIdent(fk.conname)} ${fk.def}`);
  }
};
exports.up = async function up(knex) {
if (!(await isCitusEnabled(knex))) {
console.log('Citus not enabled, skipping workflow v2 distribution');
return;
}
await ensureSequentialMode(knex);
await ensureColocationTargetDistributed(knex);
const present = [];
for (const table of DISTRIBUTE_ORDER) {
if (await knex.schema.hasTable(table)) present.push(table);
}
// Only NOT-yet-distributed tables need prep + distribution. Already-distributed
// tables (e.g. from a prior partial run that failed at the constraint step) skip
// straight to the idempotent constraint re-add: re-running their backfill would
// fail with "modifying the partition value of rows is not allowed", because
// `tenant` is now the distribution column.
const toPrep = [];
for (const table of present) {
if (!(await isDistributed(knex, table))) toPrep.push(table);
}
// 1. Backfill any NULL tenant (rollover rows written by old code), then enforce
// NOT NULL (required: the distribution column must be NOT NULL).
for (const table of toPrep) {
if (await knex.schema.hasColumn(table, 'tenant_id')) {
await knex.raw(
`UPDATE ?? SET tenant = tenant_id::uuid WHERE tenant IS NULL AND tenant_id ~ ${UUID_REGEX}`,
[table]
);
}
}
for (const table of RUN_CHILDREN) {
if (!toPrep.includes(table)) continue;
await knex.raw(
`UPDATE ?? AS c SET tenant = r.tenant
FROM workflow_runs r
WHERE c.run_id = r.run_id AND c.tenant IS NULL AND r.tenant IS NOT NULL`,
[table]
);
}
if (toPrep.includes('workflow_definition_versions')) {
await knex.raw(
`UPDATE workflow_definition_versions AS v SET tenant = d.tenant
FROM workflow_definitions d
WHERE v.workflow_id = d.workflow_id AND v.tenant IS NULL AND d.tenant IS NOT NULL`
);
}
for (const table of toPrep) {
const nulls = await knex(table).whereNull('tenant').count({ c: '*' }).first();
if (Number(nulls?.c ?? 0) > 0) {
throw new Error(`Cannot distribute ${table}: ${nulls.c} rows still have NULL tenant`);
}
await knex.raw('ALTER TABLE ?? ALTER COLUMN tenant SET NOT NULL', [table]);
}
// 2. Drop all FKs (so PKs they reference can be rebuilt).
for (const table of toPrep) {
await dropForeignKeys(knex, table);
}
// 3. Per table: drop triggers + uniques + unique indexes, recreate PK as (tenant, <id>).
for (const table of toPrep) {
await dropTriggers(knex, table);
await dropUniqueConstraints(knex, table);
await dropUniqueIndexes(knex, table);
await recreateTenantPrimaryKey(knex, table);
}
// 4. Distribute, then immediately truncate the leftover LOCAL coordinator data —
// it otherwise blocks the constraint re-adds below. Cast to ::regclass so the
// Citus functions resolve unambiguously.
if (toPrep.length) {
console.log(`Colocating workflow v2 tables with ${COLOCATE_WITH}`);
}
for (const table of toPrep) {
await knex.raw(`SELECT create_distributed_table(?::regclass, 'tenant', colocate_with => ?)`, [table, COLOCATE_WITH]);
await knex.raw('SELECT truncate_local_data_after_distributing_table(?::regclass)', [table]);
}
// 5. Re-add tenant-scoped uniques and FKs (targets are now distributed+colocated).
// DROP IF EXISTS first so a re-run after a partial failure is idempotent.
for (const u of UNIQUES) {
if (!present.includes(u.table)) continue;
const cols = u.cols.map(() => '??').join(', ');
await knex.raw('ALTER TABLE ?? DROP CONSTRAINT IF EXISTS ??', [u.table, u.name]);
await knex.raw(`ALTER TABLE ?? ADD CONSTRAINT ?? UNIQUE (${cols})`, [u.table, u.name, ...u.cols]);
}
// Restore workflow_definitions' per-tenant unique key (partial → unique index;
// dropped above because the old one was on tenant_id, not the dist column).
if (present.includes('workflow_definitions')) {
await knex.raw('DROP INDEX IF EXISTS workflow_definitions_tenant_key_unique');
await knex.raw(
'CREATE UNIQUE INDEX workflow_definitions_tenant_key_unique ON workflow_definitions (tenant, key) WHERE key IS NOT NULL'
);
}
// Add FKs as NOT VALID: there are legacy rows (e.g. ~388 pre-split cross-tenant
// workflow_runs) whose (tenant, workflow_id) has no same-tenant definition. The
// constraint is enforced for all NEW writes (the app always scopes a run to its
// definition's tenant); the legacy rows are grandfathered. Clean them up and run
// `ALTER TABLE .. VALIDATE CONSTRAINT ..` later as a separate task.
for (const fk of FKS) {
if (!present.includes(fk.table) || !present.includes(fk.ref)) continue;
const cols = fk.cols.map(() => '??').join(', ');
const refCols = fk.refCols.map(() => '??').join(', ');
await knex.raw('ALTER TABLE ?? DROP CONSTRAINT IF EXISTS ??', [fk.table, fk.name]);
await knex.raw(
`ALTER TABLE ?? ADD CONSTRAINT ?? FOREIGN KEY (${cols}) REFERENCES ?? (${refCols}) ON DELETE CASCADE NOT VALID`,
[fk.table, fk.name, ...fk.cols, fk.ref, ...fk.refCols]
);
}
// 6. Verify: every targeted table is distributed on tenant and shares one
// colocationid. (Fetch all rows and filter in JS — array bindings in raw
// SQL are unreliable in knex.)
const check = await knex.raw(
`SELECT logicalrelid::text AS tbl,
column_to_column_name(logicalrelid, partkey) AS dist_col,
colocationid
FROM pg_dist_partition`
);
const rows = (check.rows || []).filter((r) => present.includes(r.tbl));
const badCol = rows.filter((r) => r.dist_col !== 'tenant');
if (badCol.length) {
throw new Error(`Tables not distributed on tenant: ${badCol.map((r) => r.tbl).join(', ')}`);
}
const missing = present.filter((t) => !rows.some((r) => r.tbl === t));
if (missing.length) {
throw new Error(`Tables not distributed at all: ${missing.join(', ')}`);
}
const groups = new Set(rows.map((r) => r.colocationid));
if (groups.size !== 1) {
throw new Error(`Workflow v2 tables landed in multiple colocation groups: ${[...groups].join(', ')}`);
}
console.log(`Workflow v2 tables distributed into colocation group ${[...groups][0]}`);
};
exports.down = async function down() {
// Deliberately no-op: create_distributed_table cannot be safely reversed once
// writes resume. Roll forward instead.
};