PSA/docs/architecture/job_scheduler.md
Hermes 284313f908
Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Initial import of AlgaPSA codebase from PSA server
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz

Source: /opt/alga-psa on psa.joliet.tech
2026-06-22 16:12:17 -05:00

13 KiB

Job Scheduler System

Overview

The job scheduler system provides a robust and scalable solution for managing background tasks and scheduled jobs. It supports two execution backends:

  • PG Boss (Community Edition) - PostgreSQL-based job queue
  • Temporal (Enterprise Edition) - Workflow orchestration with enhanced durability

Both backends write to the same database tables (jobs and job_details), providing a unified monitoring experience regardless of the underlying execution engine.

Key Features

  • Immediate job execution
  • Scheduled jobs with specific run times
  • Recurring jobs with cron-like syntax
  • Job monitoring and metrics
  • Automatic retries with backoff
  • Job history tracking
  • Edition-based backend selection (CE: PG Boss, EE: Temporal)
  • Unified job monitoring dashboard

Architecture

Community Edition (PG Boss)

┌─────────────────────────────────────────────────────────────────┐
│                      Next.js Application                         │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                   JobRunnerFactory                           ││
│  │              Creates PgBossJobRunner                         ││
│  └──────────────────────────────┬──────────────────────────────┘│
│                                 │                                │
│                                 ▼                                │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                   PgBossJobRunner                            ││
│  │  - Wraps JobScheduler (pg-boss)                             ││
│  │  - Writes to jobs/job_details tables                        ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

Enterprise Edition (Temporal)

┌─────────────────────────────────────────────────────────────────┐
│                      Next.js Application                         │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                   JobRunnerFactory                           ││
│  │             Creates TemporalJobRunner                        ││
│  └──────────────────────────────┬──────────────────────────────┘│
│                                 │                                │
│                                 ▼                                │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                   TemporalJobRunner                          ││
│  │  - Schedules Temporal workflows                             ││
│  │  - Writes to jobs/job_details tables                        ││
│  └──────────────────────────────┬──────────────────────────────┘│
│                                 │                                │
│                                 ▼                                │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │               Temporal Worker (separate process)             ││
│  │  - Executes genericJobWorkflow                              ││
│  │  - Updates job status in database                           ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

Configuration

Environment Variables

Variable Default Description
JOB_RUNNER_TYPE pgboss Backend: pgboss or temporal
JOB_RUNNER_FALLBACK_TO_PGBOSS true Fall back to PG Boss if Temporal unavailable

Temporal-specific (EE only)

Variable Default Description
TEMPORAL_ADDRESS temporal-frontend... Temporal server address
TEMPORAL_NAMESPACE default Temporal namespace
TEMPORAL_JOB_TASK_QUEUE alga-jobs Task queue for jobs

Usage

import { JobRunnerFactory } from 'server/src/lib/jobs';

// Get the singleton job runner instance
const runner = await JobRunnerFactory.getInstance();

// Schedule an immediate job
await runner.scheduleJob('generate-invoice', {
  tenantId: 'tenant-123',
  billingCycleId: 'cycle-456',
});

// Schedule a job for later
await runner.scheduleJobAt('send-reminder', {
  tenantId: 'tenant-123',
  userId: 'user-789',
}, new Date('2024-12-01T09:00:00Z'));

// Schedule a recurring job
await runner.scheduleRecurringJob('daily-cleanup', {
  tenantId: 'tenant-123',
}, '0 0 * * *'); // Daily at midnight

Legacy API (PG Boss Direct)

The legacy JobScheduler API is still available for backward compatibility:

import { JobScheduler } from 'server/src/lib/jobs';

const scheduler = await JobScheduler.getInstance();

// Immediate job
await scheduler.scheduleImmediateJob('process-order', { orderId: 123 });

// Scheduled job
const runAt = new Date(Date.now() + 3600 * 1000);
await scheduler.scheduleScheduledJob('send-reminder', runAt, { userId: 456 });

// Recurring job
await scheduler.scheduleRecurringJob('daily-report', '0 0 * * *', {});

Registered Job Handlers

Job Name Description Timeout
generate-invoice Generate invoices for billing cycles 5 min
invoice_zip Create ZIP archives of invoices 10 min
invoice_email Send invoice emails default
expired-credits Mark expired credits default
expiring-credits-notification Send expiration notifications default
credit-reconciliation Reconcile credit balances default
asset_import Process asset import batches 10 min
reconcile-bucket-usage Reconcile usage records default
cleanup-temporary-workflow-forms Clean up temporary forms default
renew-microsoft-calendar-webhooks Renew MS calendar subscriptions default
verify-google-calendar-pubsub Verify Google calendar setup default
cleanup-ai-session-keys Clean up AI sessions (EE only) default
createClientContractLineCycles Create billing cycles default
createNextTimePeriods Create time periods default

Adding a New Job Handler

1. Create the Handler

// server/src/lib/jobs/handlers/myNewHandler.ts
import { BaseJobData } from '../interfaces';

export interface MyJobData extends BaseJobData {
  tenantId: string;
  customField: string;
}

export async function myNewHandler(data: MyJobData): Promise<void> {
  // Your job logic here
  console.log('Processing job for tenant:', data.tenantId);
}

2. Register the Handler

Add to server/src/lib/jobs/registerAllHandlers.ts:

import { myNewHandler, MyJobData } from './handlers/myNewHandler';

// Inside registerAllJobHandlers():
JobHandlerRegistry.register<MyJobData & BaseJobData>(
  {
    name: 'my-new-job',
    handler: async (_jobId, data) => {
      await myNewHandler(data);
    },
    retry: { maxAttempts: 3 },
    timeoutMs: 300000, // 5 minutes
  },
  registerOpts
);

3. Schedule the Job

const runner = await JobRunnerFactory.getInstance();
await runner.scheduleJob('my-new-job', {
  tenantId: 'tenant-123',
  customField: 'value',
});

Monitoring and Metrics

Dashboard

The job monitoring dashboard is available at /msp/jobs and displays:

  • Real-time job metrics (active, queued, completed, failed)
  • Job history with filtering
  • Job details and error inspection
  • Runner type indicator (PG Boss vs Temporal)

Programmatic Access

const runner = await JobRunnerFactory.getInstance();

// Get job status
const status = await runner.getJobStatus('job-id', 'tenant-id');
// { status: 'completed', progress: 100, metadata: {...} }

// Check health
const healthy = await runner.isHealthy();

Database Queries

-- Get all jobs for a tenant
SELECT * FROM jobs WHERE tenant = 'tenant-id' ORDER BY created_at DESC;

-- Get job details/steps
SELECT * FROM job_details WHERE job_id = 'job-id' ORDER BY processed_at;

-- Find jobs by runner type
SELECT * FROM jobs WHERE runner_type = 'temporal';

-- Find job by Temporal workflow ID
SELECT * FROM jobs WHERE external_id = 'workflow-id';

Error Handling

The system implements:

  • Automatic retries (3 attempts by default)
  • Exponential backoff between retries
  • Error logging and monitoring
  • Manual job cancellation
  • Graceful degradation (Temporal → PG Boss fallback)

Retry Configuration

JobHandlerRegistry.register({
  name: 'my-job',
  handler: async (jobId, data) => { /* ... */ },
  retry: {
    maxAttempts: 5,
    backoffCoefficient: 2.0,
    initialIntervalMs: 1000,
    maxIntervalMs: 60000,
  },
  timeoutMs: 600000,
});

Database Schema

jobs Table

CREATE TABLE jobs (
  tenant UUID NOT NULL,
  job_id UUID DEFAULT gen_random_uuid() NOT NULL,
  type VARCHAR NOT NULL,
  metadata JSONB,
  status job_status NOT NULL,  -- pending, processing, completed, failed, active, queued
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP,
  user_id UUID NOT NULL,
  runner_type VARCHAR DEFAULT 'pgboss' NOT NULL,  -- 'pgboss' or 'temporal'
  external_id VARCHAR,          -- PG Boss job ID or Temporal workflow ID
  external_run_id VARCHAR,      -- Temporal run ID
  PRIMARY KEY (tenant, job_id)
);

job_details Table

CREATE TABLE job_details (
  tenant UUID NOT NULL,
  detail_id UUID DEFAULT gen_random_uuid() NOT NULL,
  job_id UUID NOT NULL,
  step_name VARCHAR NOT NULL,
  status job_status NOT NULL,
  result JSONB,
  processed_at TIMESTAMP,
  retry_count INTEGER DEFAULT 0,
  metadata JSONB,
  PRIMARY KEY (tenant, detail_id),
  FOREIGN KEY (tenant, job_id) REFERENCES jobs(tenant, job_id)
);

Temporal-Specific Features (EE)

When using Temporal as the backend, additional features are available:

Workflow Signals

import { Client } from '@temporalio/client';

const client = new Client({ /* ... */ });
const handle = client.workflow.getHandle(workflowId);

// Cancel a job
await handle.signal('cancelJob', {
  reason: 'User requested cancellation',
  cancelledBy: 'admin@example.com',
});

// Update progress
await handle.signal('updateProgress', {
  progress: 50,
  message: 'Halfway done',
});

Workflow Queries

// Get current job state
const state = await handle.query('getJobState');
// { step: 'executing', progress: 50, startedAt: '...' }

Troubleshooting

Job Handler Not Found

If you see "No handler registered for job: X":

  1. Verify the handler is registered in registerAllHandlers.ts
  2. Check that registerAllJobHandlers() was called at startup
  3. For Temporal: ensure initializeJobHandlersForWorker() was called

Jobs Not Processing

  1. Check PG Boss is connected: verify database connection
  2. Check Temporal worker is running: kubectl get pods -l app=temporal-worker
  3. Check task queues match between scheduler and worker

Temporal Connection Issues

  1. Verify TEMPORAL_ADDRESS is correct
  2. Check Temporal server is running
  3. If JOB_RUNNER_FALLBACK_TO_PGBOSS=true, jobs will use PG Boss

See Also