Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz Source: /opt/alga-psa on psa.joliet.tech
545 lines
14 KiB
TypeScript
545 lines
14 KiB
TypeScript
import { Knex } from 'knex';
|
|
import { v4 as uuidv4 } from 'uuid';
|
|
import { publishEvent } from '@alga-psa/event-bus/publishers';
|
|
|
|
/**
|
|
* Interface for workflow task definition
|
|
*/
|
|
export interface IWorkflowTaskDefinition {
|
|
task_definition_id: string;
|
|
tenant: string;
|
|
task_type: string;
|
|
name: string;
|
|
description?: string;
|
|
form_id: string;
|
|
default_priority?: string;
|
|
default_sla_days?: number;
|
|
created_at: string;
|
|
updated_at: string;
|
|
}
|
|
|
|
/**
|
|
* Interface for workflow task instance
|
|
*/
|
|
export interface IWorkflowTask {
|
|
task_id: string;
|
|
tenant: string;
|
|
execution_id: string;
|
|
event_id?: string;
|
|
// task_definition_id: string; // OLD: Stores UUID for tenant task_definitions, or task_type (string) for system_task_definitions
|
|
tenant_task_definition_id?: string | null; // FK to workflow_task_definitions.task_definition_id (UUID)
|
|
system_task_definition_task_type?: string | null; // FK to system_workflow_task_definitions.task_type (TEXT)
|
|
task_definition_type: 'tenant' | 'system'; // Indicates which FK is active
|
|
title: string;
|
|
description?: string;
|
|
status: WorkflowTaskStatus;
|
|
priority: string;
|
|
due_date?: string;
|
|
context_data?: Record<string, any>;
|
|
assigned_roles?: string[];
|
|
assigned_users?: string[];
|
|
created_at: string;
|
|
updated_at: string;
|
|
created_by?: string;
|
|
claimed_at?: string;
|
|
claimed_by?: string;
|
|
completed_at?: string;
|
|
completed_by?: string;
|
|
response_data?: Record<string, any>;
|
|
}
|
|
|
|
/**
|
|
* Enum for workflow task status
|
|
*/
|
|
export enum WorkflowTaskStatus {
|
|
PENDING = 'pending',
|
|
CLAIMED = 'claimed',
|
|
COMPLETED = 'completed',
|
|
CANCELED = 'canceled',
|
|
EXPIRED = 'expired'
|
|
}
|
|
|
|
type WorkflowTaskSearchEventType =
|
|
| 'WORKFLOW_TASK_CREATED'
|
|
| 'WORKFLOW_TASK_UPDATED'
|
|
| 'WORKFLOW_TASK_DELETED'
|
|
| 'WORKFLOW_TASK_ASSIGNMENT_CHANGED';
|
|
|
|
export async function publishWorkflowTaskSearchEvent(
|
|
eventType: WorkflowTaskSearchEventType,
|
|
tenant: string,
|
|
taskId: string,
|
|
options: {
|
|
userId?: string;
|
|
status?: string;
|
|
assignedUserIds?: string[];
|
|
changedFields?: string[];
|
|
} = {},
|
|
): Promise<void> {
|
|
try {
|
|
await publishEvent({
|
|
eventType,
|
|
payload: {
|
|
tenantId: tenant,
|
|
taskId,
|
|
userId: options.userId,
|
|
status: options.status,
|
|
assignedUserIds: options.assignedUserIds,
|
|
changedFields: options.changedFields,
|
|
timestamp: new Date().toISOString(),
|
|
},
|
|
});
|
|
} catch (eventError) {
|
|
console.error(`[WorkflowTaskModel] Failed to publish ${eventType} search event:`, eventError);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Interface for task history entry
|
|
*/
|
|
export interface IWorkflowTaskHistory {
|
|
history_id: string;
|
|
task_id: string;
|
|
tenant: string;
|
|
action: string;
|
|
from_status?: string;
|
|
to_status?: string;
|
|
user_id?: string;
|
|
timestamp: string;
|
|
details?: Record<string, any>;
|
|
}
|
|
|
|
/**
|
|
* Model for workflow_tasks table
|
|
*/
|
|
const WorkflowTaskModel = {
|
|
/**
|
|
* Create a new task
|
|
*/
|
|
createTask: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
task: Omit<IWorkflowTask, 'task_id' | 'created_at' | 'updated_at'> // event_id is optional in IWorkflowTask, allow it to be passed
|
|
): Promise<string> => {
|
|
try {
|
|
const taskId = uuidv4(); // Removed "task-" prefix
|
|
|
|
const taskToInsert: IWorkflowTask = {
|
|
...task, // Spread the incoming task payload which should match the new structure
|
|
task_id: taskId,
|
|
tenant, // Ensure tenant is part of the final object if not already in `task`
|
|
created_at: new Date().toISOString(),
|
|
updated_at: new Date().toISOString(),
|
|
// Ensure event_id is handled if it's part of the input `task` or set to undefined
|
|
event_id: task.event_id || undefined,
|
|
};
|
|
|
|
// Ensure JSON fields are correctly formatted for PostgreSQL
|
|
const finalTaskRecord = {
|
|
...taskToInsert,
|
|
assigned_roles: taskToInsert.assigned_roles ? JSON.stringify(taskToInsert.assigned_roles) : null,
|
|
assigned_users: taskToInsert.assigned_users ? JSON.stringify(taskToInsert.assigned_users) : null,
|
|
context_data: taskToInsert.context_data ? JSON.stringify(taskToInsert.context_data) : null,
|
|
response_data: taskToInsert.response_data ? JSON.stringify(taskToInsert.response_data) : null,
|
|
};
|
|
|
|
const [result] = await knex('workflow_tasks')
|
|
.insert(finalTaskRecord)
|
|
.returning('task_id');
|
|
|
|
if (!result || !result.task_id) {
|
|
throw new Error('Task creation failed, no task_id returned.');
|
|
}
|
|
|
|
await publishWorkflowTaskSearchEvent('WORKFLOW_TASK_CREATED', tenant, result.task_id, {
|
|
userId: task.created_by,
|
|
status: task.status,
|
|
assignedUserIds: task.assigned_users,
|
|
changedFields: ['title', 'description', 'assigned_users', 'status'],
|
|
});
|
|
|
|
return result.task_id;
|
|
} catch (error) {
|
|
console.error('Error creating workflow task:', error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Get a task by ID
|
|
*/
|
|
getTaskById: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
taskId: string
|
|
): Promise<IWorkflowTask | null> => {
|
|
try {
|
|
const task = await knex<IWorkflowTask>('workflow_tasks')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.first();
|
|
|
|
return task || null;
|
|
} catch (error) {
|
|
console.error(`Error getting task ${taskId}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Get tasks by execution ID
|
|
*/
|
|
getTasksByExecutionId: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
executionId: string
|
|
): Promise<IWorkflowTask[]> => {
|
|
try {
|
|
const tasks = await knex<IWorkflowTask>('workflow_tasks')
|
|
.where({
|
|
execution_id: executionId,
|
|
tenant
|
|
})
|
|
.orderBy('created_at', 'desc');
|
|
|
|
return tasks;
|
|
} catch (error) {
|
|
console.error(`Error getting tasks for execution ${executionId}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Get tasks assigned to a user
|
|
*/
|
|
getTasksAssignedToUser: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
userId: string,
|
|
status?: WorkflowTaskStatus | WorkflowTaskStatus[]
|
|
): Promise<IWorkflowTask[]> => {
|
|
try {
|
|
let query = knex<IWorkflowTask>('workflow_tasks')
|
|
.where('tenant', tenant)
|
|
.whereRaw("assigned_users @> ?", [[userId]]);
|
|
|
|
if (status) {
|
|
if (Array.isArray(status)) {
|
|
query = query.whereIn('status', status);
|
|
} else {
|
|
query = query.where('status', status);
|
|
}
|
|
}
|
|
|
|
const tasks = await query.orderBy('due_date', 'asc');
|
|
|
|
return tasks;
|
|
} catch (error) {
|
|
console.error(`Error getting tasks assigned to user ${userId}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Get tasks assigned to roles
|
|
*/
|
|
getTasksAssignedToRoles: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
roles: string[],
|
|
status?: WorkflowTaskStatus | WorkflowTaskStatus[]
|
|
): Promise<IWorkflowTask[]> => {
|
|
try {
|
|
let query = knex<IWorkflowTask>('workflow_tasks')
|
|
.where('tenant', tenant)
|
|
.where(function() {
|
|
for (const role of roles) {
|
|
this.orWhereRaw("assigned_roles @> ?", [[role]]);
|
|
}
|
|
});
|
|
|
|
if (status) {
|
|
if (Array.isArray(status)) {
|
|
query = query.whereIn('status', status);
|
|
} else {
|
|
query = query.where('status', status);
|
|
}
|
|
}
|
|
|
|
const tasks = await query.orderBy('due_date', 'asc');
|
|
|
|
return tasks;
|
|
} catch (error) {
|
|
console.error(`Error getting tasks assigned to roles:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Update task status
|
|
*/
|
|
updateTaskStatus: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
taskId: string,
|
|
status: WorkflowTaskStatus,
|
|
userId?: string
|
|
): Promise<boolean> => {
|
|
try {
|
|
const now = new Date().toISOString();
|
|
const updates: Partial<IWorkflowTask> = {
|
|
status,
|
|
updated_at: now
|
|
};
|
|
|
|
// Add additional fields based on the new status
|
|
if (status === WorkflowTaskStatus.CLAIMED) {
|
|
updates.claimed_at = now;
|
|
updates.claimed_by = userId;
|
|
} else if (status === WorkflowTaskStatus.COMPLETED) {
|
|
updates.completed_at = now;
|
|
// updates.completed_by = userId;
|
|
}
|
|
|
|
const result = await knex('workflow_tasks')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.update(updates);
|
|
|
|
if (result > 0) {
|
|
await publishWorkflowTaskSearchEvent('WORKFLOW_TASK_UPDATED', tenant, taskId, {
|
|
userId,
|
|
status,
|
|
changedFields: Object.keys(updates),
|
|
});
|
|
}
|
|
|
|
return result > 0;
|
|
} catch (error) {
|
|
console.error(`Error updating task ${taskId} status:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Update task response data
|
|
*/
|
|
updateTaskResponseData: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
taskId: string,
|
|
responseData: Record<string, any>
|
|
): Promise<boolean> => {
|
|
try {
|
|
const result = await knex('workflow_tasks')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.update({
|
|
response_data: responseData,
|
|
updated_at: new Date().toISOString()
|
|
});
|
|
|
|
if (result > 0) {
|
|
await publishWorkflowTaskSearchEvent('WORKFLOW_TASK_UPDATED', tenant, taskId, {
|
|
changedFields: ['response_data'],
|
|
});
|
|
}
|
|
|
|
return result > 0;
|
|
} catch (error) {
|
|
console.error(`Error updating task ${taskId} response data:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Replace task assignees.
|
|
*/
|
|
updateTaskAssignment: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
taskId: string,
|
|
assignedUserIds: string[],
|
|
userId?: string
|
|
): Promise<boolean> => {
|
|
try {
|
|
const result = await knex('workflow_tasks')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.update({
|
|
assigned_users: JSON.stringify(assignedUserIds),
|
|
updated_at: new Date().toISOString()
|
|
});
|
|
|
|
if (result > 0) {
|
|
await publishWorkflowTaskSearchEvent('WORKFLOW_TASK_ASSIGNMENT_CHANGED', tenant, taskId, {
|
|
userId,
|
|
assignedUserIds,
|
|
changedFields: ['assigned_users'],
|
|
});
|
|
}
|
|
|
|
return result > 0;
|
|
} catch (error) {
|
|
console.error(`Error updating workflow task ${taskId} assignment:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Delete a task.
|
|
*/
|
|
deleteTask: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
taskId: string,
|
|
userId?: string
|
|
): Promise<boolean> => {
|
|
try {
|
|
const task = await knex<IWorkflowTask>('workflow_tasks')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.first();
|
|
|
|
if (!task) {
|
|
return false;
|
|
}
|
|
|
|
let deleted = 0;
|
|
await knex.transaction(async (trx) => {
|
|
await trx('workflow_task_history')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.delete();
|
|
|
|
const deletedRows = await trx('workflow_tasks')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.delete();
|
|
|
|
deleted = Number(deletedRows ?? 0);
|
|
});
|
|
|
|
if (deleted > 0) {
|
|
await publishWorkflowTaskSearchEvent('WORKFLOW_TASK_DELETED', tenant, taskId, {
|
|
userId,
|
|
status: task.status,
|
|
assignedUserIds: task.assigned_users,
|
|
});
|
|
}
|
|
|
|
return deleted > 0;
|
|
} catch (error) {
|
|
console.error(`Error deleting workflow task ${taskId}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Complete a task with response data
|
|
*/
|
|
completeTask: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
taskId: string,
|
|
responseData: Record<string, any>,
|
|
userId?: string
|
|
): Promise<boolean> => {
|
|
try {
|
|
const now = new Date().toISOString();
|
|
|
|
const result = await knex('workflow_tasks')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.update({
|
|
status: WorkflowTaskStatus.COMPLETED,
|
|
response_data: responseData,
|
|
completed_at: now,
|
|
// completed_by: userId,
|
|
updated_at: now
|
|
});
|
|
|
|
if (result > 0) {
|
|
await publishWorkflowTaskSearchEvent('WORKFLOW_TASK_UPDATED', tenant, taskId, {
|
|
userId,
|
|
status: WorkflowTaskStatus.COMPLETED,
|
|
changedFields: ['status', 'response_data', 'completed_at'],
|
|
});
|
|
}
|
|
|
|
return result > 0;
|
|
} catch (error) {
|
|
console.error(`Error completing task ${taskId}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Add task history entry
|
|
*/
|
|
addTaskHistory: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
history: Omit<IWorkflowTaskHistory, 'history_id' | 'timestamp'>
|
|
): Promise<string> => {
|
|
try {
|
|
const historyId = uuidv4(); // Removed "hist-" prefix
|
|
|
|
const [result] = await knex('workflow_task_history')
|
|
.insert({
|
|
...history,
|
|
history_id: historyId,
|
|
tenant,
|
|
timestamp: new Date().toISOString()
|
|
})
|
|
.returning('history_id');
|
|
|
|
return result.history_id;
|
|
} catch (error) {
|
|
console.error(`Error adding task history for task ${history.task_id}:`, error);
|
|
throw error;
|
|
}
|
|
},
|
|
|
|
/**
|
|
* Get task history
|
|
*/
|
|
getTaskHistory: async (
|
|
knex: Knex,
|
|
tenant: string,
|
|
taskId: string
|
|
): Promise<IWorkflowTaskHistory[]> => {
|
|
try {
|
|
const history = await knex<IWorkflowTaskHistory>('workflow_task_history')
|
|
.where({
|
|
task_id: taskId,
|
|
tenant
|
|
})
|
|
.orderBy('timestamp', 'asc');
|
|
|
|
return history;
|
|
} catch (error) {
|
|
console.error(`Error getting history for task ${taskId}:`, error);
|
|
throw error;
|
|
}
|
|
}
|
|
};
|
|
|
|
export default WorkflowTaskModel;
|