Some checks are pending
Bidi Control Character Guard / bidi-control-guard (push) Waiting to run
Circular Dependency Check / Check for new circular dependencies (push) Waiting to run
Citus Migration Smoke / Combined migrations on single-node Citus (push) Waiting to run
E2E Fresh Install Tests / fresh-install-e2e (push) Waiting to run
ext-v2 guardrails / Run ext-v2 guard and ESLint (push) Waiting to run
Integration Tests / Check for relevant changes (push) Waiting to run
Integration Tests / ${{ (github.event_name == 'schedule' || github.event.inputs.suite == 'full') && 'Full integration suite' || 'Tier-1 integration subset' }} (push) Blocked by required conditions
Mobile checks / Mobile lint + typecheck (push) Waiting to run
Mobile checks / Mobile unit tests (push) Waiting to run
Mobile checks / Mobile dependency audit (report) (push) Waiting to run
Mobile checks / Mobile reproducibility checks (push) Waiting to run
Secrets guard (env backups) / Ensure no tracked env backup files (push) Waiting to run
Temporal Readiness / fast-readiness (push) Waiting to run
Temporal Readiness / docker-parity (push) Waiting to run
TypeScript Type Check / Nx affected typecheck (push) Waiting to run
Unit Tests / Skipped-test budget (push) Waiting to run
Unit Tests / Nx affected unit tests (push) Waiting to run
Unit Tests / Server unit coverage (informational) (push) Waiting to run
Validate Tenant Management Schema / Check for relevant changes (push) Waiting to run
Validate Tenant Management Schema / Validate Tenant Management Schema (push) Blocked by required conditions
EE Workflows Build Guard / ee-workflows-build-guard (push) Waiting to run
Excluded: .git, node_modules, secrets/, compose.env, assemblyscript tgz Source: /opt/alga-psa on psa.joliet.tech
708 lines
26 KiB
TypeScript
708 lines
26 KiB
TypeScript
import axios, { AxiosInstance } from 'axios';
|
|
import { BaseEmailAdapter } from './base/BaseEmailAdapter';
|
|
import { EmailMessageDetails, EmailProviderConfig } from '../../../interfaces/inbound-email.interfaces';
|
|
import { getSecretProviderInstance } from '@alga-psa/core/secrets';
|
|
import { google } from 'googleapis';
|
|
import { OAuth2Client } from 'google-auth-library';
|
|
import { getAdminConnection } from '@alga-psa/db/admin';
|
|
import { parseEmailAddress, parseEmailAddressList } from '../../../lib/email/addressUtils';
|
|
|
|
/**
|
|
* Gmail API adapter for email processing
|
|
* Handles OAuth authentication, Pub/Sub subscriptions, and message retrieval
|
|
*/
|
|
export class GmailAdapter extends BaseEmailAdapter {
|
|
private httpClient: AxiosInstance;
|
|
private baseUrl = 'https://gmail.googleapis.com/gmail/v1';
|
|
private oauth2Client: OAuth2Client;
|
|
private gmail: any;
|
|
|
|
constructor(config: EmailProviderConfig) {
|
|
super(config);
|
|
|
|
// Create axios instance with default headers
|
|
this.httpClient = axios.create({
|
|
baseURL: this.baseUrl,
|
|
headers: {
|
|
'Content-Type': 'application/json',
|
|
},
|
|
timeout: 30000,
|
|
});
|
|
|
|
// Initialize OAuth2 client (will be configured with credentials later)
|
|
this.oauth2Client = new OAuth2Client();
|
|
this.gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
|
|
|
|
// Add request interceptor to include auth token
|
|
this.httpClient.interceptors.request.use(async (config) => {
|
|
await this.ensureValidToken();
|
|
if (this.accessToken) {
|
|
config.headers.Authorization = `Bearer ${this.accessToken}`;
|
|
}
|
|
return config;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Load stored credentials from the provider configuration
|
|
*/
|
|
protected async loadCredentials(): Promise<void> {
|
|
try {
|
|
const vendorConfig = this.config.provider_config || {};
|
|
|
|
// Check if OAuth tokens are available in provider config
|
|
if (!vendorConfig.access_token || !vendorConfig.refresh_token) {
|
|
throw new Error('OAuth tokens not found in provider configuration. Please complete OAuth authorization.');
|
|
}
|
|
|
|
this.accessToken = vendorConfig.access_token;
|
|
this.refreshToken = vendorConfig.refresh_token;
|
|
this.tokenExpiresAt = vendorConfig.token_expires_at ? new Date(vendorConfig.token_expires_at) : new Date();
|
|
|
|
// Configure OAuth2 client with stored credentials
|
|
this.oauth2Client.setCredentials({
|
|
access_token: this.accessToken,
|
|
refresh_token: this.refreshToken,
|
|
token_type: 'Bearer',
|
|
expiry_date: this.tokenExpiresAt.getTime()
|
|
});
|
|
|
|
this.log('info', 'Credentials loaded successfully from provider configuration');
|
|
} catch (error) {
|
|
throw this.handleError(error, 'loadCredentials');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Refresh the access token using Google OAuth
|
|
*/
|
|
protected async refreshAccessToken(): Promise<void> {
|
|
try {
|
|
if (!this.refreshToken) {
|
|
throw new Error('No refresh token available');
|
|
}
|
|
|
|
const vendorConfig = this.config.provider_config || {};
|
|
|
|
// Get client credentials from provider config, environment, or tenant secrets
|
|
let clientId = vendorConfig.client_id || process.env.GOOGLE_CLIENT_ID;
|
|
let clientSecret = vendorConfig.client_secret || process.env.GOOGLE_CLIENT_SECRET;
|
|
|
|
// Fall back to tenant secrets if not found in config or environment
|
|
if (!clientId || !clientSecret) {
|
|
const secretProvider = await getSecretProviderInstance();
|
|
clientId = clientId || await secretProvider.getTenantSecret(this.config.tenant, 'google_client_id');
|
|
clientSecret = clientSecret || await secretProvider.getTenantSecret(this.config.tenant, 'google_client_secret');
|
|
}
|
|
|
|
if (!clientId || !clientSecret) {
|
|
throw new Error('Google OAuth credentials not configured');
|
|
}
|
|
|
|
// Configure OAuth2 client with app credentials
|
|
this.oauth2Client = new OAuth2Client(clientId, clientSecret);
|
|
this.oauth2Client.setCredentials({
|
|
refresh_token: this.refreshToken
|
|
});
|
|
|
|
// Get new access token
|
|
const { credentials } = await this.oauth2Client.refreshAccessToken();
|
|
|
|
if (!credentials.access_token) {
|
|
throw new Error('Failed to obtain new access token');
|
|
}
|
|
|
|
this.accessToken = credentials.access_token;
|
|
if (credentials.refresh_token) {
|
|
this.refreshToken = credentials.refresh_token;
|
|
}
|
|
|
|
// Calculate expiry with 5-minute buffer
|
|
const expiryTime = credentials.expiry_date
|
|
? new Date(credentials.expiry_date - 300000)
|
|
: new Date(Date.now() + 3300000); // Default to 55 minutes
|
|
|
|
this.tokenExpiresAt = expiryTime;
|
|
|
|
// Update stored credentials
|
|
await this.updateStoredCredentials();
|
|
|
|
// Update gmail client
|
|
this.gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
|
|
|
|
this.log('info', 'Access token refreshed successfully');
|
|
} catch (error) {
|
|
throw this.handleError(error, 'refreshAccessToken');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Update stored credentials with new tokens
|
|
* This method updates both in-memory config and persists changes to the database.
|
|
*/
|
|
private async updateStoredCredentials(): Promise<void> {
|
|
try {
|
|
// Update the provider config with new tokens
|
|
if (this.config.provider_config) {
|
|
this.config.provider_config.access_token = this.accessToken;
|
|
this.config.provider_config.refresh_token = this.refreshToken;
|
|
this.config.provider_config.token_expires_at = this.tokenExpiresAt?.toISOString();
|
|
}
|
|
|
|
this.log('info', 'Updated credentials in provider configuration');
|
|
|
|
// Persist updated credentials to database
|
|
try {
|
|
const knex = await getAdminConnection();
|
|
await knex('google_email_provider_config')
|
|
.where({ tenant: this.config.tenant, email_provider_id: this.config.id })
|
|
.update({
|
|
access_token: this.accessToken,
|
|
refresh_token: this.refreshToken,
|
|
token_expires_at: this.tokenExpiresAt?.toISOString(),
|
|
updated_at: new Date().toISOString()
|
|
});
|
|
|
|
this.log('info', 'Successfully persisted refreshed OAuth tokens to database');
|
|
} catch (dbError: any) {
|
|
this.log('error', `Failed to persist credentials to database: ${dbError.message}`, dbError);
|
|
// Don't throw here - we still have the tokens in memory, so the current operation can continue
|
|
// But log the error so we know there's a persistence issue
|
|
}
|
|
} catch (error) {
|
|
this.log('warn', 'Failed to update stored credentials', error);
|
|
throw error; // Re-throw for the calling method to handle
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Connect to Gmail API
|
|
*/
|
|
async connect(): Promise<void> {
|
|
try {
|
|
await this.loadCredentials();
|
|
await this.testConnection();
|
|
this.log('info', 'Connected to Gmail API successfully');
|
|
} catch (error) {
|
|
throw this.handleError(error, 'connect');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Register webhook subscription using Google Pub/Sub
|
|
*/
|
|
async registerWebhookSubscription(): Promise<void> {
|
|
try {
|
|
const vendorConfig = this.config.provider_config || {};
|
|
const topicName = vendorConfig.pubsub_topic_name;
|
|
const projectId = vendorConfig.project_id;
|
|
|
|
if (!topicName) {
|
|
throw new Error('Pub/Sub topic name not configured');
|
|
}
|
|
|
|
if (!projectId) {
|
|
throw new Error('Google Cloud project ID not configured');
|
|
}
|
|
|
|
console.log('📦 vendorConfig', vendorConfig);
|
|
|
|
// Check if user has completed OAuth authorization
|
|
if (!vendorConfig.access_token || !vendorConfig.refresh_token) {
|
|
const errorMsg = `Gmail watch subscription setup failed: OAuth tokens are missing.
|
|
Expected tokens to be saved after OAuth authorization but found:
|
|
- access_token: ${vendorConfig.access_token ? '[PRESENT]' : '[MISSING]'}
|
|
- refresh_token: ${vendorConfig.refresh_token ? '[PRESENT]' : '[MISSING]'}
|
|
This indicates a problem with the OAuth token saving process.`;
|
|
|
|
this.log('error', errorMsg);
|
|
throw new Error('Gmail OAuth tokens are missing. Please check the OAuth authorization flow.');
|
|
}
|
|
|
|
// Load credentials and ensure valid token
|
|
await this.ensureValidToken();
|
|
|
|
// Determine label filters (user-defined label names only)
|
|
let requestedFilters: string[] = Array.isArray(vendorConfig.label_filters)
|
|
? (vendorConfig.label_filters as string[]).map((s: string) => s?.trim()).filter(Boolean)
|
|
: [];
|
|
// If not present on the in-memory config, attempt to load from DB
|
|
if (requestedFilters.length === 0) {
|
|
try {
|
|
const knex = await getAdminConnection();
|
|
const rec: any = await knex('google_email_provider_config')
|
|
.select('label_filters')
|
|
.where({ tenant: this.config.tenant, email_provider_id: this.config.id })
|
|
.first();
|
|
const fromDb = Array.isArray(rec?.label_filters)
|
|
? rec.label_filters
|
|
: (() => { try { return JSON.parse(rec?.label_filters || '[]'); } catch { return []; } })();
|
|
requestedFilters = (Array.isArray(fromDb) ? fromDb : []).map((s: string) => s?.trim()).filter(Boolean);
|
|
} catch (e: any) {
|
|
this.log('warn', 'Unable to load label_filters from DB; proceeding without label filters', e);
|
|
}
|
|
}
|
|
|
|
// Deduplicate while preserving order
|
|
const uniqueFilters = Array.from(new Set(requestedFilters));
|
|
|
|
// Resolve user label names to IDs (no special-casing of system labels)
|
|
let effectiveLabelIds: string[] = [];
|
|
if (uniqueFilters.length > 0) {
|
|
try {
|
|
const labelsResp = await this.gmail.users.labels.list({ userId: 'me' });
|
|
const allLabels: Array<{ id?: string; name?: string }> = (labelsResp.data.labels as any) || [];
|
|
effectiveLabelIds = uniqueFilters.map(f => allLabels.find(l => l.name === f)?.id).filter((id): id is string => !!id);
|
|
const missing = uniqueFilters.filter(f => !allLabels.find(l => l.name === f)?.id);
|
|
if (missing.length > 0) {
|
|
this.log('warn', `Some Gmail label filters were not found and will be ignored`, { missing });
|
|
}
|
|
} catch (e: any) {
|
|
this.log('warn', `Failed to resolve Gmail labels; proceeding without label filters: ${e?.message || e}`);
|
|
effectiveLabelIds = [];
|
|
}
|
|
}
|
|
|
|
// Build watch request; include label filters only when provided
|
|
const watchBody: any = {
|
|
topicName: `projects/${projectId}/topics/${topicName}`,
|
|
};
|
|
if (effectiveLabelIds.length > 0) {
|
|
watchBody.labelIds = effectiveLabelIds;
|
|
watchBody.labelFilterBehavior = 'include';
|
|
}
|
|
const response = await this.gmail.users.watch({
|
|
userId: 'me',
|
|
requestBody: watchBody
|
|
});
|
|
|
|
console.log('✅ Gmail watch response:', response.data);
|
|
this.log('info', 'Gmail watch configured', { labelFilters: uniqueFilters, effectiveLabelIds });
|
|
|
|
// Store the history ID for tracking changes in provider config
|
|
if (!this.config.provider_config) {
|
|
this.config.provider_config = {};
|
|
}
|
|
this.config.provider_config.history_id = response.data.historyId;
|
|
|
|
// Handle expiration date safely - Gmail API returns expiration as a string timestamp in milliseconds
|
|
let expirationISO: string | null = null;
|
|
if (response.data.expiration) {
|
|
try {
|
|
// Gmail API returns expiration as a string of milliseconds since epoch
|
|
const expirationMs = parseInt(response.data.expiration, 10);
|
|
if (!isNaN(expirationMs) && expirationMs > 0) {
|
|
expirationISO = new Date(expirationMs).toISOString();
|
|
}
|
|
} catch (err) {
|
|
this.log('warn', `Failed to parse expiration date: ${response.data.expiration}`, err);
|
|
}
|
|
}
|
|
|
|
this.config.provider_config.watch_expiration = expirationISO || undefined;
|
|
|
|
// Save updated history_id and watch_expiration to database
|
|
try {
|
|
const knex = await getAdminConnection();
|
|
await knex('google_email_provider_config')
|
|
.where({ tenant: this.config.tenant, email_provider_id: this.config.id })
|
|
.update({
|
|
history_id: response.data.historyId,
|
|
watch_expiration: expirationISO,
|
|
updated_at: new Date().toISOString()
|
|
});
|
|
this.log('info', 'Updated database with new watch subscription details');
|
|
} catch (dbError: any) {
|
|
this.log('error', 'Failed to update database with watch subscription details', dbError);
|
|
// Continue execution - the watch subscription is still valid even if DB update fails
|
|
}
|
|
|
|
this.log('info', `Gmail watch created with historyId: ${response.data.historyId}, expiration: ${expirationISO}`);
|
|
} catch (error) {
|
|
throw this.handleError(error, 'registerWebhookSubscription');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Renew webhook subscription
|
|
*/
|
|
async renewWebhookSubscription(): Promise<void> {
|
|
try {
|
|
// Load credentials and ensure valid token
|
|
await this.ensureValidToken();
|
|
|
|
// Stop existing watch subscription first
|
|
try {
|
|
// await this.gmail.users.stop({ userId: 'me' });
|
|
this.log('info', 'Stopped existing Gmail watch subscription');
|
|
} catch (error: any) {
|
|
// It's okay if there's no existing watch to stop
|
|
this.log('warn', `No existing watch to stop: ${error.message}`);
|
|
}
|
|
|
|
// Create new watch subscription
|
|
await this.registerWebhookSubscription();
|
|
|
|
this.log('info', 'Successfully renewed Gmail watch subscription');
|
|
} catch (error) {
|
|
this.log('error', 'Failed to renew Gmail watch subscription', error);
|
|
throw error;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Process webhook notification from Google Pub/Sub
|
|
*/
|
|
async processWebhookNotification(payload: any): Promise<string[]> {
|
|
try {
|
|
const messageIds: string[] = [];
|
|
const vendorConfig = this.config.provider_config || {};
|
|
|
|
// Extract historyId from the notification
|
|
const historyId = payload.historyId;
|
|
const lastHistoryId = this.config.provider_config?.history_id;
|
|
|
|
if (!historyId || !lastHistoryId) {
|
|
this.log('warn', 'Missing history ID in webhook notification');
|
|
return messageIds;
|
|
}
|
|
|
|
// Get history of changes since last known historyId
|
|
const history = await this.gmail.users.history.list({
|
|
userId: 'me',
|
|
startHistoryId: lastHistoryId,
|
|
historyTypes: ['messageAdded'],
|
|
labelId: 'INBOX'
|
|
});
|
|
|
|
if (history.data.history) {
|
|
for (const record of history.data.history) {
|
|
if (record.messagesAdded) {
|
|
for (const msg of record.messagesAdded) {
|
|
messageIds.push(msg.message.id);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update last known historyId
|
|
if (history.data.historyId) {
|
|
if (!this.config.provider_config) {
|
|
this.config.provider_config = {};
|
|
}
|
|
this.config.provider_config.history_id = history.data.historyId;
|
|
}
|
|
|
|
return messageIds;
|
|
} catch (error) {
|
|
throw this.handleError(error, 'processWebhookNotification');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* List Gmail message IDs added since a given historyId
|
|
*/
|
|
async listMessagesSince(startHistoryId: string): Promise<string[]> {
|
|
try {
|
|
await this.ensureValidToken();
|
|
const messageIds: string[] = [];
|
|
|
|
let pageToken: string | undefined = undefined;
|
|
let lastHistoryId = startHistoryId;
|
|
|
|
do {
|
|
const historyResp: any = await this.gmail.users.history.list({
|
|
userId: 'me',
|
|
startHistoryId: startHistoryId, // Use original startHistoryId for pagination consistency
|
|
historyTypes: ['messageAdded'],
|
|
// labelId: 'INBOX', // Removed to allow processing of all incoming messages (even if archived/filtered)
|
|
pageToken,
|
|
});
|
|
|
|
if (historyResp.data.history) {
|
|
for (const record of historyResp.data.history) {
|
|
if (record.messagesAdded) {
|
|
for (const msg of record.messagesAdded) {
|
|
if (msg.message?.id) {
|
|
messageIds.push(msg.message.id);
|
|
}
|
|
}
|
|
}
|
|
// Track the most recent historyId seen to update our cursor
|
|
if (record.id) {
|
|
lastHistoryId = record.id;
|
|
}
|
|
}
|
|
}
|
|
|
|
pageToken = historyResp.data.nextPageToken || undefined;
|
|
|
|
// Update stored last historyId if API returned a newer one
|
|
const newHistoryId = historyResp.data.historyId || lastHistoryId;
|
|
if (!this.config.provider_config) {
|
|
this.config.provider_config = {};
|
|
}
|
|
this.config.provider_config.history_id = newHistoryId;
|
|
} while (pageToken);
|
|
|
|
return Array.from(new Set(messageIds));
|
|
} catch (error) {
|
|
const gmailNotFound = this.isHistoryIdNotFoundError(error);
|
|
if (gmailNotFound) {
|
|
const axiosError = error as any;
|
|
await this.attemptWatchRecovery(startHistoryId);
|
|
const historyError = new Error('Gmail history_id is no longer valid. Request a resync and establish a new watch.');
|
|
(historyError as any).code = 'gmail.historyIdNotFound';
|
|
(historyError as any).status = 404;
|
|
(historyError as any).responseBody = axiosError?.response?.data;
|
|
(historyError as any).requestId = axiosError?.response?.headers?.['request-id'] || axiosError?.response?.headers?.['client-request-id'];
|
|
this.log('warn', 'Gmail history_id rejected by API; downstream should reset cursor and re-register watch.', {
|
|
providerId: this.config.id,
|
|
attemptedHistoryId: startHistoryId
|
|
});
|
|
throw historyError;
|
|
}
|
|
throw this.handleError(error, 'listMessagesSince');
|
|
}
|
|
}
|
|
|
|
private isHistoryIdNotFoundError(error: any): boolean {
|
|
if (!error) return false;
|
|
const status = error?.response?.status || error?.status;
|
|
if (status !== 404) return false;
|
|
|
|
const errorBody = error?.response?.data?.error || {};
|
|
const reason = Array.isArray(errorBody?.errors) ? errorBody.errors.find((e: any) => e?.reason)?.reason : undefined;
|
|
const message: string = errorBody?.message || error?.message || '';
|
|
const matchedReason = reason === 'notFound';
|
|
const matchedStatus = (errorBody?.status || '').toUpperCase() === 'NOT_FOUND';
|
|
const matchedMessage = typeof message === 'string' && message.toLowerCase().includes('requested entity was not found');
|
|
return Boolean(matchedReason || matchedStatus || matchedMessage);
|
|
}
|
|
|
|
private async attemptWatchRecovery(startHistoryId: string): Promise<void> {
|
|
try {
|
|
this.log('info', 'Attempting to recreate Gmail watch after history_id invalidation', {
|
|
providerId: this.config.id,
|
|
rejectedHistoryId: startHistoryId
|
|
});
|
|
await this.registerWebhookSubscription();
|
|
this.log('info', 'Gmail watch recreated successfully after history_id invalidation');
|
|
} catch (recoveryError: any) {
|
|
this.log('error', 'Failed to recreate Gmail watch after history_id invalidation', recoveryError);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Mark a message as processed (READ-ONLY MODE: No-op)
|
|
* Note: This system now operates in read-only mode and does not modify emails.
|
|
* Email processing status is tracked in the database instead.
|
|
*/
|
|
async markMessageProcessed(messageId: string): Promise<void> {
|
|
this.log('info', `Email ${messageId} processed (read-only mode - not adding labels in mailbox)`);
|
|
// No API call made - operating in read-only mode
|
|
}
|
|
|
|
/**
|
|
* Get detailed information about a specific email message
|
|
*/
|
|
async getMessageDetails(messageId: string): Promise<EmailMessageDetails> {
|
|
try {
|
|
const message = await this.gmail.users.messages.get({
|
|
userId: 'me',
|
|
id: messageId,
|
|
format: 'full'
|
|
});
|
|
|
|
// Skip Drafts and Sent messages
|
|
const labelIds = message.data.labelIds || [];
|
|
if (labelIds.includes('DRAFT') || labelIds.includes('SENT')) {
|
|
throw new Error('Message is a DRAFT/SENT type, skipping');
|
|
}
|
|
|
|
const headers = message.data.payload?.headers || [];
|
|
const getHeader = (name: string) =>
|
|
headers.find((h: any) => h.name.toLowerCase() === name.toLowerCase())?.value || '';
|
|
|
|
// Extract body content
|
|
let bodyContent = '';
|
|
let htmlContent = '';
|
|
|
|
const extractBody = (parts: any[]): void => {
|
|
for (const part of parts) {
|
|
if (part.mimeType === 'text/plain' && part.body?.data) {
|
|
bodyContent = Buffer.from(part.body.data, 'base64').toString();
|
|
} else if (part.mimeType === 'text/html' && part.body?.data) {
|
|
htmlContent = Buffer.from(part.body.data, 'base64').toString();
|
|
} else if (part.parts) {
|
|
extractBody(part.parts);
|
|
}
|
|
}
|
|
};
|
|
|
|
if (message.data.payload?.parts) {
|
|
extractBody(message.data.payload.parts);
|
|
} else if (message.data.payload?.body?.data) {
|
|
bodyContent = Buffer.from(message.data.payload.body.data, 'base64').toString();
|
|
}
|
|
|
|
// Extract attachments
|
|
const attachments: any[] = [];
|
|
const extractAttachments = (parts: any[]): void => {
|
|
for (const part of parts) {
|
|
if (part.filename && part.body?.attachmentId) {
|
|
const partHeaders = part.headers || [];
|
|
const getPartHeader = (name: string) =>
|
|
partHeaders.find((h: any) => String(h.name || '').toLowerCase() === name.toLowerCase())?.value || '';
|
|
|
|
const contentDisposition = String(getPartHeader('Content-Disposition') || '').toLowerCase();
|
|
const isInline = contentDisposition.includes('inline');
|
|
|
|
const rawContentId = String(getPartHeader('Content-ID') || '').trim();
|
|
const contentId = rawContentId ? rawContentId.replace(/^<|>$/g, '') : undefined;
|
|
|
|
attachments.push({
|
|
filename: part.filename,
|
|
mimeType: part.mimeType,
|
|
size: part.body.size,
|
|
attachmentId: part.body.attachmentId,
|
|
contentId,
|
|
isInline
|
|
});
|
|
} else if (part.parts) {
|
|
extractAttachments(part.parts);
|
|
}
|
|
}
|
|
};
|
|
|
|
if (message.data.payload?.parts) {
|
|
extractAttachments(message.data.payload.parts);
|
|
}
|
|
|
|
const fromEmail = getHeader('From') || '';
|
|
const toEmails = getHeader('To') || '';
|
|
const ccEmails = getHeader('Cc') || '';
|
|
const parsedFrom = parseEmailAddress(fromEmail);
|
|
const parsedTo = parseEmailAddressList(toEmails);
|
|
const parsedCc = parseEmailAddressList(ccEmails);
|
|
|
|
return {
|
|
id: message.data.id!,
|
|
provider: 'google' as const,
|
|
providerId: this.config.id,
|
|
tenant: this.config.tenant,
|
|
receivedAt: getHeader('Date') || new Date().toISOString(),
|
|
from: {
|
|
email: parsedFrom?.email || fromEmail.trim(),
|
|
name: parsedFrom?.name
|
|
},
|
|
to: parsedTo,
|
|
cc: parsedCc.length > 0 ? parsedCc : undefined,
|
|
subject: getHeader('Subject') || '',
|
|
body: {
|
|
text: bodyContent,
|
|
html: htmlContent
|
|
},
|
|
attachments: attachments.map(att => ({
|
|
id: att.attachmentId,
|
|
name: att.filename,
|
|
size: att.size,
|
|
contentType: att.mimeType,
|
|
contentId: att.contentId,
|
|
isInline: att.isInline
|
|
})),
|
|
headers: headers.reduce((acc: any, header: any) => {
|
|
acc[header.name] = header.value;
|
|
return acc;
|
|
}, {})
|
|
};
|
|
} catch (error) {
|
|
throw this.handleError(error, 'getMessageDetails');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Download attachment bytes for a Gmail message.
|
|
*
|
|
* Gmail returns attachment payload as base64url in `data`.
|
|
*/
|
|
async downloadAttachmentBytes(messageId: string, attachmentId: string): Promise<Buffer> {
|
|
try {
|
|
await this.ensureValidToken();
|
|
const res = await this.gmail.users.messages.attachments.get({
|
|
userId: 'me',
|
|
messageId,
|
|
id: attachmentId,
|
|
});
|
|
|
|
const raw: string | undefined = res?.data?.data;
|
|
if (!raw) {
|
|
throw new Error('Attachment data missing');
|
|
}
|
|
|
|
const base64 = raw.replace(/-/g, '+').replace(/_/g, '/').padEnd(Math.ceil(raw.length / 4) * 4, '=');
|
|
return Buffer.from(base64, 'base64');
|
|
} catch (error) {
|
|
throw this.handleError(error, 'downloadAttachmentBytes');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Download full RFC822 source bytes for a Gmail message.
|
|
*
|
|
* Gmail returns raw MIME as base64url in `raw` when using format=raw.
|
|
*/
|
|
async downloadMessageSource(messageId: string): Promise<Buffer> {
|
|
try {
|
|
await this.ensureValidToken();
|
|
const res = await this.gmail.users.messages.get({
|
|
userId: 'me',
|
|
id: messageId,
|
|
format: 'raw',
|
|
});
|
|
|
|
const raw: string | undefined = res?.data?.raw || undefined;
|
|
if (!raw) {
|
|
throw new Error('Message raw MIME missing');
|
|
}
|
|
|
|
const base64 = raw.replace(/-/g, '+').replace(/_/g, '/').padEnd(Math.ceil(raw.length / 4) * 4, '=');
|
|
return Buffer.from(base64, 'base64');
|
|
} catch (error) {
|
|
throw this.handleError(error, 'downloadMessageSource');
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Test the connection to Gmail API
|
|
*/
|
|
async testConnection(): Promise<{ success: boolean; error?: string; }> {
|
|
try {
|
|
// Try to get the user's profile
|
|
const profile = await this.gmail.users.getProfile({ userId: 'me' });
|
|
|
|
if (profile.data.emailAddress !== this.config.mailbox) {
|
|
return {
|
|
success: false,
|
|
error: `Email mismatch: expected ${this.config.mailbox}, got ${profile.data.emailAddress}`
|
|
};
|
|
}
|
|
|
|
return { success: true };
|
|
} catch (error: any) {
|
|
return {
|
|
success: false,
|
|
error: error.message || 'Failed to connect to Gmail API'
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Disconnect from Gmail API
|
|
*/
|
|
async disconnect(): Promise<void> {
|
|
// Gmail doesn't require explicit disconnect
|
|
this.log('info', 'Disconnected from Gmail API');
|
|
}
|
|
}
|