Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz Source: /opt/alga-psa on psa.joliet.tech
267 lines
9.7 KiB
JavaScript
267 lines
9.7 KiB
JavaScript
// `comments` was distributed earlier without the required
// truncate_local_data_after_distributing_table() follow-up, stranding
// pre-distribution NULL-thread_id rows in the coordinator parent heap. They are
// invisible to Citus-routed DML but break ALTER ... SET NOT NULL (core PG DDL
// scans the parent heap). That cleanup is refused while ANY non-distributed
// table has an FK to comments. Several do (email_reply_tokens, vectors,
// ticket_bundle_mirrors, ...). This migration distributes every such local
// referrer of comments or project_task_comments, co-located with comments,
// then runs the official cleanup on those tables and on comments and
// project_task_comments themselves.
//
// Rule: distributing a possibly-non-empty table MUST be followed by
// truncate_local_data_after_distributing_table().

exports.up = async function up(knex) {
|
|
const citus = await knex.raw(
|
|
"SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'citus') AS enabled"
|
|
);
|
|
if (!citus.rows?.[0]?.enabled) {
|
|
return;
|
|
}
|
|
|
|
// Colocation anchor must be distributed; nothing to do otherwise.
|
|
if (!(await isDistributed(knex, 'comments'))) {
|
|
return;
|
|
}
|
|
|
|
const referrers = await knex.raw(
|
|
`SELECT DISTINCT con.conrelid::regclass::text AS tbl
|
|
FROM pg_constraint con
|
|
WHERE con.contype = 'f'
|
|
AND con.confrelid IN ('comments'::regclass, 'project_task_comments'::regclass)
|
|
AND con.conrelid <> con.confrelid
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM citus_tables ct WHERE ct.table_name = con.conrelid
|
|
)
|
|
ORDER BY 1`
|
|
);
|
|
|
|
const distributedNow = [];
|
|
for (const { tbl } of referrers.rows ?? []) {
|
|
await makeDistributable(knex, tbl);
|
|
// Joining the colocation group pulls the table into an FK graph that may
|
|
// reach a reference table; Citus then forbids the default parallel DDL of
|
|
// create_distributed_table. Run it in its own transaction in sequential
|
|
// mode (SET LOCAL auto-resets on commit) with no prior reference access.
|
|
await knex.transaction(async (trx) => {
|
|
await trx.raw("SET LOCAL citus.multi_shard_modify_mode TO 'sequential'");
|
|
await trx.raw(
|
|
"SELECT create_distributed_table(?, 'tenant', colocate_with => 'comments')",
|
|
[tbl]
|
|
);
|
|
});
|
|
distributedNow.push(tbl);
|
|
}
|
|
|
|
for (const tbl of [...distributedNow, 'email_reply_tokens', 'comments', 'project_task_comments']) {
|
|
await truncateLocalDataIfNeeded(knex, tbl);
|
|
}
|
|
};
|
|
|
|
// Prepare a local table for create_distributed_table: it must have the
// distribution column, no unique/exclude constraint or unique index that omits
// it, and no rows whose composite FKs point at now-deleted distributed parents
// (those abort the shard copy; the referenced entity is gone so the row is
// dead).
//
// `table` is a regclass-formatted name (as produced by `::regclass::text`,
// possibly schema-qualified and/or quoted). Every catalog lookup therefore
// resolves it with `?::regclass` instead of comparing bare relnames. The DROP
// statements are rendered server-side with format() so that schema
// qualification and identifier quoting are always correct.
//
// Throws when the table has no `tenant` column.
async function makeDistributable(knex, table) {
  const hasTenant = await knex.raw(
    `SELECT EXISTS (
       SELECT 1 FROM pg_attribute
        WHERE attrelid = ?::regclass
          AND attname = 'tenant'
          AND attnum > 0
          AND NOT attisdropped
     ) AS ok`,
    [table]
  );
  if (!hasTenant.rows?.[0]?.ok) {
    throw new Error(
      `Cannot distribute ${table}: it FKs comments/project_task_comments but ` +
        `has no tenant column. Handle it explicitly before this migration.`
    );
  }

  // Unique / exclusion constraints whose key omits tenant.
  const blockingUniques = await knex.raw(
    `SELECT format('ALTER TABLE %s DROP CONSTRAINT %I', con.conrelid::regclass, con.conname) AS ddl
       FROM pg_constraint con
      WHERE con.conrelid = ?::regclass
        AND con.contype IN ('u', 'x')
        AND NOT EXISTS (
          SELECT 1
            FROM unnest(con.conkey) AS k(attnum)
            JOIN pg_attribute a
              ON a.attrelid = con.conrelid AND a.attnum = k.attnum
           WHERE a.attname = 'tenant'
        )`,
    [table]
  );
  for (const { ddl } of blockingUniques.rows ?? []) {
    await knex.raw(ddl);
  }

  // Standalone unique indexes (not backing the PK or any constraint) whose
  // key omits tenant. indexrelid::regclass renders a correctly quoted and,
  // when needed, schema-qualified index name.
  const blockingIndexes = await knex.raw(
    `SELECT format('DROP INDEX IF EXISTS %s', x.indexrelid::regclass) AS ddl
       FROM pg_index x
      WHERE x.indrelid = ?::regclass
        AND x.indisunique
        AND NOT x.indisprimary
        AND NOT EXISTS (SELECT 1 FROM pg_constraint c WHERE c.conindid = x.indexrelid)
        AND NOT EXISTS (
          SELECT 1
            FROM unnest(x.indkey) AS k(attnum)
            JOIN pg_attribute a
              ON a.attrelid = x.indrelid AND a.attnum = k.attnum
           WHERE a.attname = 'tenant'
        )`,
    [table]
  );
  for (const { ddl } of blockingIndexes.rows ?? []) {
    await knex.raw(ddl);
  }

  await purgeOrphanRows(knex, table);
}

// Postgres' extended protocol carries at most 65535 bind parameters per
// statement; stay well below that when expanding row sets into IN (...) lists.
const MAX_BIND_PARAMS = 30000;

// Delete rows whose FK columns reference a parent row that no longer exists.
// Citus forbids local<->distributed joins, so every step touches one table
// kind only: read local, check existence on the (distributed) parent, delete
// local by primary key.
//
// Values are compared as text. The parent-side tenant is bound as uuid so
// shard pruning still applies. Both the existence checks and the DELETE are
// issued in batches so that large tables never exceed the bind-parameter
// limit. Tables without a primary key or without FKs are left untouched.
async function purgeOrphanRows(knex, table) {
  const pkCols = await loadPrimaryKeyColumns(knex, table);
  if (pkCols.length === 0) {
    return;
  }
  const fks = await loadForeignKeys(knex, table);
  if (fks.length === 0) {
    return;
  }

  const selectCols = [...new Set([...pkCols, ...fks.flatMap((fk) => fk.local_cols)])];
  const { rows } = await knex.raw(
    `SELECT ${selectCols.map((_, i) => `??::text AS c${i}`).join(', ')} FROM ??`,
    [...selectCols, table]
  );
  if (rows.length === 0) {
    return;
  }

  const colAlias = new Map(selectCols.map((c, i) => [c, `c${i}`]));
  const valuesOf = (row, cols) => cols.map((col) => row[colAlias.get(col)]);
  // MATCH SIMPLE: an FK with any NULL column is not enforced, so it can never
  // make the row an orphan. Returns null in that case.
  const fkTuple = (row, fk) => {
    const values = valuesOf(row, fk.local_cols);
    return values.some((v) => v == null) ? null : values;
  };

  // Existence set per FK, built from the parent (single-table, Citus-routable).
  const okSets = [];
  for (const fk of fks) {
    const tuples = rows.map((row) => fkTuple(row, fk)).filter((t) => t !== null);
    okSets.push(await loadExistingParentKeys(knex, fk, tuples));
  }

  const orphans = rows.filter((row) =>
    fks.some((fk, i) => {
      const tuple = fkTuple(row, fk);
      return tuple !== null && !okSets[i].has(tupleKey(tuple));
    })
  );
  if (orphans.length === 0) {
    return;
  }

  // Local-only delete by primary key (no distributed table referenced).
  const pkTuple = pkCols.map(() => '??::text').join(', ');
  const rowPh = `(${pkCols.map(() => '?').join(', ')})`;
  const rowsPerBatch = Math.max(1, Math.floor(MAX_BIND_PARAMS / pkCols.length));
  for (const batch of chunkArray(orphans, rowsPerBatch)) {
    await knex.raw(
      `DELETE FROM ?? WHERE (${pkTuple}) IN (${batch.map(() => rowPh).join(', ')})`,
      [table, ...pkCols, ...batch.flatMap((o) => valuesOf(o, pkCols))]
    );
  }
}

// Primary-key column names of `table`, in key order (empty when there is no PK).
async function loadPrimaryKeyColumns(knex, table) {
  const res = await knex.raw(
    `SELECT a.attname
       FROM pg_constraint c
       JOIN unnest(c.conkey) WITH ORDINALITY AS k(attnum, ord) ON true
       JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = k.attnum
      WHERE c.conrelid = ?::regclass AND c.contype = 'p'
      ORDER BY k.ord`,
    [table]
  );
  return (res.rows ?? []).map((r) => r.attname);
}

// FK constraints on `table` as { conname, parent, local_cols, parent_cols };
// local_cols[i] references parent_cols[i].
async function loadForeignKeys(knex, table) {
  const res = await knex.raw(
    `SELECT c.conname,
            c.confrelid::regclass::text AS parent,
            (SELECT string_agg(a.attname, ',' ORDER BY k.ord)
               FROM unnest(c.conkey) WITH ORDINALITY k(attnum, ord)
               JOIN pg_attribute a ON a.attrelid = c.conrelid AND a.attnum = k.attnum
            ) AS local_cols,
            (SELECT string_agg(a.attname, ',' ORDER BY k.ord)
               FROM unnest(c.confkey) WITH ORDINALITY k(attnum, ord)
               JOIN pg_attribute a ON a.attrelid = c.confrelid AND a.attnum = k.attnum
            ) AS parent_cols
       FROM pg_constraint c
      WHERE c.conrelid = ?::regclass AND c.contype = 'f'`,
    [table]
  );
  // string_agg keeps these as scalar text regardless of driver array parsing.
  return (res.rows ?? []).map((r) => ({
    conname: r.conname,
    parent: r.parent,
    local_cols: r.local_cols.split(','),
    parent_cols: r.parent_cols.split(','),
  }));
}

// tupleKey()s of those `tuples` (value lists aligned with fk.parent_cols) that
// exist in fk.parent. Queries only the parent table, in bind-limited batches.
async function loadExistingParentKeys(knex, fk, tuples) {
  const found = new Set();
  const unique = [...new Map(tuples.map((t) => [tupleKey(t), t])).values()];
  if (unique.length === 0) {
    return found;
  }

  // Keep `tenant` as uuid so shard pruning still applies; compare other cols
  // as text so we don't need each parent column's exact type.
  const lhs = fk.parent_cols.map((pc) => (pc === 'tenant' ? 'tenant' : '??::text')).join(', ');
  const ph = `(${fk.parent_cols.map((pc) => (pc === 'tenant' ? '?::uuid' : '?::text')).join(', ')})`;
  const selectList = fk.parent_cols.map((_, i) => `??::text AS k${i}`).join(', ');
  const nonTenantCols = fk.parent_cols.filter((pc) => pc !== 'tenant');
  const tuplesPerBatch = Math.max(1, Math.floor(MAX_BIND_PARAMS / fk.parent_cols.length));

  for (const batch of chunkArray(unique, tuplesPerBatch)) {
    const res = await knex.raw(
      `SELECT ${selectList} FROM ?? WHERE (${lhs}) IN (${batch.map(() => ph).join(', ')})`,
      [
        ...fk.parent_cols, // SELECT ??::text AS k{i}
        fk.parent, // FROM ??
        ...nonTenantCols, // tuple ??::text (non-tenant parent cols)
        ...batch.flat(), // IN (...) values
      ]
    );
    for (const r of res.rows ?? []) {
      found.add(tupleKey(fk.parent_cols.map((_, i) => r[`k${i}`])));
    }
  }
  return found;
}

// Collision-free key for a tuple of values (a plain '|' join would conflate
// e.g. ['a|b', 'c'] with ['a', 'b|c']).
function tupleKey(values) {
  return JSON.stringify(values);
}

// Split `items` into consecutive slices of at most `size` elements.
function chunkArray(items, size) {
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// True when `table` has an entry in Citus' pg_dist_partition catalog.
// The name is resolved with to_regclass(), which yields NULL (so the result is
// false) for a table that does not exist, instead of raising as ?::regclass
// would. This lets callers probe optional tables such as email_reply_tokens.
async function isDistributed(knex, table) {
  const res = await knex.raw(
    `SELECT EXISTS (
       SELECT 1 FROM pg_dist_partition WHERE logicalrelid = to_regclass(?)
     ) AS is_distributed`,
    [table]
  );
  return Boolean(res.rows?.[0]?.is_distributed);
}

// Calls Citus' truncate_local_data_after_distributing_table() for `table`,
// but only when the table is distributed AND its coordinator parent heap is
// non-empty. A 0-byte heap means the table was distributed cleanly, which
// also makes this a no-op on re-runs and fresh installs.
async function truncateLocalDataIfNeeded(knex, table) {
  const distributed = await isDistributed(knex, table);
  if (!distributed) return;

  const { rows } = await knex.raw('SELECT pg_relation_size(?::regclass) AS bytes', [table]);
  const heapBytes = Number(rows?.[0]?.bytes ?? 0);
  if (heapBytes !== 0) {
    await knex.raw('SELECT truncate_local_data_after_distributing_table(?::regclass)', [table]);
  }
}

// Forward-only schema hygiene; undistribute_table() is too heavy/risky to
|
|
// auto-reverse and dropped non-tenant uniques are superseded by tenant-scoped
|
|
// keys.
|
|
exports.down = async function down(_knex) {};
|
|
|
|
// Disable knex's per-migration wrapping transaction. up() manages its own
// transactions instead. Each create_distributed_table() call runs in a
// dedicated short transaction, so its SET LOCAL sequential mode applies only
// to that call and commits before the next table is processed. The
// truncate_local_data_after_distributing_table() cleanup runs afterwards as
// separate statements.
exports.config = { transaction: false };
|