Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz Source: /opt/alga-psa on psa.joliet.tech
192 lines
7.3 KiB
TypeScript
192 lines
7.3 KiB
TypeScript
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
|
import type {
|
|
ClaimedUnifiedInboundEmailQueueJob,
|
|
FailUnifiedInboundEmailQueueJobResult,
|
|
} from '../unifiedInboundEmailQueue';
|
|
import { UnifiedInboundEmailQueueConsumer } from '../unifiedInboundEmailQueueConsumer';
|
|
|
|
const claimUnifiedInboundEmailQueueJobMock = vi.fn();
|
|
const ackUnifiedInboundEmailQueueJobMock = vi.fn();
|
|
const failUnifiedInboundEmailQueueJobMock = vi.fn();
|
|
const reclaimExpiredUnifiedInboundEmailQueueJobsMock = vi.fn();
|
|
|
|
vi.mock('../unifiedInboundEmailQueue', () => ({
|
|
claimUnifiedInboundEmailQueueJob: (...args: any[]) => claimUnifiedInboundEmailQueueJobMock(...args),
|
|
ackUnifiedInboundEmailQueueJob: (...args: any[]) => ackUnifiedInboundEmailQueueJobMock(...args),
|
|
failUnifiedInboundEmailQueueJob: (...args: any[]) => failUnifiedInboundEmailQueueJobMock(...args),
|
|
reclaimExpiredUnifiedInboundEmailQueueJobs: (...args: any[]) =>
|
|
reclaimExpiredUnifiedInboundEmailQueueJobsMock(...args),
|
|
}));
|
|
|
|
function buildClaimedJob(provider: 'microsoft' | 'google' | 'imap'): ClaimedUnifiedInboundEmailQueueJob {
|
|
const base = {
|
|
jobId: `job-${provider}-1`,
|
|
schemaVersion: 1 as const,
|
|
tenantId: 'tenant-1',
|
|
providerId: `provider-${provider}-1`,
|
|
enqueuedAt: new Date().toISOString(),
|
|
attempt: 0,
|
|
maxAttempts: 5,
|
|
provider,
|
|
};
|
|
|
|
const job =
|
|
provider === 'microsoft'
|
|
? {
|
|
...base,
|
|
pointer: {
|
|
subscriptionId: 'sub-ms-1',
|
|
messageId: 'ms-msg-1',
|
|
resource: '/users/user/messages/ms-msg-1',
|
|
changeType: 'created',
|
|
},
|
|
}
|
|
: provider === 'google'
|
|
? {
|
|
...base,
|
|
pointer: {
|
|
historyId: '200',
|
|
emailAddress: 'support@example.com',
|
|
pubsubMessageId: 'pubsub-1',
|
|
},
|
|
}
|
|
: {
|
|
...base,
|
|
pointer: {
|
|
mailbox: 'INBOX',
|
|
uid: '300',
|
|
uidValidity: '400',
|
|
messageId: '<imap-msg-1@example.com>',
|
|
},
|
|
};
|
|
|
|
return {
|
|
job,
|
|
originalPayload: JSON.stringify(job),
|
|
consumerId: 'consumer-test-1',
|
|
claimedAt: new Date().toISOString(),
|
|
leaseExpiresAt: new Date(Date.now() + 60_000).toISOString(),
|
|
} as ClaimedUnifiedInboundEmailQueueJob;
|
|
}
|
|
|
|
describe('UnifiedInboundEmailQueueConsumer provider claim/processing flow', () => {
|
|
beforeEach(() => {
|
|
claimUnifiedInboundEmailQueueJobMock.mockReset();
|
|
ackUnifiedInboundEmailQueueJobMock.mockReset();
|
|
failUnifiedInboundEmailQueueJobMock.mockReset();
|
|
reclaimExpiredUnifiedInboundEmailQueueJobsMock.mockReset();
|
|
|
|
reclaimExpiredUnifiedInboundEmailQueueJobsMock.mockResolvedValue(0);
|
|
ackUnifiedInboundEmailQueueJobMock.mockResolvedValue(undefined);
|
|
failUnifiedInboundEmailQueueJobMock.mockResolvedValue({
|
|
action: 'retried',
|
|
attempt: 1,
|
|
queueDepth: 1,
|
|
} as FailUnifiedInboundEmailQueueJobResult);
|
|
});
|
|
|
|
it('T009: consumer claims and processes queued Microsoft pointer jobs', async () => {
|
|
const claim = buildClaimedJob('microsoft');
|
|
claimUnifiedInboundEmailQueueJobMock.mockResolvedValue(claim);
|
|
const handleJobMock = vi.fn(async () => ({ outcome: 'processed' }));
|
|
const consumer = new UnifiedInboundEmailQueueConsumer({ handleJob: handleJobMock });
|
|
|
|
const processed = await consumer.runOnce();
|
|
|
|
expect(processed).toBe(true);
|
|
expect(handleJobMock).toHaveBeenCalledTimes(1);
|
|
expect(handleJobMock).toHaveBeenCalledWith(claim.job);
|
|
expect(ackUnifiedInboundEmailQueueJobMock).toHaveBeenCalledTimes(1);
|
|
expect(ackUnifiedInboundEmailQueueJobMock).toHaveBeenCalledWith(claim);
|
|
expect(failUnifiedInboundEmailQueueJobMock).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('T010: consumer claims and processes queued Google pointer jobs', async () => {
|
|
const claim = buildClaimedJob('google');
|
|
claimUnifiedInboundEmailQueueJobMock.mockResolvedValue(claim);
|
|
const handleJobMock = vi.fn(async () => ({ outcome: 'processed' }));
|
|
const consumer = new UnifiedInboundEmailQueueConsumer({ handleJob: handleJobMock });
|
|
|
|
const processed = await consumer.runOnce();
|
|
|
|
expect(processed).toBe(true);
|
|
expect(handleJobMock).toHaveBeenCalledTimes(1);
|
|
expect(handleJobMock).toHaveBeenCalledWith(claim.job);
|
|
expect(ackUnifiedInboundEmailQueueJobMock).toHaveBeenCalledTimes(1);
|
|
expect(failUnifiedInboundEmailQueueJobMock).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('T011: consumer claims and processes queued IMAP pointer jobs', async () => {
|
|
const claim = buildClaimedJob('imap');
|
|
claimUnifiedInboundEmailQueueJobMock.mockResolvedValue(claim);
|
|
const handleJobMock = vi.fn(async () => ({ outcome: 'processed' }));
|
|
const consumer = new UnifiedInboundEmailQueueConsumer({ handleJob: handleJobMock });
|
|
|
|
const processed = await consumer.runOnce();
|
|
|
|
expect(processed).toBe(true);
|
|
expect(handleJobMock).toHaveBeenCalledTimes(1);
|
|
expect(handleJobMock).toHaveBeenCalledWith(claim.job);
|
|
expect(ackUnifiedInboundEmailQueueJobMock).toHaveBeenCalledTimes(1);
|
|
expect(failUnifiedInboundEmailQueueJobMock).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it('T019: processing failure does not ACK and routes claim into retry failure handling', async () => {
|
|
const claim = buildClaimedJob('microsoft');
|
|
claimUnifiedInboundEmailQueueJobMock.mockResolvedValue(claim);
|
|
failUnifiedInboundEmailQueueJobMock.mockResolvedValue({
|
|
action: 'retried',
|
|
attempt: 1,
|
|
queueDepth: 2,
|
|
} as FailUnifiedInboundEmailQueueJobResult);
|
|
|
|
const handleJobMock = vi.fn(async () => {
|
|
throw new Error('processor_failed');
|
|
});
|
|
const consumer = new UnifiedInboundEmailQueueConsumer({ handleJob: handleJobMock });
|
|
|
|
const processed = await consumer.runOnce();
|
|
|
|
expect(processed).toBe(false);
|
|
expect(handleJobMock).toHaveBeenCalledTimes(1);
|
|
expect(ackUnifiedInboundEmailQueueJobMock).not.toHaveBeenCalled();
|
|
expect(failUnifiedInboundEmailQueueJobMock).toHaveBeenCalledTimes(1);
|
|
expect(failUnifiedInboundEmailQueueJobMock).toHaveBeenCalledWith({
|
|
claim,
|
|
error: 'processor_failed',
|
|
});
|
|
});
|
|
|
|
it('T024: skipped source-unavailable outcomes are ACKed and do not enter retry loop', async () => {
|
|
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => undefined);
|
|
const claim = buildClaimedJob('imap');
|
|
claimUnifiedInboundEmailQueueJobMock.mockResolvedValue(claim);
|
|
|
|
const handleJobMock = vi.fn(async () => ({
|
|
outcome: 'skipped',
|
|
reason: 'source_unavailable:imap_message_not_found',
|
|
processedCount: 0,
|
|
dedupedCount: 0,
|
|
skippedCount: 1,
|
|
}));
|
|
const consumer = new UnifiedInboundEmailQueueConsumer({ handleJob: handleJobMock });
|
|
|
|
const processed = await consumer.runOnce();
|
|
|
|
expect(processed).toBe(true);
|
|
expect(handleJobMock).toHaveBeenCalledTimes(1);
|
|
expect(ackUnifiedInboundEmailQueueJobMock).toHaveBeenCalledTimes(1);
|
|
expect(ackUnifiedInboundEmailQueueJobMock).toHaveBeenCalledWith(claim);
|
|
expect(failUnifiedInboundEmailQueueJobMock).not.toHaveBeenCalled();
|
|
expect(warnSpy).toHaveBeenCalledWith(
|
|
'[UnifiedInboundEmailQueueConsumer] Job skipped',
|
|
expect.objectContaining({
|
|
event: 'inbound_email_queue_skip',
|
|
attempt: 0,
|
|
reason: 'source_unavailable:imap_message_not_found',
|
|
})
|
|
);
|
|
warnSpy.mockRestore();
|
|
});
|
|
});
|