PSA/shared/task-inbox/taskInboxService.ts
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

374 lines
12 KiB
TypeScript

import type { Knex } from 'knex';
import { v4 as uuidv4 } from 'uuid';
import WorkflowTaskModel, {
WorkflowTaskStatus,
publishWorkflowTaskSearchEvent,
} from '../workflow/persistence/workflowTaskModel';
import { TaskCreationParams } from '../workflow/persistence/taskInboxInterfaces';
/**
 * Service for managing workflow tasks in the Task Inbox system.
 *
 * Every method receives the Knex instance (or transaction) to use, so the
 * service itself holds no state.
 */
export class TaskInboxService {
  /**
   * Normalise one `assignTo` entry (roles or users) into a string array.
   *
   * @param value Raw value supplied by the caller: a single id, a list of ids, or nothing
   * @returns `[value]` for a non-empty string, the array itself for an array,
   *          and `undefined` for anything falsy or of another type
   */
  private static normalizeAssignees(value: unknown): string[] | undefined {
    if (!value) {
      return undefined;
    }
    if (typeof value === 'string') {
      return [value];
    }
    return Array.isArray(value) ? value : undefined;
  }

  /**
   * Create a new task in the Task Inbox.
   *
   * The task definition is resolved in two steps: first a tenant definition in
   * `workflow_task_definitions` whose `form_id` equals `params.taskType`, then a
   * system definition in `system_workflow_task_definitions` whose `task_type`
   * equals `params.taskType`.
   *
   * @param knex Knex instance
   * @param tenant Tenant ID
   * @param executionId Workflow execution ID
   * @param params Task creation parameters
   * @param userId User ID of the creator
   * @returns The task ID generated by `WorkflowTaskModel.createTask`
   * @throws Error when neither a tenant nor a system task definition matches `params.taskType`
   */
  async createTask(
    knex: Knex,
    tenant: string,
    executionId: string,
    params: TaskCreationParams,
    userId?: string
  ): Promise<string> {
    try {
      let resolvedDefinitionType: 'tenant' | 'system';
      let tenantTaskDefinitionIdForTask: string | null = null;
      let systemTaskDefinitionTaskTypeForTask: string | null = null;

      // 1. Prefer a definition owned by the current tenant.
      const tenantTaskDef = await knex('workflow_task_definitions')
        .where({
          form_id: params.taskType,
          tenant: tenant,
        })
        .first();

      if (tenantTaskDef) {
        tenantTaskDefinitionIdForTask = tenantTaskDef.task_definition_id;
        resolvedDefinitionType = 'tenant';
      } else {
        // 2. Fall back to a system definition.
        const systemTaskDef = await knex('system_workflow_task_definitions')
          .where({
            task_type: params.taskType,
          })
          .first();

        if (!systemTaskDef) {
          // Task definitions must exist before tasks can be created.
          throw new Error(`Task definition with taskType '${params.taskType}' not found for tenant '${tenant}' or as a system definition.`);
        }
        // For system tasks, task_type is the ID.
        systemTaskDefinitionTaskTypeForTask = systemTaskDef.task_type;
        resolvedDefinitionType = 'system';
      }

      const assignedRoles = TaskInboxService.normalizeAssignees(params.assignTo?.roles);
      const assignedUsers = TaskInboxService.normalizeAssignees(params.assignTo?.users);

      // Task object with the dual FK structure (tenant definition id OR system task type).
      // IWorkflowTask will need to be updated to reflect these fields; until then the
      // payload is cast when handed to WorkflowTaskModel.createTask.
      const taskPayload = {
        tenant,
        execution_id: executionId,
        tenant_task_definition_id: tenantTaskDefinitionIdForTask,
        system_task_definition_task_type: systemTaskDefinitionTaskTypeForTask,
        task_definition_type: resolvedDefinitionType,
        title: params.title,
        description: params.description,
        status: WorkflowTaskStatus.PENDING,
        priority: params.priority || 'medium',
        due_date: params.dueDate ? new Date(params.dueDate).toISOString() : undefined,
        context_data: params.contextData,
        assigned_roles: assignedRoles,
        assigned_users: assignedUsers,
        created_by: userId
      };

      const generatedTaskId = await WorkflowTaskModel.createTask(knex, tenant, taskPayload as any);

      // History is best-effort: a failure here is logged but does not fail task creation.
      try {
        await WorkflowTaskModel.addTaskHistory(knex, tenant, {
          task_id: generatedTaskId,
          tenant,
          action: 'create',
          from_status: undefined,
          to_status: WorkflowTaskStatus.PENDING,
          user_id: userId
        });
      } catch (error) {
        console.error('Error adding task history:', error);
      }

      return generatedTaskId;
    } catch (error) {
      console.error('Error creating task:', error);
      throw error;
    }
  }

  /**
   * Creates a task with an inline form definition.
   *
   * Inside a single transaction this inserts a temporary form definition, its
   * schema, an auto-generated tenant task definition, the task row and a
   * "create" history entry. After the transaction resolves, a
   * `WORKFLOW_TASK_CREATED` search event is published.
   *
   * @param knex Knex instance
   * @param tenant Tenant ID
   * @param executionId Workflow execution ID
   * @param params Task and form parameters
   * @param userId User ID of the creator
   * @returns The created task ID
   * @throws RangeError when `params.dueDate` cannot be converted to an ISO timestamp
   */
  async createTaskWithInlineForm(
    knex: Knex,
    tenant: string,
    executionId: string,
    params: {
      title: string;
      description?: string;
      priority?: string;
      dueDate?: string | Date;
      assignTo?: {
        roles?: string[] | string;
        users?: string[] | string;
      };
      contextData?: Record<string, any>;
      form: {
        jsonSchema: Record<string, any>;
        uiSchema?: Record<string, any>;
        defaultValues?: Record<string, any>;
      };
      formCategory?: string;
    },
    userId?: string
  ): Promise<string> {
    try {
      const assignedRoles = TaskInboxService.normalizeAssignees(params.assignTo?.roles);
      const assignedUsers = TaskInboxService.normalizeAssignees(params.assignTo?.users);

      // Convert the due date up front so an invalid value fails before any
      // database work is started.
      const dueDateIso = params.dueDate ? new Date(params.dueDate).toISOString() : null;
      const priority = params.priority || 'medium';

      // Start a transaction to ensure atomicity.
      const taskId = await knex.transaction(async (trx: Knex.Transaction) => {
        const formId = uuidv4();
        const tempTaskType = `inline_task_${formId}`;
        // One timestamp for every row written by this call.
        const now = new Date().toISOString();

        // Temporary form in the tenant's workflow_form_definitions table.
        await trx('workflow_form_definitions').insert({
          form_id: formId,
          tenant: tenant,
          name: `Inline Form - ${params.title}`,
          description: `Inline form for task: ${params.title}`,
          version: '1.0',
          status: 'active',
          category: params.formCategory || 'inline_forms',
          is_temporary: true,
          created_by: userId,
          created_at: now,
          updated_at: now
        });

        // Schema belonging to the form.
        await trx('workflow_form_schemas').insert({
          schema_id: uuidv4(),
          form_id: formId,
          tenant: tenant,
          json_schema: params.form.jsonSchema,
          ui_schema: params.form.uiSchema || {},
          default_values: params.form.defaultValues || {},
          created_at: now,
          updated_at: now
        });

        // Tenant task definition pointing at the form.
        const taskDefinitionId = uuidv4();
        await trx('workflow_task_definitions').insert({
          task_definition_id: taskDefinitionId,
          tenant: tenant,
          name: tempTaskType,
          description: `Auto-generated task definition for inline form`,
          form_id: formId,
          form_type: 'tenant',
          default_priority: priority,
          created_by: userId,
          created_at: now,
          updated_at: now
        });

        // The task itself, referencing the tenant task definition.
        const newTaskId = uuidv4();
        await trx('workflow_tasks').insert({
          task_id: newTaskId,
          tenant: tenant,
          execution_id: executionId,
          task_definition_type: 'tenant',
          system_task_definition_task_type: null,
          tenant_task_definition_id: taskDefinitionId,
          title: params.title,
          description: params.description,
          status: WorkflowTaskStatus.PENDING,
          priority: priority,
          due_date: dueDateIso,
          context_data: params.contextData ? JSON.stringify(params.contextData) : null,
          assigned_roles: assignedRoles ? JSON.stringify(assignedRoles) : null,
          assigned_users: assignedUsers ? JSON.stringify(assignedUsers) : null,
          created_by: userId,
          created_at: now,
          updated_at: now
        });

        // Unlike createTask, the history entry is part of the transaction here,
        // so a history failure rolls the whole creation back.
        await WorkflowTaskModel.addTaskHistory(trx, tenant, {
          task_id: newTaskId,
          tenant: tenant,
          action: 'create',
          from_status: undefined,
          to_status: WorkflowTaskStatus.PENDING,
          user_id: userId
        });

        return newTaskId;
      });

      // NOTE(review): this runs after the transaction has resolved, so if publishing
      // rejects, the caller sees an error although the task rows are already persisted.
      await publishWorkflowTaskSearchEvent('WORKFLOW_TASK_CREATED', tenant, taskId, {
        userId,
        status: WorkflowTaskStatus.PENDING,
        assignedUserIds: assignedUsers,
        changedFields: ['title', 'description', 'assigned_users', 'status'],
      });

      return taskId;
    } catch (error) {
      console.error('Error creating task with inline form:', error);
      throw error;
    }
  }

  /**
   * Clean up temporary forms for a specific tenant.
   *
   * Runs in one transaction: selects the tenant's temporary form IDs, deletes
   * the task definitions and schemas that reference them, then deletes exactly
   * those forms. Restricting the last delete to the selected IDs keeps a form
   * that was created while the cleanup was running from losing its form row
   * while its schema and task definition stay behind.
   *
   * NOTE(review): `workflow_tasks` rows that point at the removed task
   * definitions are not touched here — confirm that is intended.
   *
   * @param knex Knex instance
   * @param tenant Tenant ID
   * @returns Number of forms deleted
   */
  async cleanupTemporaryForms(knex: Knex, tenant: string): Promise<number> {
    try {
      return await knex.transaction(async (trx: Knex.Transaction) => {
        const tempForms = await trx('workflow_form_definitions')
          .where({ tenant: tenant, is_temporary: true })
          .select('form_id');

        if (tempForms.length === 0) {
          return 0;
        }

        const formIds = tempForms.map((f: { form_id: string }) => f.form_id);

        // Children first: task definitions that reference these forms...
        await trx('workflow_task_definitions')
          .where({ tenant: tenant })
          .whereIn('form_id', formIds)
          .delete();

        // ...then their schemas...
        await trx('workflow_form_schemas')
          .where({ tenant: tenant })
          .whereIn('form_id', formIds)
          .delete();

        // ...and finally the forms themselves, limited to the IDs handled above.
        const deletedCount = await trx('workflow_form_definitions')
          .where({ tenant: tenant, is_temporary: true })
          .whereIn('form_id', formIds)
          .delete();

        return deletedCount;
      });
    } catch (error) {
      console.error(`Error cleaning up temporary forms for tenant ${tenant}:`, error);
      throw error;
    }
  }

  /**
   * Clean up temporary forms for all tenants.
   *
   * Tenants are processed one after another; the first failure is logged and
   * rethrown, so later tenants are not processed in that run.
   *
   * @param knex Knex instance
   * @returns Total number of forms deleted
   */
  async cleanupAllTemporaryForms(knex: Knex): Promise<number> {
    try {
      // Distinct tenants that currently own at least one temporary form.
      const tenants = await knex('workflow_form_definitions')
        .where({ is_temporary: true })
        .distinct('tenant')
        .pluck('tenant');

      let totalDeleted = 0;
      for (const tenant of tenants) {
        totalDeleted += await this.cleanupTemporaryForms(knex, tenant);
      }
      return totalDeleted;
    } catch (error) {
      console.error('Error cleaning up temporary forms for all tenants:', error);
      throw error;
    }
  }
}
// Module-level holder for the shared service; stays null until first requested
let taskInboxServiceInstance: TaskInboxService | null = null;

/**
 * Return the shared TaskInboxService, constructing it on the first call and
 * handing back the same object on every later call.
 */
export function getTaskInboxService(): TaskInboxService {
  if (taskInboxServiceInstance) {
    return taskInboxServiceInstance;
  }
  const created = new TaskInboxService();
  taskInboxServiceInstance = created;
  return created;
}