Job Runner Abstraction Layer
This guide documents the unified job runner abstraction that allows Alga PSA to use either PG Boss (Community Edition) or Temporal (Enterprise Edition) for background job processing, while maintaining a consistent job monitoring experience through the shared jobs and job_details database tables.
Overview
The job runner abstraction provides:
- Unified Interface: A single IJobRunner interface for scheduling and managing background jobs
- Edition-Based Selection: Automatic selection of PG Boss (CE) or Temporal (EE) based on configuration
- Shared Monitoring: Both implementations write to the same database tables for unified job visibility
- Backward Compatibility: All existing job handlers work without modification
Architecture
┌─────────────────────────────────────────────────────────────────┐
│                        Application Layer                        │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                      JobRunnerFactory                       │ │
│ │  - Creates PgBossJobRunner (CE) or TemporalJobRunner (EE)   │ │
│ └──────────────────────────────┬──────────────────────────────┘ │
│                                │                                │
│               ┌────────────────┴────────────────┐               │
│               ▼                                 ▼               │
│  ┌─────────────────────────┐       ┌─────────────────────────┐  │
│  │  PgBossJobRunner (CE)   │       │ TemporalJobRunner (EE)  │  │
│  │  implements IJobRunner  │       │  implements IJobRunner  │  │
│  └────────────┬────────────┘       └────────────┬────────────┘  │
│               │                                 │               │
│               └────────────────┬────────────────┘               │
│                                │                                │
│                                ▼                                │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                         JobService                          │ │
│ │  - Writes to `jobs` and `job_details` tables                │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
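The layering in the diagram can be sketched roughly as follows. This is an illustration under stated assumptions, not the Alga PSA source: the IJobRunner method signature is inferred from the usage examples later in this guide, and InMemoryJobRunner, createJobRunner, and the connectTemporal callback are hypothetical stand-ins so the example runs without PG Boss or Temporal.

```typescript
// Hypothetical sketch: the real IJobRunner in Alga PSA may have a different shape.
type RunnerType = 'pgboss' | 'temporal';
type JobData = { tenantId: string } & Record<string, unknown>;

interface ScheduleResult {
  jobId: string;
  runner: RunnerType;
}

interface IJobRunner {
  readonly runnerType: RunnerType;
  scheduleJob(jobName: string, data: JobData): Promise<ScheduleResult>;
}

// Stand-in for both backends; it only fabricates an ID instead of queueing work.
class InMemoryJobRunner implements IJobRunner {
  readonly runnerType: RunnerType;
  private nextId = 1;

  constructor(runnerType: RunnerType) {
    this.runnerType = runnerType;
  }

  async scheduleJob(jobName: string, data: JobData): Promise<ScheduleResult> {
    const jobId = `${this.runnerType}:${data.tenantId}:${jobName}:${this.nextId++}`;
    return { jobId, runner: this.runnerType };
  }
}

interface FactoryOptions {
  type: RunnerType;
  fallbackToPgBoss: boolean;
  // Hypothetical hook: resolves when Temporal is reachable, rejects otherwise.
  connectTemporal: () => Promise<void>;
}

async function createJobRunner(opts: FactoryOptions): Promise<IJobRunner> {
  if (opts.type === 'pgboss') {
    return new InMemoryJobRunner('pgboss');
  }
  try {
    await opts.connectTemporal();
    return new InMemoryJobRunner('temporal');
  } catch (err) {
    // Mirrors JOB_RUNNER_FALLBACK_TO_PGBOSS: rethrow only when fallback is disabled.
    if (!opts.fallbackToPgBoss) throw err;
    return new InMemoryJobRunner('pgboss');
  }
}
```

Callers depend only on IJobRunner, so the choice of backend stays confined to the factory.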
Configuration
Environment Variables
| Variable | Default | Description |
|---|---|---|
| JOB_RUNNER_TYPE | pgboss | Job runner backend: pgboss or temporal |
| JOB_RUNNER_FALLBACK_TO_PGBOSS | true | Fall back to PG Boss if Temporal is unavailable |
| TEMPORAL_ADDRESS | temporal-frontend.temporal.svc.cluster.local:7233 | Temporal server address |
| TEMPORAL_NAMESPACE | default | Temporal namespace |
| TEMPORAL_JOB_TASK_QUEUE | alga-jobs | Task queue for generic job workflows |
Community Edition (PG Boss)
For CE deployments, PG Boss is used automatically. No additional configuration is needed beyond the existing database connection settings.
# .env for CE
JOB_RUNNER_TYPE=pgboss
Enterprise Edition (Temporal)
For EE deployments, configure Temporal as the job runner:
# .env for EE
JOB_RUNNER_TYPE=temporal
TEMPORAL_ADDRESS=temporal-frontend.temporal.svc.cluster.local:7233
TEMPORAL_NAMESPACE=default
TEMPORAL_JOB_TASK_QUEUE=alga-jobs
JOB_RUNNER_FALLBACK_TO_PGBOSS=true
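As a rough illustration of how these variables and their documented defaults might be read, here is a small sketch. The function name resolveJobRunnerConfig, the JobRunnerConfig shape, and the rule for parsing the boolean flag are assumptions made for this example, not the actual Alga PSA implementation.

```typescript
// Sketch only: defaults are copied from the environment variable table;
// the names and the boolean parsing rule are hypothetical.
type JobRunnerType = 'pgboss' | 'temporal';

interface JobRunnerConfig {
  type: JobRunnerType;
  fallbackToPgBoss: boolean;
  temporal: { address: string; namespace: string; taskQueue: string };
}

type Env = Record<string, string | undefined>;

function resolveJobRunnerConfig(env: Env): JobRunnerConfig {
  const rawType = (env['JOB_RUNNER_TYPE'] ?? 'pgboss').toLowerCase();
  if (rawType !== 'pgboss' && rawType !== 'temporal') {
    throw new Error(`Unsupported JOB_RUNNER_TYPE: ${rawType}`);
  }
  const rawFallback = (env['JOB_RUNNER_FALLBACK_TO_PGBOSS'] ?? 'true').toLowerCase();
  return {
    type: rawType,
    // Assumption: only the literal string "false" disables the fallback.
    fallbackToPgBoss: rawFallback !== 'false',
    temporal: {
      address: env['TEMPORAL_ADDRESS'] ?? 'temporal-frontend.temporal.svc.cluster.local:7233',
      namespace: env['TEMPORAL_NAMESPACE'] ?? 'default',
      taskQueue: env['TEMPORAL_JOB_TASK_QUEUE'] ?? 'alga-jobs',
    },
  };
}
```

Passing the environment in as an argument (for example `resolveJobRunnerConfig(process.env)`) keeps the function easy to test.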
Job Handler Registry
The JobHandlerRegistry provides centralized registration and lookup of job handlers. This is essential for the Temporal worker, which runs as a separate process from the Next.js server.
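To make the idea of a centralized registry concrete, the following minimal sketch maps job names to handler configurations. SimpleHandlerRegistry and its methods are hypothetical; the real JobHandlerRegistry API may differ. Only the configuration fields (name, handler, retry, timeoutMs) and the error message text are taken from this guide.

```typescript
// Hypothetical sketch of a name -> handler registry, not the Alga PSA class.
interface BaseJobData {
  tenantId: string;
}

interface HandlerConfig<T extends BaseJobData> {
  name: string;
  handler: (jobId: string, data: T) => Promise<void>;
  retry?: { maxAttempts: number };
  timeoutMs?: number;
}

class SimpleHandlerRegistry {
  private readonly handlers = new Map<string, HandlerConfig<any>>();

  register<T extends BaseJobData>(config: HandlerConfig<T>): void {
    if (this.handlers.has(config.name)) {
      throw new Error(`Handler already registered for job: ${config.name}`);
    }
    this.handlers.set(config.name, config);
  }

  has(jobName: string): boolean {
    return this.handlers.has(jobName);
  }

  // Looks up the handler by name and invokes it, as a worker process would.
  async execute(jobName: string, jobId: string, data: BaseJobData): Promise<void> {
    const config = this.handlers.get(jobName);
    if (!config) {
      throw new Error(`No handler registered for job: ${jobName}`);
    }
    await config.handler(jobId, data);
  }
}
```

Because the lookup is by name only, any process that populates the registry at startup can execute jobs scheduled elsewhere.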
Registering Handlers
All job handlers are registered via registerAllJobHandlers():
import { registerAllJobHandlers } from 'server/src/lib/jobs';

// During application startup
await registerAllJobHandlers({
  jobService: myJobService,
  storageService: myStorageService,
  includeEnterprise: process.env.EDITION === 'enterprise',
});
Adding a New Handler
- Create your handler in server/src/lib/jobs/handlers/:
// server/src/lib/jobs/handlers/myNewHandler.ts
import { BaseJobData } from '../interfaces';

export interface MyNewJobData extends BaseJobData {
  tenantId: string;
  customField: string;
}

export async function myNewHandler(data: MyNewJobData): Promise<void> {
  // Your job logic here
}
- Register it in registerAllHandlers.ts:
import { myNewHandler, MyNewJobData } from './handlers/myNewHandler';

// In registerAllJobHandlers function:
JobHandlerRegistry.register<MyNewJobData & BaseJobData>(
  {
    name: 'my-new-job',
    handler: async (_jobId, data) => {
      await myNewHandler(data);
    },
    retry: { maxAttempts: 3 },
    timeoutMs: 300000, // 5 minutes
  },
  registerOpts
);
- Add it to getAvailableJobHandlers() for documentation.
Temporal Worker Configuration
The Temporal worker handles generic job workflows alongside specialized workflows (tenant creation, portal domains, etc.).
Task Queues
The worker listens on multiple task queues:
| Queue | Purpose |
|---|---|
| tenant-workflows | Tenant creation and management |
| portal-domain-workflows | Portal domain provisioning |
| email-domain-workflows | Email domain configuration |
| alga-jobs | Generic job execution via genericJobWorkflow |
Worker Startup
The worker initializes job handlers before starting:
// In worker main()
await initializeJobHandlersForWorker();
This loads all job handlers into the registry so the executeJobHandler activity can find and invoke them.
Activity Configuration
Generic job activities use these default timeouts:
| Setting | Value |
|---|---|
| startToCloseTimeout | 10 minutes |
| maximumAttempts | 3 |
| initialInterval | 1 second |
| maximumInterval | 30 seconds |
| backoffCoefficient | 2.0 |
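To see what these retry settings mean in practice, the sketch below computes the wait before each retry using Temporal's standard exponential backoff rule: the delay before retry n is the initial interval multiplied by the coefficient to the power n - 1, capped at the maximum interval. The RetryPolicy type and retryDelaysMs helper are illustrative names, not part of the Temporal SDK or Alga PSA.

```typescript
// Illustrative helper; field names carry an Ms suffix to make the units explicit.
interface RetryPolicy {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
  maximumAttempts: number;
}

// Values from the Activity Configuration table.
const genericJobRetry: RetryPolicy = {
  initialIntervalMs: 1000,
  backoffCoefficient: 2.0,
  maximumIntervalMs: 30000,
  maximumAttempts: 3,
};

function retryDelaysMs(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  // maximumAttempts counts the first try, so there are maximumAttempts - 1 retries.
  for (let retry = 1; retry < policy.maximumAttempts; retry++) {
    const uncapped = policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, retry - 1);
    delays.push(Math.min(uncapped, policy.maximumIntervalMs));
  }
  return delays;
}

const delays = retryDelaysMs(genericJobRetry); // [1000, 2000]
```

With the defaults above, a failing activity is tried three times in total, waiting 1 second and then 2 seconds between attempts. The 30-second cap only matters if maximumAttempts is raised to 7 or more.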
Generic Job Workflow
The genericJobWorkflow wraps any registered job handler, providing Temporal's durability features.
Workflow Interface
interface GenericJobInput {
  jobId: string;                 // Our database job ID
  jobName: string;               // Handler name (e.g., 'generate-invoice')
  tenantId: string;              // Tenant context
  data: Record<string, unknown>; // Job-specific data
}

interface GenericJobResult {
  success: boolean;
  jobId: string;
  error?: string;
  result?: Record<string, unknown>;
  completedAt: string;
}
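One way to picture how a handler invocation could be turned into a GenericJobResult is sketched below. The interfaces are repeated so the example is self-contained. The runAsGenericJob function, the JobHandler type, and the choice to report failures as a result object rather than rethrowing are assumptions for illustration, so the real workflow may behave differently.

```typescript
interface GenericJobInput {
  jobId: string;
  jobName: string;
  tenantId: string;
  data: Record<string, unknown>;
}

interface GenericJobResult {
  success: boolean;
  jobId: string;
  error?: string;
  result?: Record<string, unknown>;
  completedAt: string;
}

// Hypothetical handler signature for this sketch.
type JobHandler = (
  jobId: string,
  data: Record<string, unknown>,
) => Promise<Record<string, unknown> | undefined>;

async function runAsGenericJob(
  input: GenericJobInput,
  handler: JobHandler,
): Promise<GenericJobResult> {
  try {
    // The tenant ID is merged into the payload so the handler always has tenant context.
    const result = await handler(input.jobId, { ...input.data, tenantId: input.tenantId });
    const ok: GenericJobResult = {
      success: true,
      jobId: input.jobId,
      completedAt: new Date().toISOString(),
    };
    return result ? { ...ok, result } : ok;
  } catch (err) {
    return {
      success: false,
      jobId: input.jobId,
      error: err instanceof Error ? err.message : String(err),
      completedAt: new Date().toISOString(),
    };
  }
}
```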
Signals
| Signal | Purpose |
|---|---|
| cancelJob | Cancel a running job with a reason |
| updateProgress | Update job progress (0-100) |
Queries
| Query | Returns |
|---|---|
| getJobState | Current workflow state (step, progress, errors) |
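The effect of the two signals on the state returned by getJobState can be modeled as a small pure function. This is a conceptual sketch without any Temporal dependency. The JobState shape is guessed from the "step, progress, errors" description, and the step values, the clamping of progress, and ignoring signals after cancellation are all assumptions.

```typescript
// Hypothetical state shape; only step, progress, and errors are named in this guide.
interface JobState {
  step: 'running' | 'cancelled' | 'completed';
  progress: number; // 0-100
  errors: string[];
  cancelReason?: string;
}

type JobSignal =
  | { type: 'cancelJob'; reason: string; cancelledBy?: string }
  | { type: 'updateProgress'; progress: number };

function applySignal(state: JobState, signal: JobSignal): JobState {
  // Assumption: signals arriving after the job has finished or been cancelled are ignored.
  if (state.step !== 'running') {
    return state;
  }
  if (signal.type === 'cancelJob') {
    return { ...state, step: 'cancelled', cancelReason: signal.reason };
  }
  // Keep progress within the documented 0-100 range.
  const clamped = Math.min(100, Math.max(0, signal.progress));
  return { ...state, progress: clamped };
}
```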
Example: Canceling a Job
import { Client } from '@temporalio/client';

const client = new Client({ /* connection config */ });
const handle = client.workflow.getHandle(workflowId);

await handle.signal('cancelJob', {
  reason: 'User requested cancellation',
  cancelledBy: 'user@example.com',
});
Database Schema
jobs Table Extensions
The abstraction adds these columns to the jobs table:
| Column | Type | Description |
|---|---|---|
| runner_type | VARCHAR | 'pgboss' or 'temporal' |
| external_id | VARCHAR | PG Boss job ID or Temporal workflow ID |
| external_run_id | VARCHAR | Temporal run ID (for workflow history) |
Querying Jobs by Runner
-- Find all Temporal-executed jobs
SELECT * FROM jobs WHERE runner_type = 'temporal';
-- Find a job by Temporal workflow ID
SELECT * FROM jobs WHERE external_id = 'my-workflow-id';
Registered Job Handlers
| Job Name | Description | Timeout |
|---|---|---|
| generate-invoice | Generate invoices for billing cycles | 5 min |
| invoice_zip | Create ZIP archives of invoices | 10 min |
| invoice_email | Send invoice emails | default |
| expired-credits | Mark expired credits | default |
| expiring-credits-notification | Send expiration notifications | default |
| credit-reconciliation | Reconcile credit balances | default |
| asset_import | Process asset import batches | 10 min |
| reconcile-bucket-usage | Reconcile usage records | default |
| cleanup-temporary-workflow-forms | Clean up temporary forms | default |
| renew-microsoft-calendar-webhooks | Renew MS calendar subscriptions | default |
| verify-google-calendar-pubsub | Verify Google calendar setup | default |
| cleanup-ai-session-keys | Clean up AI sessions (EE only) | default |
Troubleshooting
Job Handler Not Found
If you see "No handler registered for job: X":
- Verify the handler is registered in registerAllHandlers.ts
- Check that registerAllJobHandlers() was called before job execution
- For Temporal worker: ensure initializeJobHandlersForWorker() was called at startup
Temporal Connection Issues
If jobs fail to schedule in EE mode:
- Check TEMPORAL_ADDRESS is correct and accessible
- Verify Temporal server is running: kubectl get pods -n temporal
- Check worker logs for connection errors
- If JOB_RUNNER_FALLBACK_TO_PGBOSS=true, jobs will fall back to PG Boss
Database Sync Issues
If the job status in the database doesn't match Temporal:
- Check Temporal worker logs for updateJobStatus activity failures
- Verify database connectivity from the worker
- Check for any retry exhaustion in the activity
Missing Jobs in Dashboard
- Verify runner_type column exists (run migrations)
- Check that jobs are being created via JobService.createJob()
- For Temporal jobs, verify the workflow is reaching the updateJobStatus activity
Best Practices
Handler Design
- Idempotency: Design handlers to be safely re-runnable
- Timeouts: Set appropriate timeoutMs for long-running jobs
- Tenant Isolation: Always include tenantId in job data
- Error Handling: Throw errors for retryable failures; log and complete for non-retryable
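The handler guidelines above can be illustrated with a small sketch. Everything here is hypothetical: CreditStore is an in-memory stand-in for a database, and expireCreditHandler is an invented example rather than one of the registered handlers. It shows an idempotent update, a thrown error for a transient (retryable) failure, and a logged early return for input that can never succeed.

```typescript
// Hypothetical in-memory stand-in for persistent storage.
interface CreditStore {
  available: boolean;
  expired: Set<string>;
}

interface ExpireCreditData {
  tenantId: string; // Tenant isolation: always part of the job data
  creditId?: string;
}

async function expireCreditHandler(
  data: ExpireCreditData,
  store: CreditStore,
  log: string[],
): Promise<void> {
  // Non-retryable: malformed input will not improve on retry, so log and complete.
  if (!data.creditId) {
    log.push(`tenant ${data.tenantId}: missing creditId, skipping`);
    return;
  }
  // Retryable: a transient outage should surface as an error so the runner retries.
  if (!store.available) {
    throw new Error('Credit store unavailable');
  }
  // Idempotent: running twice for the same credit leaves the same end state.
  const key = `${data.tenantId}:${data.creditId}`;
  if (store.expired.has(key)) {
    return;
  }
  store.expired.add(key);
}
```

Scoping the key by tenant ID keeps one tenant's job from touching another tenant's records.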
Scheduling Jobs
import { JobRunnerFactory } from 'server/src/lib/jobs';

// Get the job runner
const runner = await JobRunnerFactory.getInstance();

// Schedule an immediate job
const result = await runner.scheduleJob('generate-invoice', {
  tenantId: 'tenant-123',
  billingCycleId: 'cycle-456',
});

// Schedule a job for later
await runner.scheduleJobAt('cleanup-task', {
  tenantId: 'tenant-123',
}, new Date('2024-12-01T00:00:00Z'));

// Schedule a recurring job
await runner.scheduleRecurringJob('daily-report', {
  tenantId: 'tenant-123',
}, '0 0 * * *'); // Daily at midnight