Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

966 lines
40 KiB
JavaScript

const { randomUUID } = require('node:crypto');
const { buildBasePayloadForEvent, pickOne, pickUser } = require('./notification-fixture.cjs');
function getApiKey() {
return process.env.WORKFLOW_HARNESS_API_KEY || process.env.ALGA_API_KEY || '';
}
function ticketIdExprForEvent(eventName) {
if (eventName === 'TICKET_MERGED') return 'payload.sourceTicketId';
if (eventName === 'TICKET_SPLIT') return 'payload.originalTicketId';
return 'payload.ticketId';
}
function projectIdExprForEvent(eventName) {
if (String(eventName || '').startsWith('PROJECT_')) return 'payload.projectId';
if (eventName === 'TASK_COMMENT_ADDED' || eventName === 'TASK_COMMENT_UPDATED') return 'payload.projectId';
if (String(eventName || '').startsWith('INVOICE_')) return 'payload.invoiceId';
if (String(eventName || '').startsWith('PAYMENT_')) return 'payload.paymentId';
if (String(eventName || '').startsWith('CONTRACT_')) return 'payload.contractId';
if (String(eventName || '').startsWith('COMPANY_')) return 'payload.companyId';
if (String(eventName || '').startsWith('APPOINTMENT_')) return 'payload.appointmentId';
if (String(eventName || '').startsWith('TECHNICIAN_')) return 'payload.appointmentId';
if (String(eventName || '').startsWith('TIME_ENTRY_')) return 'payload.timeEntryId';
if (String(eventName || '').startsWith('SCHEDULE_BLOCK_')) return 'payload.scheduleBlockId';
if (String(eventName || '').startsWith('SCHEDULE_ENTRY_')) return 'payload.entryId';
if (eventName === 'CAPACITY_THRESHOLD_REACHED') return 'payload.teamId';
if (String(eventName || '').startsWith('INTEGRATION_')) return 'payload.integrationId';
if (eventName === 'EMAIL_PROVIDER_CONNECTED') return 'payload.providerId';
return 'payload.projectId';
}
function ensureCallWorkflowInputMapping(callStep, { kind, eventName }) {
if (!callStep || typeof callStep !== 'object') throw new Error('ensureCallWorkflowInputMapping requires a call step object');
const inputMapping =
callStep.inputMapping && typeof callStep.inputMapping === 'object' && !Array.isArray(callStep.inputMapping) ? callStep.inputMapping : {};
// eslint-disable-next-line no-param-reassign
callStep.inputMapping = inputMapping;
if (kind === 'ticket_comment') {
const idExpr = ticketIdExprForEvent(eventName);
const match = /^payload\.(\w+)$/.exec(idExpr);
const field = match ? match[1] : 'ticketId';
inputMapping[field] = { $expr: idExpr };
return;
}
if (kind === 'project_task') {
const idExpr = projectIdExprForEvent(eventName);
const match = /^payload\.(\w+)$/.exec(idExpr);
const field = match ? match[1] : 'projectId';
inputMapping[field] = { $expr: idExpr };
return;
}
throw new Error(`Unknown callWorkflow kind: ${kind}`);
}
async function assertRunSucceeded(ctx, runRow) {
if (runRow.status === 'SUCCEEDED') return;
const steps = await ctx.getRunSteps(runRow.run_id);
throw new Error(`Expected run SUCCEEDED, got ${runRow.status}. Steps: ${JSON.stringify(ctx.summarizeSteps(steps))}`);
}
async function createProject(ctx, { tenantId, apiKey, clientId }) {
const projectName = `Fixture project ${randomUUID()}`;
const createRes = await ctx.http.request('/api/v1/projects', {
method: 'POST',
headers: { 'x-api-key': apiKey },
json: {
client_id: clientId,
project_name: projectName,
create_default_phase: true
}
});
const projectId = createRes.json?.data?.project_id;
if (!projectId) throw new Error('Project create response missing data.project_id');
return projectId;
}
async function cleanupProject(ctx, { tenantId, apiKey, projectId }) {
let projectDeleted = false;
try {
await ctx.http.request(`/api/v1/projects/${projectId}`, {
method: 'DELETE',
headers: { 'x-api-key': apiKey }
});
projectDeleted = true;
} catch {
// Fall back to DB cleanup if project deletion fails due to FK constraints.
}
if (projectDeleted) return;
const phaseIds = await ctx.db.query(`select phase_id from project_phases where tenant = $1 and project_id = $2`, [tenantId, projectId]);
const phaseIdList = phaseIds.map((r) => r.phase_id);
if (phaseIdList.length) {
const taskIds = await ctx.db.query(`select task_id from project_tasks where tenant = $1 and phase_id = any($2::uuid[])`, [
tenantId,
phaseIdList
]);
const taskIdList = taskIds.map((r) => r.task_id);
if (taskIdList.length) {
await ctx.dbWrite.query(`delete from task_checklist_items where tenant = $1 and task_id = any($2::uuid[])`, [tenantId, taskIdList]);
await ctx.dbWrite.query(`delete from project_tasks where tenant = $1 and task_id = any($2::uuid[])`, [tenantId, taskIdList]);
}
await ctx.dbWrite.query(`delete from project_phases where tenant = $1 and phase_id = any($2::uuid[])`, [tenantId, phaseIdList]);
}
await ctx.dbWrite.query(`delete from project_ticket_links where tenant = $1 and project_id = $2`, [tenantId, projectId]);
await ctx.dbWrite.query(`delete from project_status_mappings where tenant = $1 and project_id = $2`, [tenantId, projectId]);
await ctx.dbWrite.query(`delete from projects where tenant = $1 and project_id = $2`, [tenantId, projectId]);
}
async function createTicket(ctx, { tenantId, apiKey }) {
const client = await pickOne(ctx, {
label: 'a client',
sql: `select client_id from clients where tenant = $1 order by created_at asc limit 1`,
params: [tenantId]
});
const board = await pickOne(ctx, {
label: 'a ticket board',
sql: `select board_id from boards where tenant = $1 order by is_default desc, display_order asc limit 1`,
params: [tenantId]
});
const status = await pickOne(ctx, {
label: 'a ticket status',
sql: `select status_id from statuses where tenant = $1 and board_id = $2 and status_type = 'ticket' order by is_default desc, order_number asc limit 1`,
params: [tenantId, board.board_id]
});
const priority = await pickOne(ctx, {
label: 'a ticket priority',
sql: `select priority_id from priorities where tenant = $1 order by order_number asc limit 1`,
params: [tenantId]
});
const title = `Fixture ticket ${randomUUID()}`;
const createRes = await ctx.http.request('/api/v1/tickets', {
method: 'POST',
headers: { 'x-api-key': apiKey },
json: {
title,
client_id: client.client_id,
board_id: board.board_id,
status_id: status.status_id,
priority_id: priority.priority_id
}
});
const ticketId = createRes.json?.data?.ticket_id;
if (!ticketId) throw new Error('Ticket create response missing data.ticket_id');
return ticketId;
}
async function cleanupTicket(ctx, { tenantId, apiKey, ticketId }) {
try {
await ctx.http.request(`/api/v1/tickets/${ticketId}`, {
method: 'DELETE',
headers: { 'x-api-key': apiKey }
});
return;
} catch {
// Ticket deletion is blocked when comments reference the ticket; clean up those rows first.
}
await ctx.dbWrite.query(`delete from comments where tenant = $1 and ticket_id = $2`, [tenantId, ticketId]);
await ctx.dbWrite.query(`delete from tickets where tenant = $1 and ticket_id = $2`, [tenantId, ticketId]);
}
async function cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker }) {
const noteLike = `%${marker}%`;
await ctx.dbWrite.query(`delete from comments where tenant = $1 and ticket_id = $2 and note like $3`, [tenantId, ticketId, noteLike]);
}
async function cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker }) {
const titleLike = `%${marker}%`;
const taskIds = await ctx.db.query(
`
select t.task_id
from project_tasks t
join project_phases p on p.phase_id = t.phase_id and p.tenant = t.tenant
where p.tenant = $1
and p.project_id = $2
and t.task_name like $3
`,
[tenantId, projectId, titleLike]
);
const taskIdList = taskIds.map((r) => r.task_id);
if (!taskIdList.length) return;
await ctx.dbWrite.query(`delete from task_checklist_items where tenant = $1 and task_id = any($2::uuid[])`, [tenantId, taskIdList]);
await ctx.dbWrite.query(`delete from project_tasks where tenant = $1 and task_id = any($2::uuid[])`, [tenantId, taskIdList]);
}
async function getOrCreateTicketId(ctx, { tenantId, apiKey }) {
const rows = await ctx.db.query(`select ticket_id from tickets where tenant = $1 order by entered_at desc limit 1`, [tenantId]);
if (rows.length) return rows[0].ticket_id;
if (!apiKey) throw new Error('No tickets exist; missing API key for fallback ticket creation.');
return createTicket(ctx, { tenantId, apiKey });
}
async function getOrCreateProjectId(ctx, { tenantId, apiKey }) {
const phaseRows = await ctx.db.query(`select project_id from project_phases where tenant = $1 limit 1`, [tenantId]);
if (phaseRows.length) return phaseRows[0].project_id;
if (!apiKey) throw new Error('No projects exist; missing API key for fallback project creation.');
const client = await pickOne(ctx, {
label: 'a client',
sql: `select client_id from clients where tenant = $1 order by created_at asc limit 1`,
params: [tenantId]
});
return createProject(ctx, { tenantId, apiKey, clientId: client.client_id });
}
async function listTicketComments(ctx, { tenantId, ticketId, limit = 50 }) {
return ctx.db.query(
`
select comment_id, note, is_internal, metadata, created_at
from comments
where tenant = $1 and ticket_id = $2
order by created_at desc
limit ${Number(limit) || 50}
`,
[tenantId, ticketId]
);
}
async function listProjectTasks(ctx, { tenantId, projectId, limit = 50 }) {
return ctx.db.query(
`
select t.task_id, t.task_name, t.created_at
from project_tasks t
join project_phases p on p.phase_id = t.phase_id and p.tenant = t.tenant
where p.tenant = $1 and p.project_id = $2
order by t.created_at desc
limit ${Number(limit) || 50}
`,
[tenantId, projectId]
);
}
async function triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload }) {
await ctx.http.request('/api/workflow/events', {
method: 'POST',
json: {
eventName,
correlationKey,
payloadSchemaRef: schemaRef,
payload
}
});
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function getLatestFixtureRun({ ctx, workflowId, tenantId, startedAfter, fixtureNotifyUserId, fixtureDedupeKey }) {
const rows = await ctx.db.query(
`
select
run_id,
workflow_id,
workflow_version,
tenant,
status,
event_type,
source_payload_schema_ref,
trigger_mapping_applied,
started_at,
completed_at,
updated_at,
error_json
from workflow_runs
where workflow_id = $1
and tenant = $2
and started_at >= $3
and input_json->>'fixtureNotifyUserId' = $4
and ($5::text is null or input_json->>'fixtureDedupeKey' = $5)
order by started_at desc
limit 1
`,
[workflowId, tenantId, startedAfter, fixtureNotifyUserId, fixtureDedupeKey ?? null]
);
return rows[0] ?? null;
}
async function waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId, fixtureDedupeKey, timeoutMs, pollMs = 500 }) {
if (!fixtureNotifyUserId) throw new Error('waitForFixtureRun requires fixtureNotifyUserId');
const workflowId = ctx.workflow.id;
const tenantId = ctx.config.tenantId;
const timeout = Number(timeoutMs ?? ctx.config.timeoutMs ?? 60_000);
const deadline = Date.now() + timeout;
let last = null;
while (Date.now() < deadline) {
// eslint-disable-next-line no-await-in-loop
last = await getLatestFixtureRun({ ctx, workflowId, tenantId, startedAfter, fixtureNotifyUserId, fixtureDedupeKey });
if (last) {
const status = String(last.status || '');
const isTerminal = status === 'SUCCEEDED' || status === 'FAILED' || status === 'CANCELED';
if (isTerminal) return last;
}
// eslint-disable-next-line no-await-in-loop
await sleep(pollMs);
}
const err = new Error(
`Timed out waiting for workflow run (workflowId=${workflowId}, tenantId=${tenantId}, startedAfter=${startedAfter}, fixtureNotifyUserId=${fixtureNotifyUserId}).`
);
err.details = { lastSeen: last };
throw err;
}
async function withWorkflowPaused(ctx, workflowId, fn) {
const tenantId = ctx.config.tenantId;
const rows = await ctx.db.query(`select is_paused from workflow_definitions where workflow_id = $1 and tenant = $2`, [workflowId, tenantId]);
const wasPaused = rows[0]?.is_paused ?? false;
if (!wasPaused) {
await ctx.dbWrite.query(`update workflow_definitions set is_paused = true where workflow_id = $1 and tenant = $2`, [workflowId, tenantId]);
}
try {
return await fn();
} finally {
if (!wasPaused) {
await ctx.dbWrite.query(`update workflow_definitions set is_paused = false where workflow_id = $1 and tenant = $2`, [workflowId, tenantId]);
}
}
}
async function runTicketCommentDefault(ctx, { fixtureName, eventName, schemaRef }) {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const ticketId = await getOrCreateTicketId(ctx, { tenantId, apiKey });
await cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker });
ctx.onCleanup(() => cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker }));
const correlationKey = ticketId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
const comments = await listTicketComments(ctx, { tenantId, ticketId, limit: 200 });
const found = comments.find((c) => typeof c.note === 'string' && c.note.includes(marker));
if (!found) {
throw new Error(`Expected a ticket comment containing "${marker}" on ticket ${ticketId}. Found ${comments.length} comment(s).`);
}
}
async function runTicketCommentIdempotent(ctx, { fixtureName, eventName, schemaRef }) {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const ticketId = await getOrCreateTicketId(ctx, { tenantId, apiKey });
await cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker });
ctx.onCleanup(() => cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker }));
const correlationKey = ticketId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
const startedAfter1 = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow1 = await waitForFixtureRun(ctx, { startedAfter: startedAfter1, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow1);
const startedAfter2 = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow2 = await waitForFixtureRun(ctx, { startedAfter: startedAfter2, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow2);
const comments = await listTicketComments(ctx, { tenantId, ticketId, limit: 200 });
const markerComments = comments.filter((c) => typeof c.note === 'string' && c.note.includes(marker));
const dedupeComments = markerComments.filter((c) => typeof c.note === 'string' && c.note.includes(dedupeKey));
const found = dedupeComments.length ? dedupeComments : markerComments;
if (found.length < 1) {
throw new Error(
`Expected at least 1 ticket comment containing "${marker}" (dedupeKey=${dedupeKey}) on ticket ${ticketId}. Found ${markerComments.length} marker comment(s) (${comments.length} total).`
);
}
}
async function runTicketCommentForEach(ctx, { fixtureName, eventName, schemaRef }) {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const ticketId = await getOrCreateTicketId(ctx, { tenantId, apiKey });
await cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker });
ctx.onCleanup(() => cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker }));
const correlationKey = ticketId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
const comments = await listTicketComments(ctx, { tenantId, ticketId, limit: 200 });
const markerComments = comments.filter((c) => typeof c.note === 'string' && c.note.includes(marker));
const dedupeComments = markerComments.filter((c) => typeof c.note === 'string' && c.note.includes(dedupeKey));
const found = dedupeComments.length ? dedupeComments : markerComments;
if (found.length < 2) {
throw new Error(
`Expected at least 2 ticket comments containing "${marker}" (dedupeKey=${dedupeKey}) on ticket ${ticketId}. Found ${found.length}.`
);
}
}
async function runTicketCommentTryCatch(ctx, { fixtureName, eventName, schemaRef }) {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const ticketId = await getOrCreateTicketId(ctx, { tenantId, apiKey });
await cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker });
ctx.onCleanup(() => cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker }));
const correlationKey = ticketId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureBadUserId: randomUUID(),
fixtureDedupeKey: dedupeKey
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
const comments = await listTicketComments(ctx, { tenantId, ticketId, limit: 200 });
const markerComments = comments.filter((c) => typeof c.note === 'string' && c.note.includes(marker));
const dedupeComments = markerComments.filter((c) => typeof c.note === 'string' && c.note.includes(dedupeKey));
const found = dedupeComments.length ? dedupeComments : markerComments;
if (found.length < 1) {
throw new Error(
`Expected at least 1 ticket comment containing "${marker}" (dedupeKey=${dedupeKey}) on ticket ${ticketId}. Found ${markerComments.length} marker comment(s) (${comments.length} total).`
);
}
}
async function runTicketCommentMultiBranch(ctx, { fixtureName, eventName, schemaRef }) {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const ticketId = await getOrCreateTicketId(ctx, { tenantId, apiKey });
await cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker });
ctx.onCleanup(() => cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker }));
const correlationKey = ticketId;
async function runVariant(variant) {
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey,
fixtureVariant: variant
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
return dedupeKey;
}
const a = await runVariant('A');
const b = await runVariant('B');
const comments = await listTicketComments(ctx, { tenantId, ticketId, limit: 200 });
const hasA = comments.some((c) => typeof c.note === 'string' && c.note.includes(marker) && c.note.includes(a));
const hasB = comments.some((c) => typeof c.note === 'string' && c.note.includes(marker) && c.note.includes(b));
if (!hasA || !hasB) {
throw new Error(`Expected ticket comments for both variants (A+B) containing "${marker}" on ticket ${ticketId}.`);
}
}
async function runTicketCommentFixture(ctx, opts) {
const { fixtureName, eventName, schemaRef, pattern = 'default' } = opts ?? {};
if (!fixtureName || !eventName || !schemaRef) throw new Error('runTicketCommentFixture requires fixtureName, eventName, schemaRef');
switch (pattern) {
case 'default':
return runTicketCommentDefault(ctx, { fixtureName, eventName, schemaRef });
case 'idempotent':
return runTicketCommentIdempotent(ctx, { fixtureName, eventName, schemaRef });
case 'forEach':
return runTicketCommentForEach(ctx, { fixtureName, eventName, schemaRef });
case 'tryCatch':
return runTicketCommentTryCatch(ctx, { fixtureName, eventName, schemaRef });
case 'multiBranch':
return runTicketCommentMultiBranch(ctx, { fixtureName, eventName, schemaRef });
default:
throw new Error(`Unknown ticket comment fixture pattern: ${pattern}`);
}
}
function isProjectPayloadViaNotificationFixture(eventName) {
// These fixtures don't use the shared notification fixture payload builder, and schemas may not allow fixtureDedupeKey.
return !['INTEGRATION_SYNC_FAILED', 'INTEGRATION_WEBHOOK_RECEIVED', 'EMAIL_PROVIDER_CONNECTED', 'PAYMENT_RECORDED'].includes(eventName);
}
async function runProjectTaskDefault(ctx, { fixtureName, eventName, schemaRef }) {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const projectId = await getOrCreateProjectId(ctx, { tenantId, apiKey });
await cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker });
ctx.onCleanup(() => cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker }));
let correlationKey = projectId;
let payload;
if (!isProjectPayloadViaNotificationFixture(eventName)) {
if (eventName === 'INTEGRATION_SYNC_FAILED') {
const syncId = `fixture-sync-${randomUUID()}`;
correlationKey = syncId;
payload = {
integrationId: projectId,
provider: 'fixture',
syncId,
errorMessage: 'fixture sync failed',
retryable: true,
fixtureNotifyUserId: user.user_id
};
} else if (eventName === 'INTEGRATION_WEBHOOK_RECEIVED') {
const webhookId = `fixture-webhook-${randomUUID()}`;
correlationKey = webhookId;
payload = {
integrationId: projectId,
provider: 'fixture',
webhookId,
eventName: 'fixture.updated',
fixtureNotifyUserId: user.user_id
};
} else if (eventName === 'EMAIL_PROVIDER_CONNECTED') {
correlationKey = projectId;
payload = {
providerId: projectId,
providerType: 'google',
providerName: 'Fixture Provider',
mailbox: 'fixture-mailbox@example.com',
connectedAt: new Date().toISOString(),
fixtureNotifyUserId: user.user_id
};
} else if (eventName === 'PAYMENT_RECORDED') {
correlationKey = projectId;
payload = {
paymentId: projectId,
amount: '42.00',
currency: 'USD',
method: 'wire',
fixtureNotifyUserId: user.user_id
};
} else {
throw new Error(`Unsupported non-notification fixture eventName: ${eventName}`);
}
} else {
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const dedupeKey = randomUUID();
payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
}
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const fixtureDedupeKey = payload?.fixtureDedupeKey ?? null;
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey });
await assertRunSucceeded(ctx, runRow);
const tasks = await listProjectTasks(ctx, { tenantId, projectId, limit: 200 });
const found = tasks.find((t) => typeof t.task_name === 'string' && t.task_name.includes(marker));
if (!found) {
throw new Error(`Expected a project task containing "${marker}" on project ${projectId}. Found ${tasks.length} task(s).`);
}
}
async function runProjectTaskIdempotent(ctx, { fixtureName, eventName, schemaRef }) {
if (!isProjectPayloadViaNotificationFixture(eventName)) {
return runProjectTaskDefault(ctx, { fixtureName, eventName, schemaRef });
}
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const projectId = await getOrCreateProjectId(ctx, { tenantId, apiKey });
await cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker });
ctx.onCleanup(() => cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker }));
const correlationKey = projectId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
const startedAfter1 = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow1 = await waitForFixtureRun(ctx, { startedAfter: startedAfter1, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow1);
const startedAfter2 = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow2 = await waitForFixtureRun(ctx, { startedAfter: startedAfter2, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow2);
const tasks = await listProjectTasks(ctx, { tenantId, projectId, limit: 200 });
const found = tasks.filter((t) => typeof t.task_name === 'string' && t.task_name.includes(marker));
if (found.length < 1) {
throw new Error(`Expected a project task containing "${marker}" on project ${projectId}. Found ${tasks.length} task(s).`);
}
}
async function runProjectTaskForEach(ctx, { fixtureName, eventName, schemaRef }) {
if (!isProjectPayloadViaNotificationFixture(eventName)) {
return runProjectTaskDefault(ctx, { fixtureName, eventName, schemaRef });
}
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const projectId = await getOrCreateProjectId(ctx, { tenantId, apiKey });
await cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker });
ctx.onCleanup(() => cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker }));
const correlationKey = projectId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
const tasks = await listProjectTasks(ctx, { tenantId, projectId, limit: 200 });
const found = tasks.filter((t) => typeof t.task_name === 'string' && t.task_name.includes(marker));
if (found.length < 2) {
throw new Error(`Expected at least 2 project tasks containing "${marker}" on project ${projectId}. Found ${found.length}.`);
}
}
async function runProjectTaskTryCatch(ctx, { fixtureName, eventName, schemaRef }) {
if (!isProjectPayloadViaNotificationFixture(eventName)) {
return runProjectTaskDefault(ctx, { fixtureName, eventName, schemaRef });
}
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const projectId = await getOrCreateProjectId(ctx, { tenantId, apiKey });
await cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker });
ctx.onCleanup(() => cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker }));
const correlationKey = projectId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureBadUserId: randomUUID(),
fixtureDedupeKey: dedupeKey
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
const tasks = await listProjectTasks(ctx, { tenantId, projectId, limit: 200 });
const found = tasks.find((t) => typeof t.task_name === 'string' && t.task_name.includes(marker));
if (!found) {
throw new Error(`Expected a project task containing "${marker}" on project ${projectId}. Found ${tasks.length} task(s).`);
}
}
async function runProjectTaskMultiBranch(ctx, { fixtureName, eventName, schemaRef }) {
if (!isProjectPayloadViaNotificationFixture(eventName)) {
return runProjectTaskDefault(ctx, { fixtureName, eventName, schemaRef });
}
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const tenantId = ctx.config.tenantId;
const marker = `[fixture ${fixtureName}]`;
const user = await pickUser(ctx, { tenantId });
const projectId = await getOrCreateProjectId(ctx, { tenantId, apiKey });
await cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker });
ctx.onCleanup(() => cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker }));
const correlationKey = projectId;
async function runVariant(variant) {
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey,
fixtureVariant: variant
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
}
await runVariant('A');
await runVariant('B');
const tasks = await listProjectTasks(ctx, { tenantId, projectId, limit: 200 });
const found = tasks.filter((t) => typeof t.task_name === 'string' && t.task_name.includes(marker));
if (found.length < 2) {
throw new Error(`Expected project tasks for both variants (A+B) containing "${marker}" on project ${projectId}. Found ${found.length}.`);
}
}
async function runProjectTaskFixture(ctx, opts) {
const { fixtureName, eventName, schemaRef, pattern = 'default' } = opts ?? {};
if (!fixtureName || !eventName || !schemaRef) throw new Error('runProjectTaskFixture requires fixtureName, eventName, schemaRef');
switch (pattern) {
case 'default':
return runProjectTaskDefault(ctx, { fixtureName, eventName, schemaRef });
case 'idempotent':
return runProjectTaskIdempotent(ctx, { fixtureName, eventName, schemaRef });
case 'forEach':
return runProjectTaskForEach(ctx, { fixtureName, eventName, schemaRef });
case 'tryCatch':
return runProjectTaskTryCatch(ctx, { fixtureName, eventName, schemaRef });
case 'multiBranch':
return runProjectTaskMultiBranch(ctx, { fixtureName, eventName, schemaRef });
default:
throw new Error(`Unknown project task fixture pattern: ${pattern}`);
}
}
async function publishWorkflow(ctx, { workflowId, version }) {
await ctx.http.request(`/api/workflow-definitions/${workflowId}/${version}/publish`, {
method: 'POST',
json: {}
});
}
async function updateDraft(ctx, { workflowId, definition }) {
await ctx.http.request(`/api/workflow-definitions/${workflowId}/1`, {
method: 'PUT',
json: { definition }
});
}
async function getExportedDraftDefinition(ctx, { workflowId }) {
const res = await ctx.http.request(`/api/workflow-definitions/${workflowId}/export`, { method: 'GET' });
const bundle = res.json;
if (!bundle || !Array.isArray(bundle.workflows) || !bundle.workflows[0]?.draft?.definition) {
throw new Error(`Export did not return a draft definition for workflow ${workflowId}`);
}
return bundle.workflows[0].draft.definition;
}
async function getNextPublishVersion(ctx, { workflowId }) {
const rows = await ctx.db.query(`select max(version) as max_version from workflow_definition_versions where workflow_id = $1`, [workflowId]);
const max = rows[0]?.max_version ?? null;
const n = max === null || max === undefined ? 0 : Number(max);
return Number.isFinite(n) && n > 0 ? n + 1 : 1;
}
async function runCallWorkflowBizFixture(ctx, { fixtureName, eventName, schemaRef, kind }) {
const tenantId = ctx.config.tenantId;
const parentWorkflowId = ctx.workflow.id;
const childKey = `subfixture.${fixtureName}`;
const childWorkflowId =
Array.isArray(ctx.workflow?.importSummary?.createdWorkflows)
? ctx.workflow.importSummary.createdWorkflows.find((w) => w.key === childKey)?.workflowId ?? null
: null;
if (!childWorkflowId) {
throw new Error(`callWorkflow fixture missing child workflowId for key ${childKey}`);
}
// Publish child (ensure a version exists for callWorkflow.workflowVersion).
const childVersion = await getNextPublishVersion(ctx, { workflowId: childWorkflowId });
await publishWorkflow(ctx, { workflowId: childWorkflowId, version: childVersion });
// Patch parent draft to point callWorkflow step at child workflowId + version.
const parentDraft = await getExportedDraftDefinition(ctx, { workflowId: parentWorkflowId });
const callStep = Array.isArray(parentDraft.steps)
? parentDraft.steps.find((s) => s && typeof s === 'object' && s.type === 'control.callWorkflow')
: null;
if (!callStep) {
throw new Error(`callWorkflow fixture parent definition missing control.callWorkflow step (${fixtureName})`);
}
callStep.workflowId = childWorkflowId;
callStep.workflowVersion = childVersion;
ensureCallWorkflowInputMapping(callStep, { kind, eventName });
await updateDraft(ctx, { workflowId: parentWorkflowId, definition: parentDraft });
const parentVersion = await getNextPublishVersion(ctx, { workflowId: parentWorkflowId });
await publishWorkflow(ctx, { workflowId: parentWorkflowId, version: parentVersion });
const marker = `[fixture ${fixtureName}]`;
const childMarker = `[fixture ${fixtureName} child]`;
if (kind === 'ticket_comment') {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const user = await pickUser(ctx, { tenantId });
const ticketId = await getOrCreateTicketId(ctx, { tenantId, apiKey });
await cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker });
await cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker: childMarker });
ctx.onCleanup(() => cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker }));
ctx.onCleanup(() => cleanupTicketCommentsByMarker(ctx, { tenantId, ticketId, marker: childMarker }));
const correlationKey = ticketId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
const comments = await listTicketComments(ctx, { tenantId, ticketId, limit: 200 });
const hasParent = comments.some((c) => typeof c.note === 'string' && c.note.includes(marker));
const hasChild = comments.some((c) => typeof c.note === 'string' && c.note.includes(childMarker));
if (!hasParent || !hasChild) {
throw new Error(`Expected both parent + child comments for "${marker}" on ticket ${ticketId}.`);
}
return;
}
if (kind === 'project_task') {
const apiKey = getApiKey();
if (!apiKey) throw new Error('Missing WORKFLOW_HARNESS_API_KEY (or ALGA_API_KEY) for /api/v1 calls.');
const user = await pickUser(ctx, { tenantId });
const projectId = await getOrCreateProjectId(ctx, { tenantId, apiKey });
await cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker });
await cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker: childMarker });
ctx.onCleanup(() => cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker }));
ctx.onCleanup(() => cleanupProjectTasksByMarker(ctx, { tenantId, projectId, marker: childMarker }));
const correlationKey = projectId;
const dedupeKey = randomUUID();
const base = buildBasePayloadForEvent({ eventName, correlationKey, userId: user.user_id });
const payload = {
...base,
fixtureNotifyUserId: user.user_id,
fixtureDedupeKey: dedupeKey
};
const startedAfter = new Date().toISOString();
await triggerEvent(ctx, { eventName, schemaRef, correlationKey, payload });
const runRow = await waitForFixtureRun(ctx, { startedAfter, fixtureNotifyUserId: user.user_id, fixtureDedupeKey: dedupeKey });
await assertRunSucceeded(ctx, runRow);
const tasks = await listProjectTasks(ctx, { tenantId, projectId, limit: 200 });
const hasParent = tasks.some((t) => typeof t.task_name === 'string' && t.task_name.includes(marker));
const hasChild = tasks.some((t) => typeof t.task_name === 'string' && t.task_name.includes(childMarker));
if (!hasParent || !hasChild) {
throw new Error(`Expected both parent + child tasks for "${marker}" on project ${projectId}.`);
}
return;
}
throw new Error(`Unknown callWorkflow biz fixture kind: ${kind}`);
}
module.exports = {
runTicketCommentFixture,
runProjectTaskFixture,
runCallWorkflowBizFixture
};