Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz Source: /opt/alga-psa on psa.joliet.tech
485 lines
18 KiB
TypeScript
485 lines
18 KiB
TypeScript
import type { Knex } from 'knex';
|
|
import type {
|
|
NormalizedRmmAlertEvent,
|
|
RmmAlertProcessingContext,
|
|
RmmAlertProcessingResult,
|
|
RmmAlertRuleActions,
|
|
RmmAlertRuleRow,
|
|
RmmMaintenanceWindowRow,
|
|
} from './contracts';
|
|
import { rmmAlertRuleActionsSchema } from './contracts';
|
|
import { computeDedupKey } from './dedupKey';
|
|
import { evaluateAlertRules } from './ruleEvaluator';
|
|
import { findMatchingWindow } from './windowMatcher';
|
|
import { addAlertInternalNote, createTicketForAlert, providerLabel } from './ticketCreator';
|
|
import { isTicketUntouched } from './untouched';
|
|
|
|
/**
|
|
* Single entry point for normalized RMM alert events (webhooks and the
|
|
* reconciliation poller). All ingest work is local DB writes inside one
|
|
* transaction; workflow events and notifications publish after commit.
|
|
* Replayed deliveries are no-ops, so at-least-once sources are safe.
|
|
*/
|
|
export interface ProcessRmmAlertEventOptions {
|
|
/**
|
|
* Reconciliation passes this so a still-active suppressed alert re-enters
|
|
* the pipeline once its maintenance window has ended (the window check runs
|
|
* again against the event's occurredAt).
|
|
*/
|
|
reprocessSuppressed?: boolean;
|
|
}
|
|
|
|
export async function processRmmAlertEvent(
|
|
ctx: RmmAlertProcessingContext,
|
|
event: NormalizedRmmAlertEvent,
|
|
options: ProcessRmmAlertEventOptions = {}
|
|
): Promise<RmmAlertProcessingResult> {
|
|
switch (event.kind) {
|
|
case 'triggered':
|
|
return processTriggered(ctx, event, options);
|
|
case 'reset':
|
|
return processReset(ctx, event);
|
|
case 'acknowledged':
|
|
return processAcknowledged(ctx, event);
|
|
}
|
|
}
|
|
|
|
interface ResolvedAlertContext {
|
|
assetId: string | null;
|
|
clientId: string | null;
|
|
organizationName: string | null;
|
|
}
|
|
|
|
async function resolveAlertContext(knex: Knex, event: NormalizedRmmAlertEvent): Promise<ResolvedAlertContext> {
|
|
let assetId: string | null = null;
|
|
let clientId: string | null = null;
|
|
let organizationName: string | null = null;
|
|
|
|
if (event.externalDeviceId) {
|
|
const mapping = await knex('tenant_external_entity_mappings')
|
|
.where({
|
|
tenant: event.tenantId,
|
|
integration_type: event.provider,
|
|
alga_entity_type: 'asset',
|
|
external_entity_id: event.externalDeviceId,
|
|
})
|
|
.first('alga_entity_id');
|
|
if (mapping?.alga_entity_id) {
|
|
const asset = await knex('assets')
|
|
.where({ tenant: event.tenantId, asset_id: mapping.alga_entity_id })
|
|
.first('asset_id', 'client_id');
|
|
if (asset) {
|
|
assetId = asset.asset_id;
|
|
clientId = asset.client_id ?? null;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (event.externalOrganizationId) {
|
|
const orgMapping = await knex('rmm_organization_mappings')
|
|
.where({
|
|
tenant: event.tenantId,
|
|
integration_id: event.integrationId,
|
|
external_organization_id: event.externalOrganizationId,
|
|
})
|
|
.first('client_id', 'external_organization_name');
|
|
organizationName = orgMapping?.external_organization_name ?? null;
|
|
if (!clientId) clientId = orgMapping?.client_id ?? null;
|
|
}
|
|
|
|
return { assetId, clientId, organizationName };
|
|
}
|
|
|
|
async function processTriggered(
|
|
ctx: RmmAlertProcessingContext,
|
|
event: NormalizedRmmAlertEvent,
|
|
options: ProcessRmmAlertEventOptions = {}
|
|
): Promise<RmmAlertProcessingResult> {
|
|
const { knex, deps } = ctx;
|
|
const warnings: string[] = [];
|
|
const context = await resolveAlertContext(knex, event);
|
|
const dedupKey = computeDedupKey(event);
|
|
|
|
const result = await knex.transaction(async (trx): Promise<RmmAlertProcessingResult> => {
|
|
const existing = await trx('rmm_alerts')
|
|
.where({
|
|
tenant: event.tenantId,
|
|
integration_id: event.integrationId,
|
|
external_alert_id: event.externalAlertId,
|
|
})
|
|
.first('alert_id', 'status', 'ticket_id');
|
|
|
|
// Same external alert in a live state again = redelivery; nothing to do.
|
|
// Suppressed alerts fall through when reconciliation reprocesses them.
|
|
const duplicateStatuses = options.reprocessSuppressed
|
|
? ['active', 'acknowledged']
|
|
: ['active', 'acknowledged', 'suppressed'];
|
|
if (existing && duplicateStatuses.includes(existing.status)) {
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: existing.alert_id })
|
|
.update({ updated_at: new Date().toISOString() });
|
|
return {
|
|
outcome: 'skipped',
|
|
alertId: existing.alert_id,
|
|
ticketId: existing.ticket_id ?? null,
|
|
warnings,
|
|
};
|
|
}
|
|
|
|
const baseRow = {
|
|
tenant: event.tenantId,
|
|
integration_id: event.integrationId,
|
|
external_alert_id: event.externalAlertId,
|
|
external_device_id: event.externalDeviceId ?? null,
|
|
asset_id: context.assetId,
|
|
severity: event.severity,
|
|
status: 'active',
|
|
source_type: event.sourceType ?? null,
|
|
alert_class: event.alertClass ?? null,
|
|
activity_type: event.activityType ?? null,
|
|
message: event.message ?? null,
|
|
device_name: event.deviceName ?? null,
|
|
dedup_key: dedupKey,
|
|
triggered_at: event.occurredAt,
|
|
last_occurrence_at: event.occurredAt,
|
|
metadata: JSON.stringify(event.raw),
|
|
updated_at: new Date().toISOString(),
|
|
};
|
|
|
|
let alertId: string;
|
|
if (existing) {
|
|
// A previously-resolved alert id re-triggering re-enters the pipeline.
|
|
alertId = existing.alert_id;
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: alertId })
|
|
.update({ ...baseRow, ticket_id: null, resolved_at: null, suppressed_by_window_id: null });
|
|
} else {
|
|
const inserted = await trx('rmm_alerts')
|
|
.insert({ ...baseRow, created_at: new Date().toISOString() })
|
|
.returning(['alert_id']);
|
|
alertId = inserted[0].alert_id;
|
|
}
|
|
|
|
// Maintenance windows suppress before any rule work.
|
|
const windows = (await trx('rmm_maintenance_windows')
|
|
.where({ tenant: event.tenantId, is_active: true })) as RmmMaintenanceWindowRow[];
|
|
const matchedWindow = findMatchingWindow(windows, {
|
|
integrationId: event.integrationId,
|
|
clientId: context.clientId,
|
|
assetId: context.assetId,
|
|
occurredAt: event.occurredAt,
|
|
});
|
|
if (matchedWindow) {
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: alertId })
|
|
.update({ status: 'suppressed', suppressed_by_window_id: matchedWindow.window_id });
|
|
return {
|
|
outcome: 'suppressed',
|
|
alertId,
|
|
suppressedByWindowId: matchedWindow.window_id,
|
|
warnings,
|
|
};
|
|
}
|
|
|
|
const rules = (await trx('rmm_alert_rules')
|
|
.where({ tenant: event.tenantId, integration_id: event.integrationId, is_active: true })
|
|
.orderBy('priority_order', 'asc')) as RmmAlertRuleRow[];
|
|
const evaluation = evaluateAlertRules(rules, event);
|
|
warnings.push(...evaluation.warnings);
|
|
|
|
if (evaluation.rule) {
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: alertId })
|
|
.update({ matched_rule_id: evaluation.rule.rule_id });
|
|
}
|
|
|
|
const actions = parseActions(evaluation.rule, warnings);
|
|
if (!evaluation.rule || !actions || !actions.createTicket) {
|
|
return { outcome: 'recorded_only', alertId, matchedRuleId: evaluation.rule?.rule_id ?? null, warnings };
|
|
}
|
|
|
|
// Dedup: an open ticket for the same (device, condition) absorbs this alert.
|
|
const sibling = await trx('rmm_alerts as a')
|
|
.join('tickets as t', function joinTickets() {
|
|
this.on('t.tenant', 'a.tenant').andOn('t.ticket_id', 'a.ticket_id');
|
|
})
|
|
.join('statuses as s', function joinStatuses() {
|
|
this.on('s.tenant', 't.tenant').andOn('s.status_id', 't.status_id');
|
|
})
|
|
.where('a.tenant', event.tenantId)
|
|
.andWhere('a.integration_id', event.integrationId)
|
|
.andWhere('a.dedup_key', dedupKey)
|
|
.andWhereNot('a.alert_id', alertId)
|
|
.whereNotNull('a.ticket_id')
|
|
.andWhere('s.is_closed', false)
|
|
// Oldest sibling = the row that created the ticket; it carries the
|
|
// authoritative occurrence_count (newer siblings are absorbed copies).
|
|
.orderBy('a.created_at', 'asc')
|
|
.first('a.alert_id as sibling_alert_id', 'a.ticket_id', 'a.occurrence_count');
|
|
|
|
if (sibling?.ticket_id) {
|
|
const occurrence = Number(sibling.occurrence_count ?? 1) + 1;
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: alertId })
|
|
.update({ ticket_id: sibling.ticket_id });
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: sibling.sibling_alert_id })
|
|
.update({ occurrence_count: occurrence, last_occurrence_at: event.occurredAt });
|
|
await addAlertInternalNote(
|
|
trx,
|
|
event.tenantId,
|
|
sibling.ticket_id,
|
|
`Alert re-triggered — occurrence ${occurrence}.\nExternal alert ID: ${event.externalAlertId}\n${event.message ?? ''}`.trim()
|
|
);
|
|
return {
|
|
outcome: 'occurrence_appended',
|
|
alertId,
|
|
ticketId: sibling.ticket_id,
|
|
matchedRuleId: evaluation.rule.rule_id,
|
|
warnings,
|
|
};
|
|
}
|
|
|
|
if (!context.clientId) {
|
|
warnings.push('No client resolvable for alert (unmapped asset and organization); alert recorded without ticket');
|
|
return { outcome: 'recorded_only', alertId, matchedRuleId: evaluation.rule.rule_id, warnings };
|
|
}
|
|
|
|
const ticket = await createTicketForAlert(trx, {
|
|
event,
|
|
actions,
|
|
clientId: context.clientId,
|
|
assetId: context.assetId,
|
|
organizationName: context.organizationName,
|
|
});
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: alertId })
|
|
.update({ ticket_id: ticket.ticket_id, auto_ticket_created: true });
|
|
|
|
return {
|
|
outcome: 'ticket_created',
|
|
alertId,
|
|
ticketId: ticket.ticket_id,
|
|
matchedRuleId: evaluation.rule.rule_id,
|
|
warnings,
|
|
};
|
|
});
|
|
|
|
if (result.outcome !== 'suppressed' && result.outcome !== 'skipped') {
|
|
await publishSafely(ctx, 'RMM_ALERT_TRIGGERED', event, result, context.assetId);
|
|
if (result.outcome === 'ticket_created' && result.matchedRuleId) {
|
|
await notifySafely(ctx, event, result, context.assetId);
|
|
}
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
async function processReset(
|
|
ctx: RmmAlertProcessingContext,
|
|
event: NormalizedRmmAlertEvent
|
|
): Promise<RmmAlertProcessingResult> {
|
|
const { knex } = ctx;
|
|
const warnings: string[] = [];
|
|
let resolvedQuietly = false;
|
|
let resolvedAssetId: string | null = null;
|
|
|
|
const result = await knex.transaction(async (trx): Promise<RmmAlertProcessingResult> => {
|
|
const existing = await trx('rmm_alerts')
|
|
.where({
|
|
tenant: event.tenantId,
|
|
integration_id: event.integrationId,
|
|
external_alert_id: event.externalAlertId,
|
|
})
|
|
.first('alert_id', 'status', 'ticket_id', 'matched_rule_id', 'asset_id');
|
|
|
|
if (!existing || existing.status === 'resolved' || existing.status === 'auto_resolved') {
|
|
return { outcome: 'skipped', alertId: existing?.alert_id, warnings };
|
|
}
|
|
resolvedAssetId = existing.asset_id ?? null;
|
|
|
|
const wasSuppressed = existing.status === 'suppressed';
|
|
await trx('rmm_alerts').where({ tenant: event.tenantId, alert_id: existing.alert_id }).update({
|
|
status: 'resolved',
|
|
resolved_at: event.occurredAt,
|
|
updated_at: new Date().toISOString(),
|
|
});
|
|
|
|
// Suppressed alerts resolve quietly: no ticket exists and no events fire.
|
|
if (wasSuppressed) {
|
|
resolvedQuietly = true;
|
|
return { outcome: 'resolved', alertId: existing.alert_id, warnings };
|
|
}
|
|
|
|
if (existing.ticket_id && existing.matched_rule_id) {
|
|
const rule = (await trx('rmm_alert_rules')
|
|
.where({ tenant: event.tenantId, rule_id: existing.matched_rule_id })
|
|
.first()) as RmmAlertRuleRow | undefined;
|
|
const actions = parseActions(rule ?? null, warnings);
|
|
if (actions?.autoResolveTicket) {
|
|
await addAlertInternalNote(
|
|
trx,
|
|
event.tenantId,
|
|
existing.ticket_id,
|
|
`Alert resolved in ${providerLabel(event.provider)}.\nExternal alert ID: ${event.externalAlertId}`
|
|
);
|
|
if (await isTicketUntouched(trx, event.tenantId, existing.ticket_id)) {
|
|
const statusId = await resolveCloseStatusId(trx, event.tenantId, actions, existing.ticket_id);
|
|
if (statusId) {
|
|
await trx('tickets')
|
|
.where({ tenant: event.tenantId, ticket_id: existing.ticket_id })
|
|
.update({ status_id: statusId, updated_at: new Date().toISOString() });
|
|
await trx('rmm_alerts')
|
|
.where({ tenant: event.tenantId, alert_id: existing.alert_id })
|
|
.update({ status: 'auto_resolved' });
|
|
await addAlertInternalNote(
|
|
trx,
|
|
event.tenantId,
|
|
existing.ticket_id,
|
|
'Ticket closed automatically: the alert resolved and the ticket had no human activity.'
|
|
);
|
|
} else {
|
|
warnings.push('No closed status available for auto-resolution; ticket left open');
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return { outcome: 'resolved', alertId: existing.alert_id, ticketId: existing.ticket_id ?? null, warnings };
|
|
});
|
|
|
|
if (result.outcome === 'resolved' && !resolvedQuietly) {
|
|
await publishSafely(ctx, 'RMM_ALERT_RESOLVED', event, result, resolvedAssetId);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
async function processAcknowledged(
|
|
ctx: RmmAlertProcessingContext,
|
|
event: NormalizedRmmAlertEvent
|
|
): Promise<RmmAlertProcessingResult> {
|
|
const { knex } = ctx;
|
|
const updated = await knex('rmm_alerts')
|
|
.where({
|
|
tenant: event.tenantId,
|
|
integration_id: event.integrationId,
|
|
external_alert_id: event.externalAlertId,
|
|
status: 'active',
|
|
})
|
|
.update({
|
|
status: 'acknowledged',
|
|
acknowledged_at: event.occurredAt,
|
|
updated_at: new Date().toISOString(),
|
|
});
|
|
return { outcome: updated > 0 ? 'acknowledged' : 'skipped', warnings: [] };
|
|
}
|
|
|
|
function parseActions(rule: RmmAlertRuleRow | null, warnings: string[]): RmmAlertRuleActions | null {
|
|
if (!rule) return null;
|
|
const raw = typeof rule.actions === 'string' ? safeJsonParse(rule.actions) : rule.actions;
|
|
const parsed = rmmAlertRuleActionsSchema.safeParse(raw ?? {});
|
|
if (!parsed.success) {
|
|
warnings.push(`Rule ${rule.rule_id} has invalid actions; treated as record-only`);
|
|
return null;
|
|
}
|
|
return parsed.data;
|
|
}
|
|
|
|
function safeJsonParse(value: string): unknown {
|
|
try {
|
|
return JSON.parse(value);
|
|
} catch {
|
|
return null;
|
|
}
|
|
}
|
|
|
|
async function resolveCloseStatusId(
|
|
trx: Knex.Transaction,
|
|
tenantId: string,
|
|
actions: RmmAlertRuleActions,
|
|
ticketId: string
|
|
): Promise<string | null> {
|
|
if (actions.autoResolveStatusId) return actions.autoResolveStatusId;
|
|
// Statuses are board-scoped (statuses.status_type/board_id); prefer the
|
|
// ticket's own board, falling back to any closed ticket status.
|
|
const ticket = await trx('tickets')
|
|
.where({ tenant: tenantId, ticket_id: ticketId })
|
|
.first('board_id');
|
|
const closedOnBoard = ticket?.board_id
|
|
? await trx('statuses')
|
|
.where({ tenant: tenantId, status_type: 'ticket', is_closed: true, board_id: ticket.board_id })
|
|
.orderBy('order_number', 'asc')
|
|
.first('status_id')
|
|
: null;
|
|
if (closedOnBoard?.status_id) return closedOnBoard.status_id;
|
|
const closed = await trx('statuses')
|
|
.where({ tenant: tenantId, status_type: 'ticket', is_closed: true })
|
|
.orderBy('order_number', 'asc')
|
|
.first('status_id');
|
|
return closed?.status_id ?? null;
|
|
}
|
|
|
|
async function publishSafely(
|
|
ctx: RmmAlertProcessingContext,
|
|
eventType: 'RMM_ALERT_TRIGGERED' | 'RMM_ALERT_RESOLVED',
|
|
event: NormalizedRmmAlertEvent,
|
|
result: RmmAlertProcessingResult,
|
|
assetId?: string | null
|
|
): Promise<void> {
|
|
if (!ctx.deps?.publishWorkflowEvent) return;
|
|
try {
|
|
// Shape matches RmmAlertEventPayloadSchema: optional fields are omitted,
|
|
// never null.
|
|
const payload: Record<string, unknown> = {
|
|
tenantId: event.tenantId,
|
|
integrationId: event.integrationId,
|
|
provider: event.provider,
|
|
alertId: result.alertId,
|
|
externalAlertId: event.externalAlertId,
|
|
severity: event.severity,
|
|
};
|
|
if (event.externalDeviceId) payload.externalDeviceId = event.externalDeviceId;
|
|
if (assetId) payload.assetId = assetId;
|
|
if (result.ticketId) payload.ticketId = result.ticketId;
|
|
if (event.message) payload.message = event.message;
|
|
if (event.sourceType) payload.sourceType = event.sourceType;
|
|
if (event.alertClass) payload.alertClass = event.alertClass;
|
|
if (eventType === 'RMM_ALERT_TRIGGERED') payload.triggeredAt = event.occurredAt;
|
|
if (eventType === 'RMM_ALERT_RESOLVED') payload.resolvedAt = event.occurredAt;
|
|
|
|
await ctx.deps.publishWorkflowEvent({ eventType, tenantId: event.tenantId, payload });
|
|
} catch (error) {
|
|
ctx.deps?.logger?.warn?.(`[rmm-alerts] Failed to publish ${eventType}: ${String(error)}`);
|
|
}
|
|
}
|
|
|
|
async function notifySafely(
|
|
ctx: RmmAlertProcessingContext,
|
|
event: NormalizedRmmAlertEvent,
|
|
result: RmmAlertProcessingResult,
|
|
assetId: string | null
|
|
): Promise<void> {
|
|
if (!ctx.deps?.notifyUsers || !result.matchedRuleId) return;
|
|
try {
|
|
const rule = (await ctx.knex('rmm_alert_rules')
|
|
.where({ tenant: event.tenantId, rule_id: result.matchedRuleId })
|
|
.first()) as RmmAlertRuleRow | undefined;
|
|
const actions = rule ? parseActions(rule, []) : null;
|
|
if (!actions?.notifyUserIds?.length) return;
|
|
await ctx.deps.notifyUsers({
|
|
tenantId: event.tenantId,
|
|
userIds: actions.notifyUserIds,
|
|
alert: {
|
|
alertId: result.alertId!,
|
|
message: event.message,
|
|
severity: event.severity,
|
|
assetId,
|
|
ticketId: result.ticketId ?? null,
|
|
},
|
|
});
|
|
} catch (error) {
|
|
ctx.deps?.logger?.warn?.(`[rmm-alerts] Failed to send alert notifications: ${String(error)}`);
|
|
}
|
|
}
|