import type { Knex } from 'knex';

/**
 * Shape of the records this module reads from and writes to the
 * `workflow_definitions` table.
 */
export type WorkflowDefinitionRecord = {
  workflow_id: string;
  // uuid Citus distribution column. The legacy textual `tenant_id` column is
  // being phased out (dropped in the cleanup migration) and is not referenced here.
  tenant: string;
  key?: string | null;
  name: string;
  description?: string | null;
  payload_schema_ref: string;
  payload_schema_mode?: 'inferred' | 'pinned' | string | null;
  pinned_payload_schema_ref?: string | null;
  payload_schema_provenance?: string | null;
  trigger?: Record<string, unknown> | null;
  draft_definition: Record<string, unknown>;
  draft_version: number;
  status: string;
  validation_status?: string | null;
  validation_errors?: Record<string, unknown>[] | null;
  validation_warnings?: Record<string, unknown>[] | null;
  validation_context_json?: Record<string, unknown> | null;
  validation_payload_schema_hash?: string | null;
  validated_at?: string | null;
  published_version?: number | null;
  is_system?: boolean;
  is_visible?: boolean;
  is_paused?: boolean;
  concurrency_limit?: number | null;
  auto_pause_on_failure?: boolean;
  failure_rate_threshold?: number | string | null;
  failure_rate_min_runs?: number | null;
  retention_policy_override?: Record<string, unknown> | null;
  created_by?: string | null;
  updated_by?: string | null;
  created_at: string;
  updated_at: string;
};

/** Columns whose array values must be sent to Postgres as JSON text. */
const JSON_ARRAY_COLUMNS = ['validation_errors', 'validation_warnings'] as const;

/**
 * Converts an array into its JSON text form; any other value is returned untouched.
 *
 * @param value - Candidate column value.
 * @returns `JSON.stringify(value)` when `value` is an array, otherwise `value` itself.
 */
const serializeJsonArrayForPgJsonColumn = (value: unknown): unknown => {
  // node-postgres treats JS arrays as Postgres arrays, not JSON, which breaks inserts into `json/jsonb` columns.
  // Serialize explicitly so Postgres receives valid JSON text (e.g. `[{"...": "..."}]`).
  return Array.isArray(value) ? JSON.stringify(value) : value;
};

/**
 * Returns a shallow copy of a write payload in which the JSON-array columns
 * (`validation_errors`, `validation_warnings`) have been serialized for Postgres.
 * The input object is never mutated, and columns absent from the input stay absent.
 *
 * @param data - Partial record supplied by the caller.
 * @returns A new object that is safe to hand to `insert` / `update`.
 */
const normalizeWorkflowDefinitionWrite = (
  data: Partial<WorkflowDefinitionRecord>
): Partial<WorkflowDefinitionRecord> => {
  const out: Record<string, unknown> = { ...data };
  for (const column of JSON_ARRAY_COLUMNS) {
    // `in` (rather than a truthiness test) keeps explicit `null`/`undefined` writes intact.
    if (column in out) {
      out[column] = serializeJsonArrayForPgJsonColumn(out[column]);
    }
  }
  // After serialization the array columns hold JSON text, so the static type is
  // only an approximation of what is actually sent to the driver.
  return out as Partial<WorkflowDefinitionRecord>;
};

/**
 * Validates a tenant identifier and returns it with surrounding whitespace removed.
 *
 * @param tenantId - Tenant identifier supplied by the caller.
 * @returns The trimmed tenant identifier.
 * @throws Error when the identifier is `null`, `undefined`, empty, or whitespace-only.
 */
const assertTenantId = (tenantId: string | null | undefined): string => {
  const normalized = String(tenantId ?? '').trim();
  if (!normalized) {
    throw new Error('tenant_id is required for workflow definition access');
  }
  return normalized;
};

/**
 * Tenant-scoped data access for the `workflow_definitions` table.
 * Every method validates the tenant id first and constrains its query to that tenant.
 */
const WorkflowDefinitionModelV2 = {
  /**
   * Inserts a workflow definition for the given tenant.
   *
   * The stored row always gets the validated `tenant`, `is_system: false`, and
   * identical `created_at` / `updated_at` timestamps, regardless of what `data` contains.
   *
   * @param knex - Knex instance or transaction used to run the query.
   * @param tenantId - Tenant that owns the new definition.
   * @param data - Column values for the new row.
   * @returns The first row produced by `returning('*')`.
   * @throws Error when `tenantId` is blank.
   */
  create: async (
    knex: Knex,
    tenantId: string,
    data: Partial<WorkflowDefinitionRecord>
  ): Promise<WorkflowDefinitionRecord> => {
    const tenant = assertTenantId(tenantId);
    const normalized = normalizeWorkflowDefinitionWrite(data);
    // Capture the clock once so both timestamps on a fresh row are guaranteed equal.
    const now = new Date().toISOString();
    const [record] = await knex('workflow_definitions')
      .insert({
        ...normalized,
        tenant,
        is_system: false,
        created_at: now,
        updated_at: now
      })
      .returning('*');
    return record;
  },

  /**
   * Updates a workflow definition that belongs to the given tenant.
   *
   * `tenant` and the legacy `tenant_id` are removed from the patch so a row can never be
   * moved to another tenant; `is_system` is always written as `false` and `updated_at`
   * is refreshed.
   *
   * @param knex - Knex instance or transaction used to run the query.
   * @param tenantId - Tenant that owns the definition.
   * @param workflowId - Identifier of the definition to update.
   * @param data - Column values to change.
   * @returns The first row produced by `returning('*')`.
   *   NOTE(review): when no row matches, the result is presumably `undefined` at
   *   runtime even though the declared type does not say so - confirm against callers.
   * @throws Error when `tenantId` is blank.
   */
  update: async (
    knex: Knex,
    tenantId: string,
    workflowId: string,
    data: Partial<WorkflowDefinitionRecord>
  ): Promise<WorkflowDefinitionRecord> => {
    const tenant = assertTenantId(tenantId);
    // `normalizeWorkflowDefinitionWrite` returns a fresh copy, so deleting keys here
    // does not touch the caller's object.
    const patch: Record<string, unknown> = normalizeWorkflowDefinitionWrite(data);
    delete patch.tenant_id;
    delete patch.tenant;
    const [record] = await knex('workflow_definitions')
      .where({ workflow_id: workflowId, tenant })
      .update({
        ...patch,
        is_system: false,
        updated_at: new Date().toISOString()
      })
      .returning('*');
    return record;
  },

  /**
   * Fetches a single workflow definition by id within the given tenant.
   *
   * @param knex - Knex instance or transaction used to run the query.
   * @param tenantId - Tenant that owns the definition.
   * @param workflowId - Identifier of the definition to fetch.
   * @returns The matching row, or `null` when the query yields nothing.
   * @throws Error when `tenantId` is blank.
   */
  getById: async (
    knex: Knex,
    tenantId: string,
    workflowId: string
  ): Promise<WorkflowDefinitionRecord | null> => {
    const tenant = assertTenantId(tenantId);
    const record = await knex('workflow_definitions')
      .where({ workflow_id: workflowId, tenant })
      .first();
    return record ?? null;
  },

  /**
   * Lists every workflow definition that belongs to the given tenant.
   *
   * @param knex - Knex instance or transaction used to run the query.
   * @param tenantId - Tenant whose definitions are listed.
   * @returns All rows selected for that tenant.
   * @throws Error when `tenantId` is blank.
   */
  list: async (knex: Knex, tenantId: string): Promise<WorkflowDefinitionRecord[]> => {
    const tenant = assertTenantId(tenantId);
    return knex('workflow_definitions').select('*').where({ tenant });
  }
};

export default WorkflowDefinitionModelV2;