PSA/shared/workflow/persistence/workflowEntityLinkModel.ts
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

273 lines
7.5 KiB
TypeScript

import { Knex } from 'knex';
export type WorkflowEntityRef = {
type: string;
id: string;
};
export type WorkflowEntityLinkRecord = {
tenant: string;
link_id: string;
namespace: string;
left_type: string;
left_id: string;
right_type: string;
right_id: string;
relation: string;
attributes: Record<string, unknown>;
created_by_run_id?: string | null;
created_at: string;
updated_at: string;
};
export type WorkflowEntityLinkMatch = {
link_id: string;
type: string;
id: string;
relation: string;
attributes: Record<string, unknown>;
};
export type WorkflowEntityLinkListOptions = {
left_type?: string;
right_type?: string;
relation?: string;
limit?: number;
cursor?: number | string | null;
};
export type WorkflowEntityLinkListResult = {
items: WorkflowEntityLinkRecord[];
next_cursor: number | null;
};
export type WorkflowEntityLinkNamespace = {
namespace: string;
link_count: number;
};
const TABLE = 'workflow_entity_links';
const UNIQUE_COLUMNS = [
'tenant',
'namespace',
'left_type',
'left_id',
'right_type',
'right_id',
'relation',
];
const DEFAULT_LIMIT = 100;
const MAX_LIMIT = 200;
const nowIso = () => new Date().toISOString();
const normalizeLimit = (limit?: number): number => {
if (!Number.isFinite(limit)) return DEFAULT_LIMIT;
return Math.max(1, Math.min(MAX_LIMIT, Math.trunc(limit as number)));
};
const normalizeCursor = (cursor?: number | string | null): number => {
const parsed = typeof cursor === 'string' ? Number.parseInt(cursor, 10) : cursor;
return Number.isFinite(parsed) && (parsed as number) > 0 ? Math.trunc(parsed as number) : 0;
};
const targetFromLeft = (row: WorkflowEntityLinkRecord): WorkflowEntityLinkMatch => ({
link_id: row.link_id,
type: row.right_type,
id: row.right_id,
relation: row.relation,
attributes: row.attributes ?? {},
});
const targetFromRight = (row: WorkflowEntityLinkRecord): WorkflowEntityLinkMatch => ({
link_id: row.link_id,
type: row.left_type,
id: row.left_id,
relation: row.relation,
attributes: row.attributes ?? {},
});
const WorkflowEntityLinkModel = {
upsert: async (
knex: Knex,
tenant: string,
input: {
namespace: string;
left: WorkflowEntityRef;
right: WorkflowEntityRef;
relation?: string;
attributes?: Record<string, unknown>;
created_by_run_id?: string | null;
}
): Promise<{ record: WorkflowEntityLinkRecord; created: boolean }> => {
const timestamp = nowIso();
const insertData = {
tenant,
namespace: input.namespace,
left_type: input.left.type,
left_id: input.left.id,
right_type: input.right.type,
right_id: input.right.id,
relation: input.relation ?? 'related',
attributes: input.attributes ?? {},
created_by_run_id: input.created_by_run_id ?? null,
created_at: timestamp,
updated_at: timestamp,
};
const inserted = await knex<WorkflowEntityLinkRecord>(TABLE)
.insert(insertData)
.onConflict(UNIQUE_COLUMNS)
.ignore()
.returning('*');
if (inserted[0]) {
return { record: inserted[0], created: true };
}
const [record] = await knex<WorkflowEntityLinkRecord>(TABLE)
.where({
tenant,
namespace: insertData.namespace,
left_type: insertData.left_type,
left_id: insertData.left_id,
right_type: insertData.right_type,
right_id: insertData.right_id,
relation: insertData.relation,
})
.update({
attributes: insertData.attributes,
updated_at: timestamp,
})
.returning('*');
return { record, created: false };
},
lookup: async (
knex: Knex,
tenant: string,
input: {
namespace: string;
from: WorkflowEntityRef;
direction?: 'forward' | 'reverse' | 'either';
relation?: string;
right_type?: string;
limit?: number;
}
): Promise<{ matches: WorkflowEntityLinkMatch[] }> => {
const direction = input.direction ?? 'forward';
const limit = normalizeLimit(input.limit);
const matches: WorkflowEntityLinkMatch[] = [];
if (direction === 'forward' || direction === 'either') {
const query = knex<WorkflowEntityLinkRecord>(TABLE)
.where({
tenant,
namespace: input.namespace,
left_type: input.from.type,
left_id: input.from.id,
})
.orderBy('created_at', 'asc')
.orderBy('link_id', 'asc')
.limit(limit);
if (input.relation) query.andWhere({ relation: input.relation });
if (input.right_type) query.andWhere({ right_type: input.right_type });
const rows = await query;
matches.push(...rows.map(targetFromLeft));
}
if (matches.length < limit && (direction === 'reverse' || direction === 'either')) {
const query = knex<WorkflowEntityLinkRecord>(TABLE)
.where({
tenant,
namespace: input.namespace,
right_type: input.from.type,
right_id: input.from.id,
})
.orderBy('created_at', 'asc')
.orderBy('link_id', 'asc')
.limit(limit - matches.length);
if (input.relation) query.andWhere({ relation: input.relation });
if (input.right_type) query.andWhere({ left_type: input.right_type });
const rows = await query;
matches.push(...rows.map(targetFromRight));
}
return { matches };
},
delete: async (
knex: Knex,
tenant: string,
input: {
namespace: string;
left?: WorkflowEntityRef;
right?: WorkflowEntityRef;
relation?: string;
}
): Promise<number> => {
if (!input.left && !input.right) {
throw new Error('WORKFLOW_ENTITY_LINK_DELETE_REQUIRES_LEFT_OR_RIGHT');
}
const query = knex<WorkflowEntityLinkRecord>(TABLE).where({
tenant,
namespace: input.namespace,
});
if (input.left) {
query.andWhere({ left_type: input.left.type, left_id: input.left.id });
}
if (input.right) {
query.andWhere({ right_type: input.right.type, right_id: input.right.id });
}
if (input.relation) {
query.andWhere({ relation: input.relation });
}
return query.delete();
},
list: async (
knex: Knex,
tenant: string,
namespace: string,
options: WorkflowEntityLinkListOptions = {}
): Promise<WorkflowEntityLinkListResult> => {
const limit = normalizeLimit(options.limit);
const cursor = normalizeCursor(options.cursor);
const query = knex<WorkflowEntityLinkRecord>(TABLE).where({ tenant, namespace });
if (options.left_type) query.andWhere({ left_type: options.left_type });
if (options.right_type) query.andWhere({ right_type: options.right_type });
if (options.relation) query.andWhere({ relation: options.relation });
const rows = await query
.orderBy('created_at', 'asc')
.orderBy('link_id', 'asc')
.limit(limit + 1)
.offset(cursor);
const hasMore = rows.length > limit;
return {
items: rows.slice(0, limit),
next_cursor: hasMore ? cursor + limit : null,
};
},
listNamespaces: async (knex: Knex, tenant: string): Promise<WorkflowEntityLinkNamespace[]> => {
const rows = (await knex<WorkflowEntityLinkRecord>(TABLE)
.where({ tenant })
.select('namespace')
.count<{ link_count: string | number }[]>({ link_count: '*' })
.groupBy('namespace')
.orderBy('namespace', 'asc')) as Array<{ namespace: string; link_count: string | number }>;
return rows.map((row) => ({
namespace: row.namespace,
link_count: Number(row.link_count),
}));
},
};
export default WorkflowEntityLinkModel;