Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz Source: /opt/alga-psa on psa.joliet.tech
1742 lines
56 KiB
TypeScript
1742 lines
56 KiB
TypeScript
import type { EmailMessageDetails } from '../../interfaces/inbound-email.interfaces';
|
|
import { createHash } from 'node:crypto';
|
|
import { convertHtmlToBlockNote, convertMarkdownToBlocks } from '../../lib/utils/contentConversion';
|
|
import { extractEmailDomain, normalizeEmailAddress } from '../../lib/email/addressUtils';
|
|
import {
|
|
processInboundEmailArtifactsBestEffort,
|
|
type ProcessInboundEmailArtifactsResult,
|
|
} from './processInboundEmailArtifacts';
|
|
import {
|
|
buildInboundWatchListRecipients,
|
|
mergeTicketWatchListRecipients,
|
|
setTicketWatchListOnAttributes,
|
|
type TicketWatchListRecipientInput,
|
|
} from '../../lib/tickets/watchList';
|
|
import {
|
|
resolveInboundReplyAcknowledgementDecider,
|
|
type InboundReplyAckDeciderResult,
|
|
} from './inboundReplyAcknowledgementDecider';
|
|
import { evaluateInboundEmailRules } from './inboundEmailRules';
|
|
|
|
/**
 * Arguments accepted by `processInboundEmailInApp`.
 */
export interface ProcessInboundEmailInAppInput {
  /** Tenant the message is processed for; used to scope the database lookups in this module. */
  tenantId: string;
  /** Identifier of the email provider the message arrived through. */
  providerId: string;
  /** The inbound message (identifiers, sender, recipients, subject, body). */
  emailData: EmailMessageDetails;
}
|
|
|
|
/**
 * Optional switches for `processInboundEmailInApp`.
 */
export interface ProcessInboundEmailInAppOptions {
  /** When truthy, a diagnostics object is built and attached to the returned result. */
  collectDiagnostics?: boolean;
}
|
|
|
|
/**
 * Diagnostic payload that can be attached to a processing result.
 *
 * Because it extends `Record<string, unknown>`, extra keys are permitted
 * alongside the sections declared here.
 */
export interface ProcessInboundEmailInAppDiagnostics extends Record<string, unknown> {
  /** Output of the reply-body parser plus a fingerprint of the reply token, if any. */
  parser: {
    confidence: number | null;
    strategy: string | null;
    heuristics: string[];
    warnings: string[];
    /** Error message captured when parsing failed; `null` otherwise. */
    parseError: string | null;
    tokenPresent: boolean;
    /** SHA-256 hex digest of the trimmed reply token. */
    replyTokenHash: string | null;
    /** Trailing characters of the trimmed reply token. */
    replyTokenSuffix: string | null;
  };

  /** Copy of the message headers relevant to threading. */
  headersSnapshot: {
    messageId: string;
    threadId: string | null;
    inReplyTo: string | null;
    references: string[];
    from: string | null;
    to: string[];
    subject: string | null;
  };

  /** Record of the reply-token and header based lookups and what they matched. */
  threading: {
    tokenLookupAttempted: boolean;
    tokenLookupMatched: boolean;
    tokenLookupMissReason: 'token_missing' | 'token_not_found' | 'token_lookup_error' | null;
    tokenLookupError: string | null;

    headerLookupAttempted: boolean;
    headerLookupMatched: boolean;
    headerLookupMissReason: 'header_no_match' | 'header_lookup_error' | null;
    headerLookupError: string | null;

    matchedBy: 'reply_token' | 'thread_headers' | null;
    matchedTicketId: string | null;
    matchedCommentId: string | null;

    threadId: string | null;
    inReplyTo: string | null;
    references: string[];
    originalMessageIdCandidate: string | null;

    failureReason:
      | 'invalid_email_data'
      | 'missing_defaults'
      | 'self_notification'
      | 'rule_skip'
      | 'new_ticket_created'
      | 'deduped'
      | null;
  };

  /** Summary of the final result; filled in by `withDiagnostics`. */
  outcome?: {
    kind: 'skipped' | 'deduped' | 'replied' | 'created';
    matchedBy?: 'reply_token' | 'thread_headers';
    ticketId?: string;
    ticketNumber?: string;
    commentId?: string;
    dedupeKey?: string;
    reason?: 'missing_defaults' | 'invalid_email_data' | 'self_notification' | 'rule_skip';
    rule?: { ruleId: string; ruleName: string };
  };
}
|
|
|
|
/**
 * Result of processing one inbound email, discriminated by `outcome`.
 */
type ProcessInboundEmailInAppBaseResult =
  // The message was not processed; `reason` says why, `rule` names the rule when one is supplied.
  | {
      outcome: 'skipped';
      reason: 'missing_defaults' | 'invalid_email_data' | 'self_notification' | 'rule_skip';
      rule?: { ruleId: string; ruleName: string };
    }
  // The message had already been handled; `dedupeKey` identifies it.
  | {
      outcome: 'deduped';
      dedupeKey: string;
      ticketId?: string;
      commentId?: string;
    }
  // The message was matched to an existing ticket and stored as a comment.
  | {
      outcome: 'replied';
      matchedBy: 'reply_token' | 'thread_headers';
      ticketId: string;
      commentId: string;
    }
  // A new ticket was created from the message.
  | {
      outcome: 'created';
      ticketId: string;
      ticketNumber?: string;
      commentId: string;
    };
|
|
|
|
/**
 * Public result type: the base result plus an optional diagnostics payload.
 */
export type ProcessInboundEmailInAppResult = ProcessInboundEmailInAppBaseResult & {
  /** Present only when diagnostics were collected for this run. */
  diagnostics?: ProcessInboundEmailInAppDiagnostics;
};
|
|
|
|
/** Number of trailing token characters exposed in diagnostics. */
const REPLY_TOKEN_SUFFIX_LENGTH = 8;

/**
 * Produces a non-reversible fingerprint of a reply token for diagnostics.
 *
 * @param token Raw reply token; surrounding whitespace is ignored.
 * @returns `replyTokenHash` is the SHA-256 hex digest of the trimmed token and
 *   `replyTokenSuffix` its last {@link REPLY_TOKEN_SUFFIX_LENGTH} characters.
 *   Both are `null` when the token is missing, not a string, or blank. The
 *   suffix is also `null` when the token is not longer than the suffix length,
 *   because the "suffix" would then be the entire token in clear text.
 */
function getReplyTokenFingerprint(token?: string): {
  replyTokenHash: string | null;
  replyTokenSuffix: string | null;
} {
  const trimmedToken = typeof token === 'string' ? token.trim() : '';
  if (!trimmedToken) {
    return { replyTokenHash: null, replyTokenSuffix: null };
  }

  // Only expose a suffix when it is a strict subset of the token.
  const replyTokenSuffix =
    trimmedToken.length > REPLY_TOKEN_SUFFIX_LENGTH
      ? trimmedToken.slice(-REPLY_TOKEN_SUFFIX_LENGTH)
      : null;

  return {
    replyTokenHash: createHash('sha256').update(trimmedToken).digest('hex'),
    replyTokenSuffix,
  };
}
|
|
|
|
/**
 * Ticket state and board settings needed to decide how an inbound reply to a
 * ticket should be handled.
 */
type InboundReplyReopenPolicyContext = {
  // Ticket identity and current state.
  ticketId: string;
  boardId: string;
  statusId: string | null;
  /** `is_closed` flag of the ticket's current status row. */
  statusIsClosed: boolean;
  /** The ticket's own `is_closed` flag, falling back to `statusIsClosed` when it is unset. */
  ticketIsClosed: boolean;
  /** ISO-8601 timestamp, or `null` when the ticket has no usable `closed_at`. */
  closedAt: string | null;

  // Board-level inbound reply settings.
  inboundReplyReopenEnabled: boolean;
  inboundReplyReopenCutoffHours: number;
  inboundReplyReopenStatusId: string | null;
  inboundReplyAiAckSuppressionEnabled: boolean;
};
|
|
|
|
/**
 * Describes the decision taken for an inbound reply and the inputs that led to it.
 */
type InboundReplyDecisionMetadata = {
  policyEnabled: boolean;
  wasClosedTicketMatch: boolean;
  cutoffHours: number | null;
  cutoffExceeded: boolean;
  senderKind: 'internal' | 'client';
  /** What was done with the reply. */
  action: 'reopen' | 'comment_only' | 'new_ticket';
  /** Where the reopen status came from, when a reopen target was resolved. */
  reopenTargetSource?: 'explicit' | 'board_default' | null;
  reopenTargetStatusId?: string | null;
  /** Details of the acknowledgement-suppression check. */
  aiSuppression: {
    enabled: boolean;
    attempted: boolean;
    decision: 'ACK' | 'NOT_ACK' | null;
    source: 'default' | 'ee_ai' | null;
    reason: string | null;
    model: string | null;
    error: string | null;
    rawOutput: string | null;
  };
};
|
|
/**
 * Builds the initial diagnostics object for one inbound email.
 *
 * All lookup flags start in their "not matched" state; callers update the
 * returned object as processing advances.
 *
 * @param params.emailData Message whose headers are snapshotted.
 * @param params.senderEmail Sender address recorded as `headersSnapshot.from`.
 * @param params.parsedEmail Parser output, if parsing succeeded; read defensively.
 * @param params.parseError Parser error message, if parsing failed.
 * @param params.conversationToken Reply token extracted from the message, if any.
 * @returns A fresh diagnostics object. Its `references` arrays are independent
 *   copies, so mutating one does not affect the other or the input message.
 */
function buildDiagnostics(params: {
  emailData: EmailMessageDetails;
  senderEmail: string | null;
  parsedEmail?: any | null;
  parseError?: string | null;
  conversationToken?: string;
}): ProcessInboundEmailInAppDiagnostics {
  const { emailData, parsedEmail, conversationToken } = params;
  const fingerprint = getReplyTokenFingerprint(conversationToken);

  // Parser output is untyped, so keep only the string entries of list fields.
  const stringsOnly = (candidate: unknown): string[] =>
    Array.isArray(candidate)
      ? candidate.filter((value: unknown): value is string => typeof value === 'string')
      : [];

  // Each call yields a new array so the two sections never share one instance.
  const copyReferences = (): string[] =>
    Array.isArray(emailData.references) ? [...emailData.references] : [];

  const strategy = typeof parsedEmail?.strategy === 'string' ? parsedEmail.strategy.trim() : '';
  const threadId = emailData.threadId ?? null;
  const inReplyTo = emailData.inReplyTo ?? null;
  const hasToken = Boolean(conversationToken);

  return {
    parser: {
      confidence: typeof parsedEmail?.confidence === 'number' ? parsedEmail.confidence : null,
      strategy: strategy || null,
      heuristics: stringsOnly(parsedEmail?.appliedHeuristics),
      warnings: stringsOnly(parsedEmail?.warnings),
      parseError: params.parseError ?? null,
      tokenPresent: hasToken,
      replyTokenHash: fingerprint.replyTokenHash,
      replyTokenSuffix: fingerprint.replyTokenSuffix,
    },
    headersSnapshot: {
      messageId: emailData.id,
      threadId,
      inReplyTo,
      references: copyReferences(),
      from: params.senderEmail,
      to: (emailData.to ?? []).map((recipient) => recipient.email),
      subject: emailData.subject ?? null,
    },
    threading: {
      tokenLookupAttempted: hasToken,
      tokenLookupMatched: false,
      tokenLookupMissReason: conversationToken ? null : 'token_missing',
      tokenLookupError: null,
      headerLookupAttempted: false,
      headerLookupMatched: false,
      headerLookupMissReason: null,
      headerLookupError: null,
      matchedBy: null,
      matchedTicketId: null,
      matchedCommentId: null,
      threadId,
      inReplyTo,
      references: copyReferences(),
      // Falls back to the message's own id when there is no In-Reply-To header.
      originalMessageIdCandidate: emailData.inReplyTo ?? emailData.id ?? null,
      failureReason: null,
    },
  };
}
|
|
|
|
/**
 * Attaches diagnostics to a result and records a summary of that result on the
 * diagnostics object.
 *
 * @param result The processing result.
 * @param diagnostics Diagnostics to attach. When falsy, `result` is returned as is.
 * @returns `result` itself, or a shallow copy carrying `diagnostics`.
 *   Note that `diagnostics.outcome` is assigned on the object passed in.
 */
function withDiagnostics<T extends ProcessInboundEmailInAppBaseResult>(
  result: T,
  diagnostics?: ProcessInboundEmailInAppDiagnostics
): ProcessInboundEmailInAppResult {
  if (!diagnostics) {
    return result;
  }

  const base: ProcessInboundEmailInAppBaseResult = result;
  let summary: NonNullable<ProcessInboundEmailInAppDiagnostics['outcome']>;

  switch (base.outcome) {
    case 'skipped':
      summary = { kind: base.outcome, reason: base.reason };
      // The rule key is added only when a rule is present.
      if (base.rule) {
        summary.rule = base.rule;
      }
      break;
    case 'deduped':
      summary = {
        kind: base.outcome,
        dedupeKey: base.dedupeKey,
        ticketId: base.ticketId,
        commentId: base.commentId,
      };
      break;
    case 'replied':
      summary = {
        kind: base.outcome,
        matchedBy: base.matchedBy,
        ticketId: base.ticketId,
        commentId: base.commentId,
      };
      break;
    default:
      summary = {
        kind: base.outcome,
        ticketId: base.ticketId,
        ticketNumber: base.ticketNumber,
        commentId: base.commentId,
      };
      break;
  }

  diagnostics.outcome = summary;
  return { ...result, diagnostics };
}
|
|
|
|
/**
 * Reads the conversation token from parser output.
 *
 * `parsedEmail.tokens.conversationToken` is checked first, then
 * `parsedEmail.metadata.parser.tokens.conversationToken`.
 *
 * @returns The first non-blank string found, trimmed, or `undefined`.
 */
function extractConversationToken(parsedEmail: any): string | undefined {
  // Yields '' for anything that is not a non-blank string.
  const trimmedOrEmpty = (raw: unknown): string => (typeof raw === 'string' ? raw.trim() : '');

  return (
    trimmedOrEmpty(parsedEmail?.tokens?.conversationToken) ||
    trimmedOrEmpty(parsedEmail?.metadata?.parser?.tokens?.conversationToken) ||
    undefined
  );
}
|
|
|
|
/**
 * Removes system-generated reply markers from message text.
 *
 * Each marker (bracketed or `KEY:` style reply tokens, `ALGA-*-ID:` lines and
 * the "Please reply above this line" delimiter) is replaced by a space, then
 * whitespace runs are collapsed and the result is trimmed.
 */
function stripAutomatedReplyMarkers(text: string): string {
  // Applied in this order; every match becomes a single space.
  const markerPatterns: RegExp[] = [
    /\\?\[ALGA-REPLY-TOKEN[^\]\n\r]*(?:\])?/gi,
    /ALGA-REPLY-TOKEN:[^\n\r]*/gi,
    /ALGA-(?:TICKET|PROJECT|COMMENT|THREAD)-ID:[^\n\r]*/gi,
    /---\s*Please reply above this line\s*---/gi,
  ];

  const withoutMarkers = markerPatterns.reduce(
    (remaining, pattern) => remaining.replace(pattern, ' '),
    text
  );

  return withoutMarkers.replace(/\s+/g, ' ').trim();
}
|
|
|
|
/**
 * Tells whether a message still contains text once automated reply markers
 * are removed.
 *
 * The text examined is the first of these that is neither `null` nor
 * `undefined`: `parsedEmail.sanitizedText`, `parsedEmail.text`,
 * `emailData.body.text`; an empty string is used if all are absent.
 */
function hasSubstantiveReplyContent(parsedEmail: any, emailData: EmailMessageDetails): boolean {
  const rawBody = parsedEmail?.sanitizedText ?? parsedEmail?.text ?? emailData.body?.text ?? '';
  const remainingText = stripAutomatedReplyMarkers(String(rawBody));
  return remainingText.length > 0;
}
|
|
|
|
/**
 * Coerces a value to a positive whole number.
 *
 * @param value Anything `Number()` can convert.
 * @param fallback Returned when the conversion is not finite or not above zero.
 * @returns The value rounded down, but never less than 1; otherwise `fallback`.
 */
function normalizePositiveInteger(value: unknown, fallback: number): number {
  const numeric = Number(value);
  const isUsable = Number.isFinite(numeric) && numeric > 0;
  // Fractions between 0 and 1 floor to 0, hence the lower bound of 1.
  return isUsable ? Math.max(1, Math.floor(numeric)) : fallback;
}
|
|
|
|
/**
 * Converts a date-like value to an ISO-8601 string.
 *
 * @param value A `Date`, a number of milliseconds since the Unix epoch, or
 *   anything whose string form `Date` can parse.
 * @returns The ISO string, or `null` when the value is falsy or does not
 *   yield a valid date.
 */
function toIsoOrNull(value: unknown): string | null {
  if (!value) return null;

  let date: Date;
  if (value instanceof Date) {
    date = value;
  } else if (typeof value === 'number') {
    // A number is an epoch timestamp; stringifying it first would yield an invalid date.
    date = new Date(value);
  } else {
    date = new Date(String(value));
  }

  return Number.isNaN(date.getTime()) ? null : date.toISOString();
}
|
|
|
|
/**
 * Checks whether a message arrived more than `cutoffHours` after the ticket
 * was closed.
 *
 * @returns `true` only when both timestamps are present and parseable and the
 *   gap strictly exceeds the cutoff; `false` in every other case.
 */
function isClosedTicketBeyondReopenCutoff(params: {
  closedAt: string | null;
  receivedAt?: string;
  cutoffHours: number;
}): boolean {
  const { closedAt, receivedAt, cutoffHours } = params;
  const MS_PER_HOUR = 60 * 60 * 1000;

  if (closedAt && receivedAt) {
    const closedMs = new Date(closedAt).getTime();
    const receivedMs = new Date(receivedAt).getTime();
    const bothValid = !Number.isNaN(closedMs) && !Number.isNaN(receivedMs);

    if (bothValid) {
      return receivedMs - closedMs > cutoffHours * MS_PER_HOUR;
    }
  }

  // Missing or unparseable timestamps never count as beyond the cutoff.
  return false;
}
|
|
|
|
/**
 * Loads a ticket together with its status flag and its board's inbound-reply
 * settings, all within one admin transaction.
 *
 * @returns The assembled context, or `null` when the ticket row is missing or
 *   has no `ticket_id` / `board_id`.
 */
async function loadInboundReplyPolicyContext(params: {
  tenantId: string;
  ticketId: string;
}): Promise<InboundReplyReopenPolicyContext | null> {
  const { tenantId, ticketId } = params;
  const { withAdminTransaction } = await import('@alga-psa/db');

  return withAdminTransaction(async (trx: any) => {
    const ticketRow = await trx('tickets')
      .select('ticket_id', 'board_id', 'status_id', 'is_closed', 'closed_at')
      .where('tenant', tenantId)
      .andWhere('ticket_id', ticketId)
      .first();

    if (!ticketRow?.ticket_id || !ticketRow?.board_id) {
      return null;
    }

    // The status row is only fetched when the ticket points at one.
    let statusRow: any = null;
    if (ticketRow.status_id) {
      statusRow = await trx('statuses')
        .select('is_closed')
        .where({
          tenant: tenantId,
          status_id: ticketRow.status_id,
        })
        .first();
    }

    const boardRow = await trx('boards')
      .select(
        'inbound_reply_reopen_enabled',
        'inbound_reply_reopen_cutoff_hours',
        'inbound_reply_reopen_status_id',
        'inbound_reply_ai_ack_suppression_enabled',
      )
      .where({
        tenant: tenantId,
        board_id: ticketRow.board_id,
      })
      .first();

    const statusIsClosed = Boolean(statusRow?.is_closed);
    // The ticket's own flag wins; the status flag is used only when it is unset.
    const ticketFlagUnset = ticketRow.is_closed === null || ticketRow.is_closed === undefined;
    const ticketIsClosed = ticketFlagUnset ? statusIsClosed : Boolean(ticketRow.is_closed);

    return {
      ticketId: ticketRow.ticket_id,
      boardId: ticketRow.board_id,
      statusId: ticketRow.status_id ?? null,
      statusIsClosed,
      ticketIsClosed,
      closedAt: toIsoOrNull(ticketRow.closed_at),
      inboundReplyReopenEnabled: Boolean(boardRow?.inbound_reply_reopen_enabled),
      // 168 is the fallback when the board value is missing or not a positive number.
      inboundReplyReopenCutoffHours: normalizePositiveInteger(boardRow?.inbound_reply_reopen_cutoff_hours, 168),
      inboundReplyReopenStatusId: boardRow?.inbound_reply_reopen_status_id ?? null,
      inboundReplyAiAckSuppressionEnabled: Boolean(boardRow?.inbound_reply_ai_ack_suppression_enabled),
    };
  });
}
|
|
|
|
/**
 * Picks the status a ticket should move to when it is reopened.
 *
 * The explicitly configured status is used when it exists on the board as a
 * ticket status and is not closed; otherwise the board's default status from
 * `TicketModel.getDefaultStatusId` is used.
 *
 * @throws Error when no explicit status qualifies and the board has no default.
 */
async function resolveBoardReopenStatusTarget(params: {
  tenantId: string;
  boardId: string;
  explicitStatusId: string | null;
}): Promise<{ statusId: string; source: 'explicit' | 'board_default' }> {
  const { tenantId, boardId, explicitStatusId } = params;
  const { withAdminTransaction } = await import('@alga-psa/db');
  const { TicketModel } = await import('../../models/ticketModel');

  return withAdminTransaction(async (trx: any) => {
    const configuredStatus = explicitStatusId
      ? await trx('statuses')
          .select('status_id', 'is_closed')
          .where({
            tenant: tenantId,
            board_id: boardId,
            status_id: explicitStatusId,
            status_type: 'ticket',
          })
          .first()
      : undefined;

    if (configuredStatus?.status_id && !configuredStatus.is_closed) {
      return {
        statusId: configuredStatus.status_id,
        source: 'explicit' as const,
      };
    }

    const fallbackStatusId = await TicketModel.getDefaultStatusId(tenantId, trx, boardId);
    if (fallbackStatusId) {
      return {
        statusId: fallbackStatusId,
        source: 'board_default' as const,
      };
    }

    throw new Error(`No default open ticket status found for board ${boardId}`);
  });
}
|
|
|
|
/**
 * Reopens a ticket in response to an inbound email and records the change.
 *
 * In one admin transaction this moves the ticket to `statusId`, clears its
 * closed flag and closure fields, and writes a REOPENED ticket-activity row.
 *
 * @param params.tenantId Tenant that owns the ticket.
 * @param params.ticketId Ticket to reopen.
 * @param params.statusId Status to move the ticket to.
 * @param params.updatedByUserId User stored as `updated_by` and as the actor's
 *   user id; `null` is stored when omitted.
 */
async function applyInboundReplyReopenTransition(params: {
  tenantId: string;
  ticketId: string;
  statusId: string;
  updatedByUserId?: string;
}): Promise<void> {
  const { withAdminTransaction } = await import('@alga-psa/db');
  const {
    writeTicketActivity,
    TICKET_ACTIVITY_EVENT,
    TICKET_ACTIVITY_ENTITY,
    TICKET_ACTIVITY_ACTOR,
    TICKET_ACTIVITY_SOURCE,
  } = await import('../../lib/ticketActivity/index');

  await withAdminTransaction(async (trx: any) => {
    // Read the prior values first so the activity row can report what changed.
    const previous = await trx('tickets')
      .select('status_id', 'closed_at')
      .where({ tenant: params.tenantId, ticket_id: params.ticketId })
      .first();

    await trx('tickets')
      .where({
        tenant: params.tenantId,
        ticket_id: params.ticketId,
      })
      .update({
        status_id: params.statusId,
        is_closed: false,
        closed_at: null,
        closed_by: null,
        updated_at: trx.fn.now(),
        updated_by: params.updatedByUserId ?? null,
      });

    // Reopen activity row. Source is inbound_email even though we don't have
    // an email contact_id here — the reopen was triggered by inbound mail.
    await writeTicketActivity(trx, {
      tenant: params.tenantId,
      ticketId: params.ticketId,
      eventType: TICKET_ACTIVITY_EVENT.REOPENED,
      entityType: TICKET_ACTIVITY_ENTITY.TICKET,
      entityId: params.ticketId,
      actor: {
        actorType: TICKET_ACTIVITY_ACTOR.SYSTEM,
        userId: params.updatedByUserId ?? null,
      },
      source: TICKET_ACTIVITY_SOURCE.INBOUND_EMAIL,
      changes: {
        status_id: { old: previous?.status_id ?? null, new: params.statusId },
        // Record the closure time that was just cleared, not a constant null.
        closed_at: { old: toIsoOrNull(previous?.closed_at), new: null },
      },
      details: {
        reopen_trigger: 'inbound_email_reply',
      },
    });
  });
}
|
|
|
|
/**
 * Builds the dedupe key for an inbound message:
 * `inbound-email:<tenantId>:<providerId>:<emailData.id>`.
 */
function buildDedupeKey(input: ProcessInboundEmailInAppInput): string {
  const { tenantId, providerId, emailData } = input;
  const messageId = emailData.id;
  return `inbound-email:${tenantId}:${providerId}:${messageId}`;
}
|
|
|
|
/**
 * Wraps plain text in a one-element block list: a single paragraph block
 * holding one unstyled text node.
 */
function blocksFallbackFromText(text: string) {
  const textNode = { type: 'text', text, styles: {} };
  const paragraph = { type: 'paragraph', content: [textNode] };
  return [paragraph];
}
|
|
|
|
/**
 * Converts an email body into editor blocks.
 *
 * HTML is preferred when present (after trimming); otherwise the text body is
 * converted as Markdown. Whenever a conversion throws or yields no blocks, a
 * single plain-text paragraph is returned instead, so this function does not
 * reject because of a conversion failure. Failures are logged as warnings.
 *
 * @param params.html HTML body, if any.
 * @param params.text Plain-text body, if any.
 * @returns A non-empty list of blocks.
 */
async function blocksFromEmailBody(params: {
  html?: string;
  text?: string;
}): Promise<unknown[]> {
  const html = params.html?.trim();
  const text = params.text?.trim();

  if (html) {
    try {
      const blocks = await convertHtmlToBlockNote(html, { flattenTables: true });
      return blocks.length ? blocks : blocksFallbackFromText(text ?? '');
    } catch (error) {
      // Best effort: keep going with plain text, but leave a trace of the failure.
      console.warn('processInboundEmailInApp: HTML body conversion failed (falling back to plain text)', {
        error: error instanceof Error ? error.message : String(error),
      });
      return blocksFallbackFromText(text ?? '');
    }
  }

  if (text) {
    try {
      const blocks = convertMarkdownToBlocks(text);
      return blocks.length ? blocks : blocksFallbackFromText(text);
    } catch (error) {
      console.warn('processInboundEmailInApp: text body conversion failed (falling back to plain text)', {
        error: error instanceof Error ? error.message : String(error),
      });
      return blocksFallbackFromText(text);
    }
  }

  return blocksFallbackFromText('');
}
|
|
|
|
/**
 * Looks for a comment on the ticket that was created from the given email
 * message id.
 *
 * The id is matched against `metadata.email.messageId` or the top-level
 * `metadata.messageId` of the comment.
 *
 * @returns The matching comment id, or `null` when none exists.
 */
async function findExistingEmailComment(params: {
  tenantId: string;
  ticketId: string;
  messageId: string;
}): Promise<string | null> {
  const { tenantId, ticketId, messageId } = params;
  const { withAdminTransaction } = await import('@alga-psa/db');

  // Grouped OR condition covering both metadata layouts.
  const matchesMessageId = function (this: any) {
    this.whereRaw("c.metadata->'email'->>'messageId' = ?", [messageId]).orWhereRaw(
      "c.metadata->>'messageId' = ?",
      [messageId]
    );
  };

  return withAdminTransaction(async (trx: any) => {
    const match = await trx('comments as c')
      .select('c.comment_id as commentId')
      .where('c.tenant', tenantId)
      .andWhere('c.ticket_id', ticketId)
      .andWhere(matchesMessageId)
      .first();

    return match?.commentId ?? null;
  });
}
|
|
|
|
/**
 * Looks for a ticket already created from the given provider message.
 *
 * A ticket matches when `email_metadata.messageId` equals the message id and
 * the provider id is stored under either `providerId` or `provider_id`.
 *
 * @returns The ticket id and number, or `null` when no ticket matches.
 */
async function findExistingEmailTicket(params: {
  tenantId: string;
  providerId: string;
  messageId: string;
}): Promise<{ ticketId: string; ticketNumber?: string } | null> {
  const { tenantId, providerId, messageId } = params;
  const { withAdminTransaction } = await import('@alga-psa/db');

  // Grouped OR condition covering both spellings of the provider key.
  const matchesProvider = function (this: any) {
    this.whereRaw("t.email_metadata->>'providerId' = ?", [providerId]).orWhereRaw(
      "t.email_metadata->>'provider_id' = ?",
      [providerId]
    );
  };

  return withAdminTransaction(async (trx: any) => {
    const match = await trx('tickets as t')
      .select('t.ticket_id as ticketId', 't.ticket_number as ticketNumber')
      .where('t.tenant', tenantId)
      .andWhereRaw("t.email_metadata->>'messageId' = ?", [messageId])
      .andWhere(matchesProvider)
      .first();

    if (!match?.ticketId) {
      return null;
    }
    return { ticketId: match.ticketId, ticketNumber: match.ticketNumber };
  });
}
|
|
|
|
/**
 * Resolves where a reply should be attached, starting from a known comment.
 *
 * The comment supplies the ticket and thread; the reply's parent is the most
 * recent comment in that thread (by `created_at`, then `comment_id`).
 *
 * @returns The reply target, or `null` when the comment is not found, lacks a
 *   ticket or thread, or the thread has no comments.
 */
async function resolveReplyTargetFromComment(params: {
  tenantId: string;
  commentId: string;
}): Promise<{ ticketId: string; threadId: string; parentCommentId: string } | null> {
  const { tenantId, commentId } = params;
  const { withAdminTransaction } = await import('@alga-psa/db');

  return withAdminTransaction(async (trx: any) => {
    const origin = await trx('comments')
      .select('ticket_id as ticketId', 'thread_id as threadId')
      .where({ tenant: tenantId, comment_id: commentId })
      .first();

    if (!origin?.ticketId || !origin?.threadId) {
      return null;
    }

    const newest = await trx('comments')
      .select('comment_id as parentCommentId')
      .where({ tenant: tenantId, thread_id: origin.threadId })
      .orderBy('created_at', 'desc')
      .orderBy('comment_id', 'desc')
      .first();

    if (!newest?.parentCommentId) {
      return null;
    }

    return {
      ticketId: origin.ticketId,
      threadId: origin.threadId,
      parentCommentId: newest.parentCommentId,
    };
  });
}
|
|
|
|
/**
 * Resolves where a reply should be attached, starting from a comment thread.
 *
 * The `comment_threads` row supplies the ticket; the reply's parent is the
 * most recent comment in the thread (by `created_at`, then `comment_id`).
 *
 * @returns The reply target, or `null` when the thread is not found, has no
 *   ticket, or contains no comments.
 */
async function resolveReplyTargetFromCommentThread(params: {
  tenantId: string;
  threadId: string;
}): Promise<{ ticketId: string; threadId: string; parentCommentId: string } | null> {
  const { tenantId, threadId } = params;
  const { withAdminTransaction } = await import('@alga-psa/db');

  return withAdminTransaction(async (trx: any) => {
    const threadRow = await trx('comment_threads')
      .select('ticket_id as ticketId', 'thread_id as threadId')
      .where({ tenant: tenantId, thread_id: threadId })
      .first();

    if (!threadRow?.ticketId || !threadRow?.threadId) {
      return null;
    }

    const newest = await trx('comments')
      .select('comment_id as parentCommentId')
      .where({ tenant: tenantId, thread_id: threadRow.threadId })
      .orderBy('created_at', 'desc')
      .orderBy('comment_id', 'desc')
      .first();

    if (!newest?.parentCommentId) {
      return null;
    }

    return {
      ticketId: threadRow.ticketId,
      threadId: threadRow.threadId,
      parentCommentId: newest.parentCommentId,
    };
  });
}
|
|
|
|
/**
 * Resolves a reply target from the Message-ID of an email this system sent.
 *
 * The newest `email_sending_logs` row with that `rfc_message_id` and a
 * non-null `comment_thread_id` identifies the thread, which is then resolved
 * via `resolveReplyTargetFromCommentThread`.
 *
 * @returns The reply target, or `null` when the id is blank or nothing matches.
 */
async function resolveReplyTargetFromOutboundMessageId(params: {
  tenantId: string;
  rfcMessageId: string;
}): Promise<{ ticketId: string; threadId: string; parentCommentId: string } | null> {
  const { tenantId } = params;
  const rfcMessageId = params.rfcMessageId.trim();
  if (rfcMessageId === '') {
    return null;
  }

  const { withAdminTransaction } = await import('@alga-psa/db');
  const logRow = await withAdminTransaction(async (trx: any) =>
    trx('email_sending_logs')
      .select('comment_thread_id as threadId')
      .where({ tenant: tenantId, rfc_message_id: rfcMessageId })
      .whereNotNull('comment_thread_id')
      .orderBy('created_at', 'desc')
      .first()
  );

  if (!logRow?.threadId) {
    return null;
  }

  return resolveReplyTargetFromCommentThread({
    tenantId,
    threadId: logRow.threadId,
  });
}
|
|
|
|
/**
 * Resolves a reply target from a message's References list.
 *
 * Entries are trimmed, blanks are dropped, and the remainder is tried from
 * last to first, one lookup at a time, until one resolves.
 *
 * @returns The first target found, or `null` when no reference resolves.
 */
async function resolveReplyTargetFromReferences(params: {
  tenantId: string;
  references?: string[];
}): Promise<{ ticketId: string; threadId: string; parentCommentId: string } | null> {
  // `filter` returns a fresh array, so reversing it leaves the input untouched.
  const lastToFirst = (params.references ?? [])
    .map((value) => value.trim())
    .filter(Boolean)
    .reverse();

  for (const rfcMessageId of lastToFirst) {
    const target = await resolveReplyTargetFromOutboundMessageId({
      tenantId: params.tenantId,
      rfcMessageId,
    });
    if (target) {
      return target;
    }
  }

  return null;
}
|
|
|
|
/**
 * Resolves a reply target from the email provider's thread id.
 *
 * The `comment_threads` row with that `email_provider_thread_id`, a non-null
 * ticket and the latest `last_activity_at` identifies the thread, which is
 * then resolved via `resolveReplyTargetFromCommentThread`.
 *
 * @returns The reply target, or `null` when the id is missing/blank or
 *   nothing matches.
 */
async function resolveReplyTargetFromProviderThreadId(params: {
  tenantId: string;
  providerThreadId?: string | null;
}): Promise<{ ticketId: string; threadId: string; parentCommentId: string } | null> {
  const { tenantId } = params;
  const providerThreadId = params.providerThreadId?.trim();
  if (!providerThreadId) {
    return null;
  }

  const { withAdminTransaction } = await import('@alga-psa/db');
  const threadRow = await withAdminTransaction(async (trx: any) =>
    trx('comment_threads')
      .select('thread_id as threadId')
      .where({
        tenant: tenantId,
        email_provider_thread_id: providerThreadId,
      })
      .whereNotNull('ticket_id')
      .orderBy('last_activity_at', 'desc')
      .first()
  );

  if (!threadRow?.threadId) {
    return null;
  }

  return resolveReplyTargetFromCommentThread({
    tenantId,
    threadId: threadRow.threadId,
  });
}
|
|
|
|
/**
 * Reduces a Content-ID or `cid:` reference to a canonical lookup key.
 *
 * Surrounding whitespace, a leading `cid:` scheme and enclosing angle
 * brackets are removed in whichever order they appear. Percent-encoding is
 * then decoded (cid URLs may encode characters that the Content-ID header
 * carries literally) and the result is lower-cased.
 *
 * @param value Raw reference, e.g. `<img1@host>`, `cid:img1@host` or `cid:img1%40host`.
 * @returns The normalised id, or `''` for a missing/empty value.
 */
function normalizeEmbeddedContentId(value: string | undefined | null): string {
  if (!value) return '';

  let id = String(value).trim();

  // Repeat until stable so `<cid:x>` and `cid:<x>` both reduce to `x`.
  let before: string;
  do {
    before = id;
    id = id.replace(/^cid:/i, '').replace(/^<|>$/g, '');
  } while (id !== before);

  if (id.includes('%')) {
    try {
      id = decodeURIComponent(id);
    } catch {
      // Not valid percent-encoding; keep the literal text.
    }
  }

  return id.toLowerCase();
}
|
|
|
|
/**
 * Replaces embedded image references in HTML with their mapped URLs.
 *
 * Two kinds of mapping are honoured: `data-url` (inline base64 image data,
 * compared after lower-casing the content type and removing whitespace from
 * the payload) and `cid` (compared after content-id normalisation).
 * References without a mapping are left exactly as they were.
 *
 * @param html Markup to rewrite.
 * @param embeddedMappings Reference → URL mappings.
 * @returns The rewritten markup; `html` itself when it is empty or there are
 *   no mappings.
 */
function rewriteEmbeddedImageSourcesInHtml(
  html: string,
  embeddedMappings: ProcessInboundEmailArtifactsResult['embeddedImageUrlMappings']
): string {
  if (!html || !embeddedMappings.length) return html;

  const urlByDataUrl = new Map<string, string>();
  const urlByContentId = new Map<string, string>();

  for (const mapping of embeddedMappings) {
    if (mapping.source === 'data-url') {
      urlByDataUrl.set(mapping.reference, mapping.url);
    } else if (mapping.source === 'cid') {
      const contentId = normalizeEmbeddedContentId(mapping.reference);
      if (contentId) {
        urlByContentId.set(contentId, mapping.url);
      }
    }
  }

  const afterDataUrls =
    urlByDataUrl.size === 0
      ? html
      : html.replace(
          /data:(image\/[a-z0-9.+-]+);base64,([^"'<>]+)/gim,
          (fullMatch: string, contentType: string, base64: string) => {
            const lookupKey = `data:${String(contentType).toLowerCase()};base64,${String(base64).replace(/\s+/g, '')}`;
            return urlByDataUrl.get(lookupKey) || fullMatch;
          }
        );

  if (urlByContentId.size === 0) {
    return afterDataUrls;
  }

  return afterDataUrls.replace(/\bcid:([^"'<>\s)]+)/gim, (fullMatch: string, cid: string) => {
    const lookupKey = normalizeEmbeddedContentId(cid);
    return urlByContentId.get(lookupKey) || fullMatch;
  });
}
|
|
|
|
/**
 * Updates a stored comment so its embedded images point at their mapped URLs.
 *
 * Nothing is written when there is no HTML, there are no embedded-image
 * mappings, the rewrite leaves the HTML unchanged, or the regenerated content
 * equals `originalCommentContent`. A failed database update is logged as a
 * warning and not rethrown.
 */
async function maybeRewriteCommentWithEmbeddedAttachmentUrls(args: {
  tenantId: string;
  commentId: string;
  html?: string;
  text?: string;
  originalCommentContent: string;
  artifactsResult?: ProcessInboundEmailArtifactsResult;
}): Promise<void> {
  const { tenantId, commentId } = args;
  const mappings = args.artifactsResult?.embeddedImageUrlMappings ?? [];
  if (!args.html || mappings.length === 0) {
    return;
  }

  const rewrittenHtml = rewriteEmbeddedImageSourcesInHtml(args.html, mappings);
  const htmlChanged = Boolean(rewrittenHtml) && rewrittenHtml !== args.html;
  if (!htmlChanged) {
    return;
  }

  const regeneratedBlocks = await blocksFromEmailBody({
    html: rewrittenHtml,
    text: args.text,
  });
  const serializedBlocks = JSON.stringify(regeneratedBlocks);
  if (serializedBlocks === args.originalCommentContent) {
    return;
  }

  try {
    const { withAdminTransaction } = await import('@alga-psa/db');
    await withAdminTransaction(async (trx: any) => {
      await trx('comments as c')
        .where('c.tenant', tenantId)
        .andWhere('c.comment_id', commentId)
        .update({
          note: serializedBlocks,
          updated_at: new Date(),
        });
    });
  } catch (error) {
    // Best effort: the comment keeps its original content if the update fails.
    console.warn('processInboundEmailInApp: embedded image comment rewrite failed (continuing)', {
      tenantId,
      commentId,
      error: error instanceof Error ? error.message : String(error),
    });
  }
}
|
|
|
|
export async function processInboundEmailInApp(
|
|
input: ProcessInboundEmailInAppInput,
|
|
options: ProcessInboundEmailInAppOptions = {}
|
|
): Promise<ProcessInboundEmailInAppResult> {
|
|
if (!input?.tenantId || !input?.providerId || !input?.emailData?.id) {
|
|
const diagnostics = options.collectDiagnostics
|
|
? buildDiagnostics({
|
|
emailData: input?.emailData ?? ({
|
|
id: '',
|
|
provider: 'imap',
|
|
providerId: input?.providerId ?? '',
|
|
tenant: input?.tenantId ?? '',
|
|
receivedAt: '',
|
|
from: { email: '' },
|
|
to: [],
|
|
subject: '',
|
|
body: { text: '' },
|
|
} as EmailMessageDetails),
|
|
senderEmail: null,
|
|
})
|
|
: undefined;
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'invalid_email_data';
|
|
}
|
|
return withDiagnostics({ outcome: 'skipped', reason: 'invalid_email_data' }, diagnostics);
|
|
}
|
|
|
|
const tenantId = input.tenantId;
|
|
const providerId = input.providerId;
|
|
const emailData = input.emailData;
|
|
const dedupeKey = buildDedupeKey(input);
|
|
const senderEmail = normalizeEmailAddress(emailData.from?.email);
|
|
|
|
// Fast-path: if we've already created a ticket for this email, never create a second one.
|
|
const existingTicket = await findExistingEmailTicket({
|
|
tenantId,
|
|
providerId,
|
|
messageId: emailData.id,
|
|
});
|
|
if (existingTicket) {
|
|
const diagnostics = options.collectDiagnostics
|
|
? buildDiagnostics({
|
|
emailData,
|
|
senderEmail,
|
|
})
|
|
: undefined;
|
|
if (diagnostics) {
|
|
diagnostics.threading.matchedTicketId = existingTicket.ticketId;
|
|
diagnostics.threading.failureReason = 'deduped';
|
|
}
|
|
return withDiagnostics({
|
|
outcome: 'deduped',
|
|
dedupeKey,
|
|
ticketId: existingTicket.ticketId,
|
|
}, diagnostics);
|
|
}
|
|
|
|
const {
|
|
parseEmailReplyBody,
|
|
findTicketByReplyToken,
|
|
findTicketByEmailThread,
|
|
resolveInboundTicketDefaults,
|
|
resolveEffectiveInboundTicketDefaults,
|
|
findContactByEmail,
|
|
findClientIdByInboundEmailDomain,
|
|
findValidClientPrimaryContactId,
|
|
findEmailProviderMailboxAddress,
|
|
upsertTicketWatchListRecipients,
|
|
createTicketFromEmail,
|
|
createCommentFromEmail,
|
|
} = await import('../../workflow/actions/emailWorkflowActions');
|
|
|
|
let parsedEmail: any | null = null;
|
|
let parseError: string | null = null;
|
|
try {
|
|
parsedEmail = await parseEmailReplyBody({
|
|
text: emailData.body?.text,
|
|
html: emailData.body?.html,
|
|
});
|
|
} catch (error) {
|
|
parseError = error instanceof Error ? error.message : String(error);
|
|
console.warn('processInboundEmailInApp: parseEmailReplyBody failed (continuing)', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
error: parseError,
|
|
});
|
|
}
|
|
const resolveSenderContact = async (context: {
|
|
ticketId?: string;
|
|
defaultClientId?: string | null;
|
|
} = {}) => {
|
|
if (!senderEmail) {
|
|
return null;
|
|
}
|
|
|
|
return findContactByEmail(senderEmail, tenantId, context);
|
|
};
|
|
|
|
let providerMailboxEmail: string | null = null;
|
|
try {
|
|
providerMailboxEmail = await findEmailProviderMailboxAddress(providerId, tenantId);
|
|
} catch (error) {
|
|
console.warn('processInboundEmailInApp: failed to resolve provider mailbox address (continuing)', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
error: error instanceof Error ? error.message : String(error),
|
|
});
|
|
}
|
|
|
|
let inboundWatchListRecipients: TicketWatchListRecipientInput[] = [];
|
|
try {
|
|
inboundWatchListRecipients = buildInboundWatchListRecipients({
|
|
to: emailData.to,
|
|
cc: emailData.cc,
|
|
senderEmail: emailData.from?.email,
|
|
providerMailboxEmail,
|
|
});
|
|
} catch (error) {
|
|
console.warn('processInboundEmailInApp: watch-list candidate build failed (continuing)', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
error: error instanceof Error ? error.message : String(error),
|
|
});
|
|
}
|
|
|
|
const conversationToken = extractConversationToken(parsedEmail);
|
|
const diagnostics = options.collectDiagnostics
|
|
? buildDiagnostics({
|
|
emailData,
|
|
senderEmail,
|
|
parsedEmail,
|
|
parseError,
|
|
conversationToken,
|
|
})
|
|
: undefined;
|
|
|
|
if (conversationToken && !hasSubstantiveReplyContent(parsedEmail, emailData)) {
|
|
console.info('processInboundEmailInApp: skipping token-only inbound email with no reply content', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
hasConversationToken: true,
|
|
});
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'self_notification';
|
|
}
|
|
return withDiagnostics({ outcome: 'skipped', reason: 'self_notification' }, diagnostics);
|
|
}
|
|
|
|
const senderIsProviderMailbox =
|
|
Boolean(senderEmail) && Boolean(providerMailboxEmail) && senderEmail === providerMailboxEmail;
|
|
const senderName =
|
|
typeof emailData.from?.name === 'string' && emailData.from.name.trim()
|
|
? emailData.from.name.trim()
|
|
: undefined;
|
|
const hasReplySignals =
|
|
Boolean(conversationToken) ||
|
|
Boolean(emailData.inReplyTo) ||
|
|
Boolean(emailData.threadId) ||
|
|
Boolean(emailData.references?.length);
|
|
|
|
if (senderIsProviderMailbox && hasReplySignals) {
|
|
console.info('processInboundEmailInApp: skipping self-sent notification email', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
senderEmail,
|
|
providerMailboxEmail,
|
|
hasConversationToken: Boolean(conversationToken),
|
|
hasInReplyTo: Boolean(emailData.inReplyTo),
|
|
hasThreadId: Boolean(emailData.threadId),
|
|
hasReferences: Boolean(emailData.references?.length),
|
|
});
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'self_notification';
|
|
}
|
|
return withDiagnostics({ outcome: 'skipped', reason: 'self_notification' }, diagnostics);
|
|
}
|
|
|
|
// Assembles the `email` metadata object attached to comments created from this
// inbound message. `matchedSenderEmail` overrides the matched address (falling
// back to the sender address) and `primaryContactEmail` fills `contactEmail`.
// The parameter is named `overrides` so it does not shadow the enclosing
// function's `options`.
const buildCommentEmailMetadata = (
  overrides: { matchedSenderEmail?: string | null; primaryContactEmail?: string | null } = {}
) => {
  const { matchedSenderEmail, primaryContactEmail } = overrides;

  return {
    messageId: emailData.id,
    provider: emailData.provider,
    providerId,
    threadId: emailData.threadId,
    inReplyTo: emailData.inReplyTo,
    references: emailData.references,
    from: emailData.from,
    fromAddress: senderEmail ?? undefined,
    fromName: senderName,
    matchedAddress: matchedSenderEmail ?? senderEmail ?? undefined,
    contactEmail: primaryContactEmail ?? undefined,
    to: emailData.to,
    subject: emailData.subject,
    receivedAt: emailData.receivedAt,
  };
};
|
|
|
|
// Produces the watch-list entry for a sender that did not resolve to a contact.
// Yields a single active `inbound_from` recipient only when no contact id was
// matched, a sender address exists, and the sender is not the provider mailbox
// itself; in every other case the result is an empty list.
const buildUnmatchedSenderWatchListRecipients = (matchedContactId?: string | null) => {
  if (!matchedContactId && senderEmail && !senderIsProviderMailbox) {
    return [
      {
        email: senderEmail,
        active: true,
        name: senderName,
        source: 'inbound_from',
      },
    ] as TicketWatchListRecipientInput[];
  }

  return [] as TicketWatchListRecipientInput[];
};
|
|
|
|
// Upserts watch-list recipients onto a ticket without ever failing the caller.
// Does nothing for an empty recipient list. Any error thrown by the upsert is
// logged as a warning and swallowed so inbound processing can continue.
// `recipients` defaults to the candidates built from the email's To/Cc headers.
const upsertWatchListBestEffort = async (
  ticketId: string,
  recipients: TicketWatchListRecipientInput[] = inboundWatchListRecipients
) => {
  if (recipients.length === 0) {
    return;
  }

  try {
    const payload = { ticketId, recipients };
    await upsertTicketWatchListRecipients(payload, tenantId);
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    console.warn('processInboundEmailInApp: watch-list upsert failed (continuing)', {
      tenantId,
      providerId,
      emailId: emailData.id,
      ticketId,
      recipientCount: recipients.length,
      error: message,
    });
  }
};
|
|
|
|
let rerouteToNewTicket = false;
|
|
let rerouteReasonMetadata: InboundReplyDecisionMetadata | null = null;
|
|
|
|
// Records the inbound email as a reply comment on an already-matched ticket.
//
// Steps, in order:
//   1. Dedupe: if a comment for this message id already exists on the ticket,
//      return a `deduped` result.
//   2. Convert the email body to serialized blocks and resolve the sender contact.
//   3. Load the reopen policy (best effort; a load failure is logged and treated
//      as "no policy").
//   4. For a closed ticket with reopen enabled: if the reopen cutoff is exceeded,
//      flag the email for rerouting to a new ticket and return null so the
//      caller falls through to the new-ticket path. Otherwise decide whether to
//      reopen (internal senders always reopen; other senders reopen unless the
//      acknowledgement decider answers 'ACK').
//   5. Apply the reopen transition if decided, create the comment, process
//      attachments, rewrite embedded attachment URLs, and upsert the watch list.
//
// Returns the `replied` / `deduped` result, or null when the email must be
// rerouted to a new ticket.
const handleThreadedReply = async (params: {
  ticketId: string;
  matchedBy: 'reply_token' | 'thread_headers';
  parentCommentId?: string | null;
}): Promise<ProcessInboundEmailInAppResult | null> => {
  const { ticketId: replyTicketId, matchedBy, parentCommentId } = params;

  // Idempotency guard: this message may already be stored as a comment.
  const duplicateCommentId = await findExistingEmailComment({
    tenantId,
    ticketId: replyTicketId,
    messageId: emailData.id,
  });
  if (duplicateCommentId) {
    if (diagnostics) {
      diagnostics.threading.matchedCommentId = duplicateCommentId;
      diagnostics.threading.failureReason = 'deduped';
    }
    return withDiagnostics(
      {
        outcome: 'deduped',
        dedupeKey,
        ticketId: replyTicketId,
        commentId: duplicateCommentId,
      },
      diagnostics
    );
  }

  // Prefer the parser's sanitized output; fall back to the raw body parts.
  const bodyHtml = parsedEmail?.sanitizedHtml ?? emailData.body?.html;
  const bodyText = parsedEmail?.sanitizedText ?? emailData.body?.text;
  const bodyBlocks = await blocksFromEmailBody({ html: bodyHtml, text: bodyText });
  const commentContent = JSON.stringify(bodyBlocks);

  const senderContact = await resolveSenderContact({ ticketId: replyTicketId });
  const senderIsInternal = senderContact?.user_type === 'internal';
  const senderContactId = senderContact?.contact_id || undefined;

  // Policy lookup is best effort: on failure the reply is handled as comment-only.
  let reopenPolicy: InboundReplyReopenPolicyContext | null = null;
  try {
    reopenPolicy = await loadInboundReplyPolicyContext({
      tenantId,
      ticketId: replyTicketId,
    });
  } catch (error) {
    console.warn('processInboundEmailInApp: failed to load inbound reply reopen policy (continuing)', {
      tenantId,
      providerId,
      emailId: emailData.id,
      ticketId: replyTicketId,
      error: error instanceof Error ? error.message : String(error),
    });
  }

  // Audit record of the reopen decision; mutated below and stored on the comment.
  const decision: InboundReplyDecisionMetadata = {
    policyEnabled: Boolean(reopenPolicy?.inboundReplyReopenEnabled),
    wasClosedTicketMatch: Boolean(reopenPolicy?.ticketIsClosed),
    cutoffHours: reopenPolicy?.inboundReplyReopenCutoffHours ?? null,
    cutoffExceeded: false,
    senderKind: senderIsInternal ? 'internal' : 'client',
    action: 'comment_only',
    reopenTargetSource: null,
    reopenTargetStatusId: null,
    aiSuppression: {
      enabled: false,
      attempted: false,
      decision: null,
      source: null,
      reason: null,
      model: null,
      error: null,
      rawOutput: null,
    },
  };

  let reopenTicket = false;
  // A closed ticket with reopening disabled keeps the default 'comment_only' action.
  if (reopenPolicy?.ticketIsClosed && reopenPolicy.inboundReplyReopenEnabled) {
    const pastCutoff = isClosedTicketBeyondReopenCutoff({
      closedAt: reopenPolicy.closedAt,
      receivedAt: emailData.receivedAt,
      cutoffHours: reopenPolicy.inboundReplyReopenCutoffHours,
    });
    decision.cutoffExceeded = pastCutoff;

    if (pastCutoff) {
      // Too late to reopen: signal the caller to create a new ticket instead.
      decision.action = 'new_ticket';
      rerouteToNewTicket = true;
      rerouteReasonMetadata = decision;
      console.info('processInboundEmailInApp: rerouting closed-ticket reply to new ticket due to cutoff', {
        tenantId,
        providerId,
        emailId: emailData.id,
        ticketId: replyTicketId,
        matchedBy,
        metadata: decision,
      });
      return null;
    }

    if (senderIsInternal) {
      reopenTicket = true;
    } else {
      const ackSuppressionOn = reopenPolicy.inboundReplyAiAckSuppressionEnabled;
      decision.aiSuppression.enabled = ackSuppressionOn;

      if (!ackSuppressionOn) {
        reopenTicket = true;
      } else {
        const ackDecider = await resolveInboundReplyAcknowledgementDecider();
        const ackResult: InboundReplyAckDeciderResult = await ackDecider.decide({
          tenantId,
          boardId: reopenPolicy.boardId,
          ticketId: replyTicketId,
          subject: emailData.subject,
          text: bodyText ?? '',
        });
        decision.aiSuppression = {
          enabled: ackSuppressionOn,
          attempted: ackResult.attempted,
          decision: ackResult.decision,
          source: ackResult.source,
          reason: ackResult.reason,
          model: ackResult.model ?? null,
          error: ackResult.error ?? null,
          rawOutput: ackResult.rawOutput ?? null,
        };
        // Only an explicit 'ACK' decision keeps the ticket closed.
        reopenTicket = ackResult.decision !== 'ACK';
      }
    }
  }

  if (reopenTicket && reopenPolicy?.ticketIsClosed) {
    const reopenTarget = await resolveBoardReopenStatusTarget({
      tenantId,
      boardId: reopenPolicy.boardId,
      explicitStatusId: reopenPolicy.inboundReplyReopenStatusId,
    });
    await applyInboundReplyReopenTransition({
      tenantId,
      ticketId: replyTicketId,
      statusId: reopenTarget.statusId,
      updatedByUserId: senderIsInternal ? senderContact?.user_id : undefined,
    });
    decision.action = 'reopen';
    decision.reopenTargetSource = reopenTarget.source;
    decision.reopenTargetStatusId = reopenTarget.statusId;
  }

  // Header-derived recipients plus the sender itself when it matched no contact.
  const replyWatchers = mergeTicketWatchListRecipients(
    inboundWatchListRecipients,
    buildUnmatchedSenderWatchListRecipients(senderContactId ?? null)
  );

  const commentId = await createCommentFromEmail(
    {
      ticket_id: replyTicketId,
      content: commentContent,
      parent_comment_id: parentCommentId ?? undefined,
      source: 'email',
      author_type: senderIsInternal ? 'internal' : 'contact',
      author_id: senderContact?.user_id,
      contact_id: senderIsInternal ? undefined : senderContactId,
      metadata: {
        email: buildCommentEmailMetadata({
          matchedSenderEmail: senderContact?.matched_email ?? senderEmail ?? null,
          primaryContactEmail: senderContact?.email ?? null,
        }),
        parser: {
          confidence: parsedEmail?.confidence,
          strategy: parsedEmail?.strategy,
          heuristics: parsedEmail?.appliedHeuristics,
          warnings: parsedEmail?.warnings,
        },
        inboundReopenDecision: decision,
      },
      inboundReplyEvent: {
        messageId: emailData.id,
        threadId: emailData.threadId,
        from: emailData.from?.email ?? '',
        to: (emailData.to ?? []).map((recipient) => recipient.email),
        subject: emailData.subject,
        receivedAt: emailData.receivedAt,
        provider: emailData.provider,
        matchedBy,
      },
    },
    tenantId
  );

  const artifactsResult = await processInboundEmailArtifactsBestEffort({
    tenantId,
    providerId,
    ticketId: replyTicketId,
    emailData,
    scopeLabel: 'reply',
    clientVisibleAttachments: !senderIsInternal,
  });
  await maybeRewriteCommentWithEmbeddedAttachmentUrls({
    tenantId,
    commentId,
    html: bodyHtml,
    text: bodyText,
    originalCommentContent: commentContent,
    artifactsResult,
  });

  await upsertWatchListBestEffort(replyTicketId, replyWatchers);

  console.info('processInboundEmailInApp: inbound threaded reply decision', {
    tenantId,
    providerId,
    emailId: emailData.id,
    ticketId: replyTicketId,
    matchedBy,
    metadata: decision,
  });

  if (diagnostics) {
    diagnostics.threading.matchedCommentId = commentId;
  }
  return withDiagnostics(
    {
      outcome: 'replied',
      matchedBy,
      ticketId: replyTicketId,
      commentId,
    },
    diagnostics
  );
};
|
|
|
|
const token = conversationToken;
|
|
if (token) {
|
|
try {
|
|
const match = await findTicketByReplyToken(String(token), tenantId);
|
|
if (match?.ticketId) {
|
|
const replyTarget = match.commentId
|
|
? await resolveReplyTargetFromComment({
|
|
tenantId,
|
|
commentId: match.commentId,
|
|
})
|
|
: null;
|
|
const ticketId = replyTarget?.ticketId ?? match.ticketId;
|
|
|
|
if (diagnostics) {
|
|
diagnostics.threading.tokenLookupMatched = true;
|
|
diagnostics.threading.tokenLookupMissReason = null;
|
|
diagnostics.threading.matchedBy = 'reply_token';
|
|
diagnostics.threading.matchedTicketId = ticketId;
|
|
diagnostics.threading.matchedCommentId = match.commentId ?? null;
|
|
}
|
|
|
|
const handled = await handleThreadedReply({
|
|
ticketId,
|
|
matchedBy: 'reply_token',
|
|
parentCommentId: replyTarget?.parentCommentId ?? null,
|
|
});
|
|
if (handled) {
|
|
return handled;
|
|
}
|
|
} else if (diagnostics) {
|
|
diagnostics.threading.tokenLookupMissReason = 'token_not_found';
|
|
}
|
|
} catch (error) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
console.warn('processInboundEmailInApp: reply-token threading failed (continuing)', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
error: errorMessage,
|
|
});
|
|
if (diagnostics) {
|
|
diagnostics.threading.tokenLookupMissReason = 'token_lookup_error';
|
|
diagnostics.threading.tokenLookupError = errorMessage;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Thread headers fallback.
|
|
let threadedTicketId: string | null = null;
|
|
let threadedParentCommentId: string | null = null;
|
|
if (!rerouteToNewTicket) {
|
|
if (diagnostics) {
|
|
diagnostics.threading.headerLookupAttempted = true;
|
|
}
|
|
|
|
try {
|
|
let threadTarget = emailData.inReplyTo
|
|
? await resolveReplyTargetFromOutboundMessageId({
|
|
tenantId,
|
|
rfcMessageId: emailData.inReplyTo,
|
|
})
|
|
: null;
|
|
if (!threadTarget) {
|
|
threadTarget = await resolveReplyTargetFromReferences({
|
|
tenantId,
|
|
references: emailData.references,
|
|
});
|
|
}
|
|
if (!threadTarget) {
|
|
threadTarget = await resolveReplyTargetFromProviderThreadId({
|
|
tenantId,
|
|
providerThreadId: emailData.threadId,
|
|
});
|
|
}
|
|
|
|
if (threadTarget) {
|
|
threadedTicketId = threadTarget.ticketId;
|
|
threadedParentCommentId = threadTarget.parentCommentId;
|
|
if (diagnostics) {
|
|
diagnostics.threading.headerLookupMatched = true;
|
|
diagnostics.threading.headerLookupMissReason = null;
|
|
diagnostics.threading.matchedBy = 'thread_headers';
|
|
diagnostics.threading.matchedTicketId = threadTarget.ticketId;
|
|
}
|
|
} else {
|
|
const ticket = await findTicketByEmailThread(
|
|
{
|
|
threadId: emailData.threadId,
|
|
inReplyTo: emailData.inReplyTo,
|
|
references: emailData.references,
|
|
originalMessageId: emailData.inReplyTo ?? emailData.id,
|
|
},
|
|
tenantId
|
|
);
|
|
if (ticket?.ticketId) {
|
|
threadedTicketId = ticket.ticketId;
|
|
if (diagnostics) {
|
|
diagnostics.threading.headerLookupMatched = true;
|
|
diagnostics.threading.headerLookupMissReason = null;
|
|
diagnostics.threading.matchedBy = 'thread_headers';
|
|
diagnostics.threading.matchedTicketId = ticket.ticketId;
|
|
}
|
|
} else if (diagnostics) {
|
|
diagnostics.threading.headerLookupMissReason = 'header_no_match';
|
|
}
|
|
}
|
|
} catch (error) {
|
|
const errorMessage = error instanceof Error ? error.message : String(error);
|
|
console.warn('processInboundEmailInApp: header threading failed (continuing)', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
error: errorMessage,
|
|
});
|
|
if (diagnostics) {
|
|
diagnostics.threading.headerLookupMissReason = 'header_lookup_error';
|
|
diagnostics.threading.headerLookupError = errorMessage;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (threadedTicketId) {
|
|
const handled = await handleThreadedReply({
|
|
ticketId: threadedTicketId,
|
|
matchedBy: 'thread_headers',
|
|
parentCommentId: threadedParentCommentId,
|
|
});
|
|
if (handled) {
|
|
return handled;
|
|
}
|
|
}
|
|
|
|
// New ticket path.
|
|
// Inbound email rules run only here — replies that threaded above never reach
|
|
// this point — and before defaults resolution so skip rules work even for
|
|
// tenants with no inbound defaults configured.
|
|
const ruleEvaluation = await evaluateInboundEmailRules({ tenantId, providerId, emailData });
|
|
const ruleOutcome = ruleEvaluation.outcome;
|
|
if (ruleEvaluation.trace.length > 0 || ruleOutcome.kind !== 'none') {
|
|
console.info('processInboundEmailInApp: inbound email rules evaluated', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
rulesConsidered: ruleEvaluation.trace.length,
|
|
matchedRuleId: 'ruleId' in ruleOutcome ? ruleOutcome.ruleId : null,
|
|
matchedRuleName: 'ruleName' in ruleOutcome ? ruleOutcome.ruleName : null,
|
|
outcome: ruleOutcome.kind,
|
|
});
|
|
}
|
|
|
|
if (ruleOutcome.kind === 'skip') {
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'rule_skip';
|
|
}
|
|
return withDiagnostics(
|
|
{
|
|
outcome: 'skipped',
|
|
reason: 'rule_skip',
|
|
rule: { ruleId: ruleOutcome.ruleId, ruleName: ruleOutcome.ruleName },
|
|
},
|
|
diagnostics
|
|
);
|
|
}
|
|
|
|
const ruleAssignedClientId = ruleOutcome.kind === 'assign_client' ? ruleOutcome.clientId : null;
|
|
const ruleDestinationDefaults =
|
|
ruleOutcome.kind === 'set_destination' || ruleOutcome.kind === 'fallback_destination'
|
|
? (ruleOutcome.defaults as any)
|
|
: null;
|
|
const appliedRule =
|
|
ruleOutcome.kind !== 'none'
|
|
? { ruleId: ruleOutcome.ruleId, ruleName: ruleOutcome.ruleName }
|
|
: null;
|
|
|
|
const providerDefaults = await resolveInboundTicketDefaults(tenantId, providerId);
|
|
if (!providerDefaults && !ruleDestinationDefaults) {
|
|
console.warn('processInboundEmailInApp: missing inbound ticket defaults; skipping email', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
});
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'missing_defaults';
|
|
}
|
|
return withDiagnostics({ outcome: 'skipped', reason: 'missing_defaults' }, diagnostics);
|
|
}
|
|
|
|
const matchedSenderContact = await resolveSenderContact({
|
|
defaultClientId: ruleAssignedClientId ?? providerDefaults?.client_id ?? null,
|
|
});
|
|
|
|
let domainMatchedClientId: string | null = null;
|
|
let domainMatchedContactId: string | null = null;
|
|
if (!ruleAssignedClientId && !matchedSenderContact && senderEmail) {
|
|
const senderDomain = extractEmailDomain(senderEmail);
|
|
if (senderDomain) {
|
|
domainMatchedClientId = await findClientIdByInboundEmailDomain(senderDomain, tenantId);
|
|
if (domainMatchedClientId) {
|
|
domainMatchedContactId = await findValidClientPrimaryContactId(domainMatchedClientId, tenantId);
|
|
}
|
|
}
|
|
}
|
|
|
|
const matchedSenderClientId = matchedSenderContact?.client_id || undefined;
|
|
const matchedSenderContactId = matchedSenderContact?.contact_id || undefined;
|
|
|
|
// A rule-assigned client wins over sender-based matching: the sender is a
|
|
// service mailbox, not the client the email is about. The sender contact is
|
|
// only kept when it belongs to the assigned client.
|
|
const senderContactInRuleClient = Boolean(
|
|
ruleAssignedClientId &&
|
|
matchedSenderContact?.contact_id &&
|
|
matchedSenderClientId === ruleAssignedClientId
|
|
);
|
|
let ruleAssignedContactId: string | null = null;
|
|
if (ruleAssignedClientId) {
|
|
ruleAssignedContactId = senderContactInRuleClient
|
|
? matchedSenderContactId ?? null
|
|
: await findValidClientPrimaryContactId(ruleAssignedClientId, tenantId);
|
|
}
|
|
|
|
// Rule destination defaults (set_destination / non-match fallback) sit above
|
|
// the contact/client/provider cascade.
|
|
let defaults: any = ruleDestinationDefaults;
|
|
let destinationSource: string | null = ruleDestinationDefaults
|
|
? ruleOutcome.kind === 'set_destination'
|
|
? 'rule_destination'
|
|
: 'rule_fallback_destination'
|
|
: null;
|
|
let destinationFallbackReason: string | null = null;
|
|
|
|
if (!defaults) {
|
|
const destinationResolution = await resolveEffectiveInboundTicketDefaults({
|
|
tenant: tenantId,
|
|
providerId,
|
|
providerDefaults,
|
|
matchedContactId: ruleAssignedClientId
|
|
? senderContactInRuleClient
|
|
? matchedSenderContactId ?? null
|
|
: null
|
|
: matchedSenderContactId ?? null,
|
|
matchedContactClientId: ruleAssignedClientId
|
|
? senderContactInRuleClient
|
|
? ruleAssignedClientId
|
|
: null
|
|
: matchedSenderClientId ?? null,
|
|
domainMatchedClientId: ruleAssignedClientId
|
|
? senderContactInRuleClient
|
|
? null
|
|
: ruleAssignedClientId
|
|
: domainMatchedClientId,
|
|
});
|
|
defaults = destinationResolution.defaults;
|
|
destinationSource = destinationResolution.source;
|
|
destinationFallbackReason = destinationResolution.fallbackReason ?? null;
|
|
}
|
|
|
|
if (!defaults) {
|
|
console.warn('processInboundEmailInApp: no effective inbound destination resolved; skipping email', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
source: destinationSource,
|
|
fallbackReason: destinationFallbackReason,
|
|
});
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'missing_defaults';
|
|
}
|
|
return withDiagnostics({ outcome: 'skipped', reason: 'missing_defaults' }, diagnostics);
|
|
}
|
|
|
|
console.debug('processInboundEmailInApp: resolved inbound destination source', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
source: destinationSource,
|
|
fallbackReason: destinationFallbackReason,
|
|
});
|
|
let targetClientId = ruleAssignedClientId ?? matchedSenderClientId ?? defaults.client_id;
|
|
let targetContactId = ruleAssignedClientId
|
|
? ruleAssignedContactId ?? undefined
|
|
: matchedSenderContactId;
|
|
|
|
// Domain fallback: if no exact contact match, use explicitly configured inbound-domain client mapping.
|
|
if (!ruleAssignedClientId && !matchedSenderContact && domainMatchedClientId) {
|
|
targetClientId = domainMatchedClientId;
|
|
targetContactId = domainMatchedContactId ?? undefined;
|
|
}
|
|
|
|
// Only treat the email as authored by a contact when we have an exact sender
|
|
// email match that is consistent with the ticket's client.
|
|
const matchedSenderIsInternalUser = matchedSenderContact?.user_type === 'internal';
|
|
const senderContactUsableAsAuthor = !ruleAssignedClientId || senderContactInRuleClient;
|
|
const commentAuthorContactId =
|
|
matchedSenderIsInternalUser || !senderContactUsableAsAuthor ? undefined : matchedSenderContactId;
|
|
const commentAuthorUserId = senderContactUsableAsAuthor
|
|
? matchedSenderContact?.user_id ?? null
|
|
: null;
|
|
const commentAuthorType = matchedSenderIsInternalUser ? 'internal' : 'contact';
|
|
|
|
const clientMatchSource =
|
|
ruleOutcome.kind === 'assign_client'
|
|
? ruleOutcome.matchSource
|
|
: matchedSenderContact?.contact_id
|
|
? 'email_match'
|
|
: domainMatchedClientId
|
|
? 'domain_match'
|
|
: 'provider_default';
|
|
|
|
// Ticket creation requires a client. If neither defaults nor sender/domain matching
|
|
// can resolve one, skip without failing the webhook.
|
|
if (!targetClientId) {
|
|
console.warn('processInboundEmailInApp: no target client resolved; skipping email', {
|
|
tenantId,
|
|
providerId,
|
|
emailId: emailData.id,
|
|
senderEmail,
|
|
});
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'missing_defaults';
|
|
}
|
|
return withDiagnostics({ outcome: 'skipped', reason: 'missing_defaults' }, diagnostics);
|
|
}
|
|
|
|
// New-ticket idempotency: ticket could have been created in another parallel process.
|
|
const existingTicketAfterDefaults = await findExistingEmailTicket({
|
|
tenantId,
|
|
providerId,
|
|
messageId: emailData.id,
|
|
});
|
|
if (existingTicketAfterDefaults) {
|
|
if (diagnostics) {
|
|
diagnostics.threading.matchedTicketId = existingTicketAfterDefaults.ticketId;
|
|
diagnostics.threading.failureReason = 'deduped';
|
|
}
|
|
return withDiagnostics({
|
|
outcome: 'deduped',
|
|
dedupeKey,
|
|
ticketId: existingTicketAfterDefaults.ticketId,
|
|
}, diagnostics);
|
|
}
|
|
|
|
const parsedHtml = parsedEmail?.sanitizedHtml ?? emailData.body?.html;
|
|
const parsedText = parsedEmail?.sanitizedText ?? emailData.body?.text;
|
|
const blocks = await blocksFromEmailBody({
|
|
html: parsedHtml,
|
|
text: parsedText,
|
|
});
|
|
const serializedBlocks = JSON.stringify(blocks);
|
|
const seededWatchList = mergeTicketWatchListRecipients(
|
|
inboundWatchListRecipients,
|
|
buildUnmatchedSenderWatchListRecipients(commentAuthorContactId ?? null)
|
|
);
|
|
const seededAttributes = setTicketWatchListOnAttributes(undefined, seededWatchList);
|
|
|
|
const ticketResult = await createTicketFromEmail(
|
|
{
|
|
title: emailData.subject || '(no subject)',
|
|
description: serializedBlocks,
|
|
client_id: targetClientId,
|
|
contact_id: targetContactId,
|
|
source: 'email',
|
|
board_id: defaults.board_id,
|
|
status_id: defaults.status_id,
|
|
priority_id: defaults.priority_id,
|
|
category_id: defaults.category_id,
|
|
subcategory_id: defaults.subcategory_id,
|
|
// Avoid cross-client location_id mismatch when we infer a different client than the defaults.
|
|
location_id: targetClientId === defaults.client_id ? defaults.location_id : null,
|
|
entered_by: defaults.entered_by,
|
|
email_metadata: {
|
|
messageId: emailData.id,
|
|
threadId: emailData.threadId,
|
|
from: emailData.from,
|
|
inReplyTo: emailData.inReplyTo,
|
|
references: emailData.references,
|
|
providerId,
|
|
clientMatchSource,
|
|
...(appliedRule
|
|
? { appliedRuleId: appliedRule.ruleId, appliedRuleName: appliedRule.ruleName }
|
|
: {}),
|
|
},
|
|
attributes: seededAttributes ?? undefined,
|
|
},
|
|
tenantId
|
|
);
|
|
|
|
const commentId = await createCommentFromEmail(
|
|
{
|
|
ticket_id: ticketResult.ticket_id,
|
|
content: serializedBlocks,
|
|
source: 'email',
|
|
// First comment on a brand-new ticket: the TICKET_CREATED email already notifies
|
|
// the tech with the same body, so keep this comment in-app only to avoid a duplicate.
|
|
suppressTechEmailNotification: true,
|
|
// Unmatched inbound senders are still customer-originated replies even
|
|
// when we cannot resolve them to an existing contact record.
|
|
author_type: commentAuthorType,
|
|
author_id: commentAuthorUserId ?? undefined,
|
|
contact_id: commentAuthorContactId ?? undefined,
|
|
metadata: {
|
|
email: buildCommentEmailMetadata({
|
|
matchedSenderEmail: matchedSenderContact?.matched_email ?? senderEmail ?? null,
|
|
primaryContactEmail: matchedSenderContact?.email ?? null,
|
|
}),
|
|
parser: {
|
|
confidence: parsedEmail?.confidence,
|
|
strategy: parsedEmail?.strategy,
|
|
heuristics: parsedEmail?.appliedHeuristics,
|
|
warnings: parsedEmail?.warnings,
|
|
},
|
|
unmatchedSender: !commentAuthorContactId,
|
|
inboundReopenDecision: rerouteReasonMetadata ?? undefined,
|
|
},
|
|
},
|
|
tenantId
|
|
);
|
|
|
|
const artifactsResult = await processInboundEmailArtifactsBestEffort({
|
|
tenantId,
|
|
providerId,
|
|
ticketId: ticketResult.ticket_id,
|
|
emailData,
|
|
scopeLabel: 'new-ticket',
|
|
clientVisibleAttachments: !matchedSenderIsInternalUser,
|
|
});
|
|
await maybeRewriteCommentWithEmbeddedAttachmentUrls({
|
|
tenantId,
|
|
commentId,
|
|
html: parsedHtml,
|
|
text: parsedText,
|
|
originalCommentContent: serializedBlocks,
|
|
artifactsResult,
|
|
});
|
|
|
|
if (diagnostics) {
|
|
diagnostics.threading.failureReason = 'new_ticket_created';
|
|
}
|
|
return withDiagnostics({
|
|
outcome: 'created',
|
|
ticketId: ticketResult.ticket_id,
|
|
ticketNumber: ticketResult.ticket_number,
|
|
commentId,
|
|
}, diagnostics);
|
|
}
|