mirror of
https://github.com/twentyhq/twenty
synced 2026-04-21 13:37:22 +00:00
Simplify IMAP implementation (#16295)
This commit is contained in:
parent
4a94ad1003
commit
e112eb4f69
22 changed files with 506 additions and 853 deletions
|
|
@ -153,6 +153,7 @@
|
|||
"pg": "8.12.0",
|
||||
"planer": "1.2.0",
|
||||
"pluralize": "8.0.0",
|
||||
"postal-mime": "^2.6.1",
|
||||
"psl": "^1.9.0",
|
||||
"react": "18.3.1",
|
||||
"react-dom": "18.3.1",
|
||||
|
|
|
|||
|
|
@ -10,17 +10,14 @@ import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects
|
|||
import { EmailAliasManagerModule } from 'src/modules/connected-account/email-alias-manager/email-alias-manager.module';
|
||||
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
|
||||
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
|
||||
import { ImapFetchByBatchService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-fetch-by-batch.service';
|
||||
import { ImapFindSentFolderService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-find-sent-folder.service';
|
||||
import { ImapGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service';
|
||||
import { ImapGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-messages.service';
|
||||
import { ImapIncrementalSyncService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-incremental-sync.service';
|
||||
import { ImapMessageFetcherService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-fetcher.service';
|
||||
import { ImapMessageListFetchErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-list-fetch-error-handler.service';
|
||||
import { ImapMessageProcessorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-processor.service';
|
||||
import { ImapMessagesImportErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-messages-import-error-handler.service';
|
||||
import { ImapMessageParserService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-parser.service';
|
||||
import { ImapMessageTextExtractorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-text-extractor.service';
|
||||
import { ImapNetworkErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-network-error-handler.service';
|
||||
import { ImapMessagesImportErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-messages-import-error-handler.service';
|
||||
import { ImapSyncService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-sync.service';
|
||||
import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module';
|
||||
|
||||
@Module({
|
||||
|
|
@ -36,15 +33,12 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p
|
|||
],
|
||||
providers: [
|
||||
ImapClientProvider,
|
||||
ImapFetchByBatchService,
|
||||
ImapGetMessagesService,
|
||||
ImapGetMessageListService,
|
||||
ImapNetworkErrorHandler,
|
||||
ImapMessageListFetchErrorHandler,
|
||||
ImapMessagesImportErrorHandler,
|
||||
ImapIncrementalSyncService,
|
||||
ImapMessageFetcherService,
|
||||
ImapMessageProcessorService,
|
||||
ImapSyncService,
|
||||
ImapMessageParserService,
|
||||
ImapFindSentFolderService,
|
||||
ImapMessageTextExtractorService,
|
||||
],
|
||||
|
|
|
|||
|
|
@ -21,8 +21,6 @@ export class ImapClientProvider {
|
|||
private static readonly RETRY_DELAY_MS = 1000;
|
||||
private static readonly CONNECTION_TIMEOUT_MS = 30000;
|
||||
|
||||
constructor() {}
|
||||
|
||||
async getClient(
|
||||
connectedAccount: ConnectedAccountIdentifier,
|
||||
): Promise<ImapFlow> {
|
||||
|
|
|
|||
|
|
@ -1,82 +0,0 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
|
||||
import {
|
||||
ImapMessageProcessorService,
|
||||
type MessageFetchResult,
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-processor.service';
|
||||
|
||||
type ConnectedAccount = Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'id' | 'provider' | 'handle' | 'handleAliases' | 'connectionParameters'
|
||||
>;
|
||||
|
||||
type FetchAllResult = {
|
||||
results: MessageFetchResult[];
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class ImapFetchByBatchService {
|
||||
private readonly logger = new Logger(ImapFetchByBatchService.name);
|
||||
|
||||
constructor(
|
||||
private readonly imapClientProvider: ImapClientProvider,
|
||||
private readonly imapMessageProcessorService: ImapMessageProcessorService,
|
||||
) {}
|
||||
|
||||
async fetchAllByBatches(
|
||||
uids: number[],
|
||||
connectedAccount: ConnectedAccount,
|
||||
folder: string,
|
||||
): Promise<FetchAllResult> {
|
||||
const batchLimit = 20;
|
||||
const results: MessageFetchResult[] = [];
|
||||
|
||||
this.logger.log(
|
||||
`Starting optimized batch fetch for ${uids.length} messages from folder ${folder}`,
|
||||
);
|
||||
|
||||
const client = await this.imapClientProvider.getClient(connectedAccount);
|
||||
|
||||
try {
|
||||
for (let i = 0; i < uids.length; i += batchLimit) {
|
||||
const batchUids = uids.slice(i, i + batchLimit);
|
||||
|
||||
try {
|
||||
const batchResult =
|
||||
await this.imapMessageProcessorService.processMessagesByUidsInFolder(
|
||||
batchUids,
|
||||
folder,
|
||||
client,
|
||||
);
|
||||
|
||||
results.push(...batchResult);
|
||||
|
||||
this.logger.log(
|
||||
`Fetched batch ${Math.floor(i / batchLimit) + 1}/${Math.ceil(uids.length / batchLimit)} (${batchUids.length} messages)`,
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Batch fetch failed for batch starting at index ${i}: ${error.message}`,
|
||||
);
|
||||
|
||||
const errorResults =
|
||||
this.imapMessageProcessorService.createErrorResults(
|
||||
batchUids,
|
||||
folder,
|
||||
error as Error,
|
||||
);
|
||||
|
||||
results.push(...errorResults);
|
||||
}
|
||||
}
|
||||
|
||||
return { results };
|
||||
} finally {
|
||||
if (client) {
|
||||
await this.imapClientProvider.closeClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,6 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type ImapFlow } from 'imapflow';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { type MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
|
||||
import {
|
||||
|
|
@ -9,185 +8,121 @@ import {
|
|||
MessageImportDriverExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
|
||||
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
|
||||
import { ImapIncrementalSyncService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-incremental-sync.service';
|
||||
import { ImapMessageListFetchErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-list-fetch-error-handler.service';
|
||||
import { ImapSyncService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-sync.service';
|
||||
import { createSyncCursor } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/create-sync-cursor.util';
|
||||
import { extractMailboxState } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/extract-mailbox-state.util';
|
||||
import {
|
||||
ImapSyncCursor,
|
||||
parseSyncCursor,
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-sync-cursor.util';
|
||||
import { parseSyncCursor } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-sync-cursor.util';
|
||||
import { type GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type';
|
||||
import {
|
||||
type GetMessageListsResponse,
|
||||
type GetOneMessageListResponse,
|
||||
} from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type';
|
||||
|
||||
type MessageFolder = Pick<
|
||||
MessageFolderWorkspaceEntity,
|
||||
'id' | 'name' | 'syncCursor' | 'externalId'
|
||||
>;
|
||||
|
||||
@Injectable()
|
||||
export class ImapGetMessageListService {
|
||||
private readonly logger = new Logger(ImapGetMessageListService.name);
|
||||
|
||||
constructor(
|
||||
private readonly imapClientProvider: ImapClientProvider,
|
||||
private readonly imapIncrementalSyncService: ImapIncrementalSyncService,
|
||||
private readonly imapMessageListFetchErrorHandler: ImapMessageListFetchErrorHandler,
|
||||
private readonly imapSyncService: ImapSyncService,
|
||||
private readonly errorHandler: ImapMessageListFetchErrorHandler,
|
||||
) {}
|
||||
|
||||
public async getMessageLists({
|
||||
async getMessageLists({
|
||||
connectedAccount,
|
||||
messageFolders,
|
||||
}: GetMessageListsArgs): Promise<GetMessageListsResponse> {
|
||||
let client: ImapFlow | null = null;
|
||||
const client = await this.imapClientProvider.getClient(connectedAccount);
|
||||
|
||||
try {
|
||||
client = await this.imapClientProvider.getClient(connectedAccount);
|
||||
const result: GetMessageListsResponse = [];
|
||||
const results: GetMessageListsResponse = [];
|
||||
|
||||
for (const folder of messageFolders) {
|
||||
this.logger.log(`Processing folder: ${folder.name}`);
|
||||
const response = await this.getMessageList(client, folder);
|
||||
|
||||
const folderPath = folder.externalId?.split(':')[0];
|
||||
|
||||
if (!folderPath) {
|
||||
this.logger.warn(`Folder ${folder.name} has no path. Skipping.`);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await this.getMessageList(
|
||||
client,
|
||||
folderPath,
|
||||
folder,
|
||||
);
|
||||
|
||||
result.push({
|
||||
...response,
|
||||
folderId: folder.id,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Error fetching from folder ${folder.name}: ${error.message}. Continuing with other folders.`,
|
||||
);
|
||||
|
||||
result.push({
|
||||
messageExternalIds: [],
|
||||
nextSyncCursor: folder.syncCursor || '',
|
||||
previousSyncCursor: folder.syncCursor || '',
|
||||
messageExternalIdsToDelete: [],
|
||||
folderId: folder.id,
|
||||
});
|
||||
}
|
||||
results.push({ ...response, folderId: folder.id });
|
||||
}
|
||||
|
||||
return result;
|
||||
return results;
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error getting message list: ${error.message}`,
|
||||
error.stack,
|
||||
`Connected account ${connectedAccount.id}: Error fetching message list: ${error.message}`,
|
||||
);
|
||||
|
||||
this.imapMessageListFetchErrorHandler.handleError(error);
|
||||
|
||||
return messageFolders.map((folder) => ({
|
||||
messageExternalIds: [],
|
||||
nextSyncCursor: folder.syncCursor || '',
|
||||
previousSyncCursor: folder.syncCursor || '',
|
||||
messageExternalIdsToDelete: [],
|
||||
folderId: folder.id,
|
||||
}));
|
||||
this.errorHandler.handleError(error);
|
||||
throw error;
|
||||
} finally {
|
||||
if (client) {
|
||||
await this.imapClientProvider.closeClient(client);
|
||||
}
|
||||
await this.imapClientProvider.closeClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
public async getMessageList(
|
||||
private async getMessageList(
|
||||
client: ImapFlow,
|
||||
folder: string,
|
||||
messageFolder: Pick<MessageFolderWorkspaceEntity, 'syncCursor'>,
|
||||
folder: MessageFolder,
|
||||
): Promise<GetOneMessageListResponse> {
|
||||
if (!isDefined(messageFolder.syncCursor)) {
|
||||
const folderPath = folder.externalId?.split(':')[0];
|
||||
|
||||
if (!folderPath) {
|
||||
throw new MessageImportDriverException(
|
||||
'Message folder sync cursor is required',
|
||||
MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR,
|
||||
`Folder ${folder.name} has no path`,
|
||||
MessageImportDriverExceptionCode.NOT_FOUND,
|
||||
);
|
||||
}
|
||||
|
||||
const { messages, messageExternalUidsToDelete, syncCursor } =
|
||||
await this.getMessagesFromFolder(
|
||||
client,
|
||||
folder,
|
||||
messageFolder.syncCursor,
|
||||
);
|
||||
this.logger.log(`Processing folder: ${folder.name}`);
|
||||
|
||||
messages.sort((a, b) => b.uid - a.uid);
|
||||
|
||||
const messageExternalIds = messages.map(
|
||||
(message) => `${folder}:${message.uid.toString()}`,
|
||||
);
|
||||
|
||||
return {
|
||||
messageExternalIds,
|
||||
nextSyncCursor: JSON.stringify(syncCursor),
|
||||
previousSyncCursor: messageFolder.syncCursor || '',
|
||||
messageExternalIdsToDelete: messageExternalUidsToDelete.map((uid) =>
|
||||
uid.toString(),
|
||||
),
|
||||
folderId: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
private async getMessagesFromFolder(
|
||||
client: ImapFlow,
|
||||
folder: string,
|
||||
cursor?: string,
|
||||
): Promise<{
|
||||
messages: { uid: number }[];
|
||||
messageExternalUidsToDelete: number[];
|
||||
syncCursor: ImapSyncCursor;
|
||||
}> {
|
||||
let lock;
|
||||
const lock = await client.getMailboxLock(folderPath);
|
||||
|
||||
try {
|
||||
lock = await client.getMailboxLock(folder);
|
||||
const mailbox = client.mailbox!;
|
||||
const mailbox = client.mailbox;
|
||||
|
||||
if (typeof mailbox === 'boolean') {
|
||||
throw new Error(`Invalid mailbox state for folder ${folder}`);
|
||||
if (!mailbox || typeof mailbox === 'boolean') {
|
||||
throw new MessageImportDriverException(
|
||||
`Invalid mailbox state for folder ${folderPath}`,
|
||||
MessageImportDriverExceptionCode.UNKNOWN,
|
||||
);
|
||||
}
|
||||
|
||||
const mailboxState = extractMailboxState(mailbox);
|
||||
const previousCursor = parseSyncCursor(cursor);
|
||||
const previousCursor = parseSyncCursor(folder.syncCursor);
|
||||
|
||||
const { messages, messageExternalUidsToDelete } =
|
||||
await this.imapIncrementalSyncService.syncMessages(
|
||||
client,
|
||||
previousCursor,
|
||||
mailboxState,
|
||||
folder,
|
||||
);
|
||||
|
||||
const newSyncCursor = createSyncCursor(
|
||||
messages,
|
||||
const { messageUids } = await this.imapSyncService.syncFolder(
|
||||
client,
|
||||
folderPath,
|
||||
previousCursor,
|
||||
mailboxState,
|
||||
);
|
||||
|
||||
return {
|
||||
messages,
|
||||
messageExternalUidsToDelete,
|
||||
syncCursor: newSyncCursor,
|
||||
};
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`Error fetching from folder ${folder}: ${err.message}`,
|
||||
err.stack,
|
||||
const nextCursor = createSyncCursor(
|
||||
messageUids.map((uid) => ({ uid })),
|
||||
previousCursor,
|
||||
mailboxState,
|
||||
);
|
||||
|
||||
throw err;
|
||||
const messageExternalIds = messageUids
|
||||
.sort((a, b) => b - a)
|
||||
.map((uid) => `${folderPath}:${uid}`);
|
||||
|
||||
return {
|
||||
messageExternalIds,
|
||||
messageExternalIdsToDelete: [],
|
||||
nextSyncCursor: JSON.stringify(nextCursor),
|
||||
previousSyncCursor: folder.syncCursor,
|
||||
folderId: folder.id,
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error syncing folder ${folder.name}: ${error.message}`,
|
||||
);
|
||||
this.errorHandler.handleError(error);
|
||||
throw error;
|
||||
} finally {
|
||||
if (lock) lock.release();
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,22 +1,21 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type AddressObject, type ParsedMail } from 'mailparser';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
import { type ImapFlow } from 'imapflow';
|
||||
import { Address, type Email as ParsedMail } from 'postal-mime';
|
||||
|
||||
import { type ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { computeMessageDirection } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-message-direction.util';
|
||||
import { ImapFetchByBatchService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-fetch-by-batch.service';
|
||||
import { type MessageFetchResult } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-processor.service';
|
||||
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
|
||||
import { ImapMessageParserService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-parser.service';
|
||||
import { ImapMessageTextExtractorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-text-extractor.service';
|
||||
import { ImapMessagesImportErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-messages-import-error-handler.service';
|
||||
import { parseMessageId } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-message-id.util';
|
||||
import { type EmailAddress } from 'src/modules/messaging/message-import-manager/types/email-address';
|
||||
import { type MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message';
|
||||
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
|
||||
import { sanitizeString } from 'src/modules/messaging/message-import-manager/utils/sanitize-string.util';
|
||||
|
||||
type AddressType = 'from' | 'to' | 'cc' | 'bcc';
|
||||
|
||||
type ConnectedAccountType = Pick<
|
||||
type ConnectedAccount = Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'id' | 'provider' | 'handle' | 'handleAliases' | 'connectionParameters'
|
||||
>;
|
||||
|
|
@ -26,221 +25,214 @@ export class ImapGetMessagesService {
|
|||
private readonly logger = new Logger(ImapGetMessagesService.name);
|
||||
|
||||
constructor(
|
||||
private readonly fetchByBatchService: ImapFetchByBatchService,
|
||||
private readonly messageTextExtractor: ImapMessageTextExtractorService,
|
||||
private readonly imapClientProvider: ImapClientProvider,
|
||||
private readonly messageParser: ImapMessageParserService,
|
||||
private readonly textExtractor: ImapMessageTextExtractorService,
|
||||
private readonly errorHandler: ImapMessagesImportErrorHandler,
|
||||
) {}
|
||||
|
||||
async getMessages(
|
||||
messageIds: string[],
|
||||
connectedAccount: ConnectedAccountType,
|
||||
messageExternalIds: string[],
|
||||
connectedAccount: ConnectedAccount,
|
||||
): Promise<MessageWithParticipants[]> {
|
||||
if (!messageIds.length) {
|
||||
if (!messageExternalIds.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const folderToUidsMap = this.groupMessageIdsByFolder(messageIds);
|
||||
const messagesByFolder = this.groupByFolder(messageExternalIds);
|
||||
const client = await this.imapClientProvider.getClient(connectedAccount);
|
||||
|
||||
const allMessages: MessageWithParticipants[] = [];
|
||||
try {
|
||||
const messages = await this.fetchFromAllFolders(
|
||||
messagesByFolder,
|
||||
client,
|
||||
connectedAccount,
|
||||
);
|
||||
|
||||
for (const [folder, uids] of folderToUidsMap.entries()) {
|
||||
if (!uids.length) {
|
||||
return messages;
|
||||
} finally {
|
||||
await this.imapClientProvider.closeClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
private groupByFolder(messageExternalIds: string[]): Map<string, number[]> {
|
||||
const messagesByFolder = new Map<string, number[]>();
|
||||
|
||||
for (const externalId of messageExternalIds) {
|
||||
const parsed = parseMessageId(externalId);
|
||||
|
||||
if (!parsed) {
|
||||
this.logger.warn(`Invalid message external ID format: ${externalId}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const { results } = await this.fetchByBatchService.fetchAllByBatches(
|
||||
uids,
|
||||
const uids = messagesByFolder.get(parsed.folder) ?? [];
|
||||
|
||||
uids.push(parsed.uid);
|
||||
messagesByFolder.set(parsed.folder, uids);
|
||||
}
|
||||
|
||||
return messagesByFolder;
|
||||
}
|
||||
|
||||
private async fetchFromAllFolders(
|
||||
messagesByFolder: Map<string, number[]>,
|
||||
client: ImapFlow,
|
||||
connectedAccount: ConnectedAccount,
|
||||
): Promise<MessageWithParticipants[]> {
|
||||
const allMessages: MessageWithParticipants[] = [];
|
||||
|
||||
for (const [folderPath, messageUids] of messagesByFolder) {
|
||||
if (!messageUids.length) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const folderMessages = await this.fetchFromFolder(
|
||||
folderPath,
|
||||
messageUids,
|
||||
client,
|
||||
connectedAccount,
|
||||
folder,
|
||||
);
|
||||
|
||||
this.logger.log(`IMAP fetch completed for folder: ${folder}`);
|
||||
|
||||
const messages = this.formatBatchResponseAsMessages(
|
||||
results,
|
||||
connectedAccount,
|
||||
folder,
|
||||
);
|
||||
|
||||
allMessages.push(...messages);
|
||||
allMessages.push(...folderMessages);
|
||||
}
|
||||
|
||||
return allMessages;
|
||||
}
|
||||
|
||||
private groupMessageIdsByFolder(messageIds: string[]): Map<string, number[]> {
|
||||
const folderToUidsMap = new Map<string, number[]>();
|
||||
private async fetchFromFolder(
|
||||
folderPath: string,
|
||||
messageUids: number[],
|
||||
client: ImapFlow,
|
||||
connectedAccount: ConnectedAccount,
|
||||
): Promise<MessageWithParticipants[]> {
|
||||
this.logger.log(
|
||||
`Fetching ${messageUids.length} messages from ${folderPath}`,
|
||||
);
|
||||
const startTime = Date.now();
|
||||
|
||||
for (const messageId of messageIds) {
|
||||
const parsedMessageId = parseMessageId(messageId);
|
||||
const results = await this.messageParser.parseMessagesFromFolder(
|
||||
messageUids,
|
||||
folderPath,
|
||||
client,
|
||||
);
|
||||
|
||||
if (!parsedMessageId) {
|
||||
this.logger.warn(`Invalid messageId format: ${messageId}`);
|
||||
const messages: MessageWithParticipants[] = [];
|
||||
|
||||
for (const result of results) {
|
||||
if (result.error) {
|
||||
this.errorHandler.handleError(
|
||||
result.error,
|
||||
`${folderPath}:${result.uid}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const { folder, uid } = parsedMessageId;
|
||||
|
||||
if (!folderToUidsMap.has(folder)) {
|
||||
folderToUidsMap.set(folder, []);
|
||||
}
|
||||
folderToUidsMap.get(folder)!.push(uid);
|
||||
}
|
||||
|
||||
return folderToUidsMap;
|
||||
}
|
||||
|
||||
private formatBatchResponseAsMessages(
|
||||
batchResults: MessageFetchResult[],
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'handle' | 'handleAliases'
|
||||
>,
|
||||
folder: string,
|
||||
): MessageWithParticipants[] {
|
||||
const messages = batchResults.map((result) => {
|
||||
if (!result.parsed) {
|
||||
this.logger.warn(
|
||||
`Message UID ${result.uid} could not be parsed - likely not found in current folders`,
|
||||
`Message UID ${result.uid} could not be parsed - likely deleted`,
|
||||
);
|
||||
|
||||
return undefined;
|
||||
continue;
|
||||
}
|
||||
|
||||
return this.createMessageFromParsedMail(
|
||||
result.parsed,
|
||||
result.uid.toString(),
|
||||
connectedAccount,
|
||||
folder,
|
||||
messages.push(
|
||||
this.buildMessage(
|
||||
result.parsed,
|
||||
result.uid,
|
||||
folderPath,
|
||||
connectedAccount,
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
const validMessages = messages.filter(isDefined);
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`Successfully parsed ${validMessages.length} out of ${batchResults.length} messages`,
|
||||
`Parsed ${messages.length}/${results.length} messages from ${folderPath} in ${Date.now() - startTime}ms`,
|
||||
);
|
||||
|
||||
return validMessages;
|
||||
return messages;
|
||||
}
|
||||
|
||||
private createMessageFromParsedMail(
|
||||
private buildMessage(
|
||||
parsed: ParsedMail,
|
||||
uid: string,
|
||||
uid: number,
|
||||
folderPath: string,
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'handle' | 'handleAliases'
|
||||
>,
|
||||
folder: string,
|
||||
): MessageWithParticipants {
|
||||
const participants = this.extractAllParticipants(parsed);
|
||||
const attachments = this.extractAttachments(parsed);
|
||||
const threadId = this.extractThreadId(parsed);
|
||||
const fromAddresses = this.extractAddresses(parsed.from);
|
||||
const senderAddress = fromAddresses[0]?.address ?? '';
|
||||
|
||||
const fromAddresses = this.extractAddresses(
|
||||
parsed.from as AddressObject | undefined,
|
||||
'from',
|
||||
const text = sanitizeString(
|
||||
this.textExtractor.extractTextWithoutReplyQuotations(parsed),
|
||||
);
|
||||
|
||||
const fromHandle = fromAddresses.length > 0 ? fromAddresses[0].address : '';
|
||||
|
||||
const textWithoutReplyQuotations =
|
||||
this.messageTextExtractor.extractTextWithoutReplyQuotations(parsed);
|
||||
|
||||
const direction = computeMessageDirection(fromHandle, connectedAccount);
|
||||
const text = sanitizeString(textWithoutReplyQuotations);
|
||||
const subject = sanitizeString(parsed.subject || '');
|
||||
|
||||
return {
|
||||
externalId: `${folder}:${uid}`,
|
||||
messageThreadExternalId: threadId || parsed.messageId || uid,
|
||||
headerMessageId: parsed.messageId || uid,
|
||||
subject: subject,
|
||||
text: text,
|
||||
receivedAt: parsed.date || new Date(),
|
||||
direction: direction,
|
||||
attachments,
|
||||
participants,
|
||||
externalId: `${folderPath}:${uid}`,
|
||||
messageThreadExternalId: this.extractThreadId(parsed),
|
||||
headerMessageId: parsed.messageId || String(uid),
|
||||
subject: sanitizeString(parsed.subject || ''),
|
||||
text,
|
||||
receivedAt: parsed.date ? new Date(parsed.date) : null,
|
||||
direction: computeMessageDirection(senderAddress, connectedAccount),
|
||||
attachments: this.extractAttachments(parsed),
|
||||
participants: this.extractParticipants(parsed),
|
||||
};
|
||||
}
|
||||
|
||||
private extractThreadId(parsed: ParsedMail): string | null {
|
||||
const { messageId, references, inReplyTo } = parsed;
|
||||
private extractThreadId(parsed: ParsedMail): string {
|
||||
if (Array.isArray(parsed.references) && parsed.references[0]?.trim()) {
|
||||
return parsed.references[0].trim();
|
||||
}
|
||||
|
||||
if (references && Array.isArray(references) && references.length > 0) {
|
||||
const threadRoot = references[0].trim();
|
||||
if (parsed.inReplyTo) {
|
||||
const inReplyTo = String(parsed.inReplyTo).trim();
|
||||
|
||||
if (threadRoot && threadRoot.length > 0) {
|
||||
return threadRoot;
|
||||
if (inReplyTo) {
|
||||
return inReplyTo;
|
||||
}
|
||||
}
|
||||
|
||||
if (inReplyTo) {
|
||||
const cleanInReplyTo =
|
||||
typeof inReplyTo === 'string'
|
||||
? inReplyTo.trim()
|
||||
: String(inReplyTo).trim();
|
||||
|
||||
if (cleanInReplyTo && cleanInReplyTo.length > 0) {
|
||||
return cleanInReplyTo;
|
||||
}
|
||||
if (parsed.messageId?.trim()) {
|
||||
return parsed.messageId.trim();
|
||||
}
|
||||
|
||||
if (messageId) {
|
||||
return messageId.trim();
|
||||
}
|
||||
|
||||
const timestamp = Date.now();
|
||||
const randomSuffix = Math.random().toString(36).substring(2, 11);
|
||||
|
||||
return `thread-${timestamp}-${randomSuffix}`;
|
||||
return `thread-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
|
||||
}
|
||||
|
||||
private extractAllParticipants(parsed: ParsedMail) {
|
||||
const fromAddresses = this.extractAddresses(
|
||||
parsed.from as AddressObject | undefined,
|
||||
'from',
|
||||
);
|
||||
const toAddresses = this.extractAddresses(
|
||||
parsed.to as AddressObject | undefined,
|
||||
'to',
|
||||
);
|
||||
const ccAddresses = this.extractAddresses(
|
||||
parsed.cc as AddressObject | undefined,
|
||||
'cc',
|
||||
);
|
||||
const bccAddresses = this.extractAddresses(
|
||||
parsed.bcc as AddressObject | undefined,
|
||||
'bcc',
|
||||
);
|
||||
|
||||
return [
|
||||
...formatAddressObjectAsParticipants(fromAddresses, 'from'),
|
||||
...formatAddressObjectAsParticipants(toAddresses, 'to'),
|
||||
...formatAddressObjectAsParticipants(ccAddresses, 'cc'),
|
||||
...formatAddressObjectAsParticipants(bccAddresses, 'bcc'),
|
||||
private extractParticipants(parsed: ParsedMail) {
|
||||
const addressFields = [
|
||||
{ field: parsed.from, role: 'from' as const },
|
||||
{ field: parsed.to, role: 'to' as const },
|
||||
{ field: parsed.cc, role: 'cc' as const },
|
||||
{ field: parsed.bcc, role: 'bcc' as const },
|
||||
];
|
||||
|
||||
return addressFields.flatMap(({ field, role }) =>
|
||||
formatAddressObjectAsParticipants(this.extractAddresses(field), role),
|
||||
);
|
||||
}
|
||||
|
||||
private extractAddresses(
|
||||
addressObject: AddressObject | undefined,
|
||||
_type: AddressType,
|
||||
address: Address | Address[] | undefined,
|
||||
): EmailAddress[] {
|
||||
const addresses: EmailAddress[] = [];
|
||||
|
||||
if (addressObject && 'value' in addressObject) {
|
||||
for (const addr of addressObject.value) {
|
||||
if (addr.address) {
|
||||
const name = sanitizeString(addr.name);
|
||||
|
||||
addresses.push({
|
||||
address: addr.address,
|
||||
name: name,
|
||||
});
|
||||
}
|
||||
}
|
||||
if (!address) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return addresses;
|
||||
const addresses = Array.isArray(address) ? address : [address];
|
||||
|
||||
const mailboxes = addresses.flatMap((addr) =>
|
||||
addr.address ? [addr] : (addr.group ?? []),
|
||||
);
|
||||
|
||||
return mailboxes
|
||||
.filter((mailbox) => mailbox.address)
|
||||
.map((mailbox) => ({
|
||||
address: mailbox.address,
|
||||
name: sanitizeString(mailbox.name || ''),
|
||||
}));
|
||||
}
|
||||
|
||||
private extractAttachments(parsed: ParsedMail) {
|
||||
|
|
|
|||
|
|
@ -1,111 +0,0 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type ImapFlow } from 'imapflow';
|
||||
|
||||
import { canUseQresync } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/can-use-qresync.util';
|
||||
import { type MailboxState } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/extract-mailbox-state.util';
|
||||
import { type ImapSyncCursor } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-sync-cursor.util';
|
||||
|
||||
import { ImapMessageFetcherService } from './imap-message-fetcher.service';
|
||||
|
||||
type SyncStrategyResult = {
|
||||
messages: { uid: number }[];
|
||||
messageExternalUidsToDelete: number[];
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class ImapIncrementalSyncService {
|
||||
private readonly logger = new Logger(ImapIncrementalSyncService.name);
|
||||
|
||||
constructor(
|
||||
private readonly imapMessageFetcherService: ImapMessageFetcherService,
|
||||
) {}
|
||||
|
||||
public async syncMessages(
|
||||
client: ImapFlow,
|
||||
previousCursor: ImapSyncCursor | null,
|
||||
mailboxState: MailboxState,
|
||||
folder: string,
|
||||
): Promise<SyncStrategyResult> {
|
||||
const messageExternalUidsToDelete = await this.checkUidValidityChange(
|
||||
client,
|
||||
previousCursor,
|
||||
mailboxState,
|
||||
folder,
|
||||
);
|
||||
|
||||
const messages = await this.selectSyncStrategy(
|
||||
client,
|
||||
previousCursor,
|
||||
mailboxState,
|
||||
folder,
|
||||
);
|
||||
|
||||
return {
|
||||
messages,
|
||||
messageExternalUidsToDelete,
|
||||
};
|
||||
}
|
||||
|
||||
private async checkUidValidityChange(
|
||||
client: ImapFlow,
|
||||
previousCursor: ImapSyncCursor | null,
|
||||
mailboxState: MailboxState,
|
||||
folder: string,
|
||||
): Promise<number[]> {
|
||||
const lastUidValidity = previousCursor?.uidValidity ?? 0;
|
||||
const { uidValidity } = mailboxState;
|
||||
|
||||
if (lastUidValidity !== 0 && lastUidValidity !== uidValidity) {
|
||||
this.logger.log(
|
||||
`UID validity changed from ${lastUidValidity} to ${uidValidity} in ${folder}. Full resync required.`,
|
||||
);
|
||||
|
||||
return this.imapMessageFetcherService.getAllMessageUids(client);
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
private async selectSyncStrategy(
|
||||
client: ImapFlow,
|
||||
previousCursor: ImapSyncCursor | null,
|
||||
mailboxState: MailboxState,
|
||||
folder: string,
|
||||
): Promise<{ uid: number }[]> {
|
||||
const lastSeenUid = previousCursor?.highestUid ?? 0;
|
||||
const supportsQresync = client.capabilities.has('QRESYNC');
|
||||
const { maxUid } = mailboxState;
|
||||
|
||||
if (canUseQresync(supportsQresync, previousCursor, mailboxState)) {
|
||||
this.logger.log(`Using QRESYNC for folder ${folder}`);
|
||||
const lastModSeq = BigInt(previousCursor!.modSeq!);
|
||||
|
||||
try {
|
||||
return await this.imapMessageFetcherService.getMessagesWithQresync(
|
||||
client,
|
||||
lastSeenUid,
|
||||
lastModSeq,
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`QRESYNC failed for folder ${folder}, falling back to UID search: ${error.message}`,
|
||||
);
|
||||
|
||||
return this.imapMessageFetcherService.getMessagesWithUidSearch(
|
||||
client,
|
||||
lastSeenUid,
|
||||
maxUid,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`Using standard UID search for folder ${folder}`);
|
||||
|
||||
return this.imapMessageFetcherService.getMessagesWithUidSearch(
|
||||
client,
|
||||
lastSeenUid,
|
||||
maxUid,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -1,114 +0,0 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type ImapFlow } from 'imapflow';
|
||||
|
||||
@Injectable()
|
||||
export class ImapMessageFetcherService {
|
||||
private readonly logger = new Logger(ImapMessageFetcherService.name);
|
||||
|
||||
public async getAllMessageUids(client: ImapFlow): Promise<number[]> {
|
||||
try {
|
||||
const uids: number[] = [];
|
||||
|
||||
for await (const msg of client.fetch('1:*', {}, { uid: true })) {
|
||||
if (msg.uid) {
|
||||
uids.push(msg.uid);
|
||||
}
|
||||
}
|
||||
|
||||
return uids;
|
||||
} catch (err) {
|
||||
this.logger.error(`Error getting all message UIDs: ${err.message}`);
|
||||
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
public async getMessagesWithUidSearch(
|
||||
client: ImapFlow,
|
||||
lastSeenUid: number,
|
||||
maxUid: number,
|
||||
): Promise<{ uid: number }[]> {
|
||||
try {
|
||||
let allUids = await client.search({ all: true }, { uid: true });
|
||||
|
||||
if (!Array.isArray(allUids)) allUids = [];
|
||||
|
||||
const wantedUids = allUids.filter((u) => u > lastSeenUid && u <= maxUid);
|
||||
|
||||
if (wantedUids.length === 0) {
|
||||
this.logger.log(
|
||||
`No new messages. lastSeenUid=${lastSeenUid}, maxUid=${maxUid}`,
|
||||
);
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
const messages: { uid: number }[] = [];
|
||||
|
||||
this.logger.log(
|
||||
`Fetching ${wantedUids.length} messages, UIDs ${wantedUids[0]}..${
|
||||
wantedUids[wantedUids.length - 1]
|
||||
}`,
|
||||
);
|
||||
|
||||
for await (const msg of client.fetch(
|
||||
wantedUids,
|
||||
{},
|
||||
{
|
||||
uid: true,
|
||||
},
|
||||
)) {
|
||||
if (msg.uid) {
|
||||
messages.push({ uid: msg.uid });
|
||||
}
|
||||
}
|
||||
|
||||
return messages;
|
||||
} catch (err) {
|
||||
this.logger.error(`Error with UID search: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
public async getMessagesWithQresync(
|
||||
client: ImapFlow,
|
||||
lastSeenUid: number,
|
||||
lastModSeq: bigint,
|
||||
): Promise<{ uid: number }[]> {
|
||||
try {
|
||||
const vanished = await client.search(
|
||||
{
|
||||
modseq: lastModSeq + BigInt(1),
|
||||
uid: `${lastSeenUid + 1}:*`,
|
||||
},
|
||||
{ uid: true },
|
||||
);
|
||||
|
||||
const messages: { uid: number }[] = [];
|
||||
|
||||
if (vanished && Array.isArray(vanished) && vanished.length > 0) {
|
||||
this.logger.log(
|
||||
`QRESYNC: Fetching ${vanished.length} new/modified messages`,
|
||||
);
|
||||
|
||||
for await (const msg of client.fetch(
|
||||
vanished,
|
||||
{},
|
||||
{
|
||||
uid: true,
|
||||
},
|
||||
)) {
|
||||
if (msg.uid) {
|
||||
messages.push({ uid: msg.uid });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return messages;
|
||||
} catch (err) {
|
||||
this.logger.error(`Error with QRESYNC: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,16 +1,16 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { parseImapError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-error.util';
|
||||
import { parseImapMessageListFetchError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-message-list-fetch-error.util';
|
||||
import { ImapNetworkErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-network-error-handler.service';
|
||||
|
||||
@Injectable()
|
||||
export class ImapMessageListFetchErrorHandler {
|
||||
constructor(
|
||||
private readonly imapNetworkErrorHandler: ImapNetworkErrorHandler,
|
||||
) {}
|
||||
private readonly logger = new Logger(ImapMessageListFetchErrorHandler.name);
|
||||
|
||||
public handleError(error: Error): void {
|
||||
const networkError = this.imapNetworkErrorHandler.handleError(error);
|
||||
this.logger.error(`IMAP: Error fetching message list: ${error.message}`);
|
||||
|
||||
const networkError = parseImapError(error, { cause: error });
|
||||
|
||||
if (networkError) {
|
||||
throw networkError;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,95 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type FetchMessageObject, type ImapFlow } from 'imapflow';
|
||||
import PostalMime, { type Email as ParsedEmail } from 'postal-mime';
|
||||
|
||||
export type MessageParseResult = {
|
||||
uid: number;
|
||||
parsed: ParsedEmail | null;
|
||||
error?: Error;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class ImapMessageParserService {
|
||||
private readonly logger = new Logger(ImapMessageParserService.name);
|
||||
|
||||
async parseMessagesFromFolder(
|
||||
messageUids: number[],
|
||||
folderPath: string,
|
||||
client: ImapFlow,
|
||||
): Promise<MessageParseResult[]> {
|
||||
if (!messageUids.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const lock = await client.getMailboxLock(folderPath);
|
||||
|
||||
try {
|
||||
const uidSet = messageUids.join(',');
|
||||
const startTime = Date.now();
|
||||
|
||||
const messages = await client.fetchAll(
|
||||
uidSet,
|
||||
{ uid: true, source: true },
|
||||
{ uid: true },
|
||||
);
|
||||
|
||||
const fetchedUids = new Set<number>();
|
||||
const results: MessageParseResult[] = [];
|
||||
|
||||
for (const message of messages) {
|
||||
fetchedUids.add(message.uid);
|
||||
results.push(await this.parseMessage(message));
|
||||
}
|
||||
|
||||
for (const uid of messageUids) {
|
||||
if (!fetchedUids.has(uid)) {
|
||||
results.push({ uid, parsed: null });
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`Fetched and parsed ${results.length} messages from ${folderPath} in ${Date.now() - startTime}ms`,
|
||||
);
|
||||
|
||||
return results;
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Failed to parse messages from folder ${folderPath}: ${error.message}`,
|
||||
);
|
||||
|
||||
return this.createErrorResults(messageUids, error as Error);
|
||||
} finally {
|
||||
lock.release();
|
||||
}
|
||||
}
|
||||
|
||||
private async parseMessage(
|
||||
message: FetchMessageObject,
|
||||
): Promise<MessageParseResult> {
|
||||
const { uid, source } = message;
|
||||
|
||||
if (!source) {
|
||||
this.logger.debug(`No source content for message UID ${uid}`);
|
||||
|
||||
return { uid, parsed: null };
|
||||
}
|
||||
|
||||
try {
|
||||
const parsed = await PostalMime.parse(source);
|
||||
|
||||
return { uid, parsed };
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to parse message UID ${uid}: ${error.message}`);
|
||||
|
||||
return { uid, parsed: null, error: error as Error };
|
||||
}
|
||||
}
|
||||
|
||||
createErrorResults(
|
||||
messageUids: number[],
|
||||
error: Error,
|
||||
): MessageParseResult[] {
|
||||
return messageUids.map((uid) => ({ uid, parsed: null, error }));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,188 +0,0 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type FetchMessageObject, type ImapFlow } from 'imapflow';
|
||||
import { type ParsedMail, simpleParser } from 'mailparser';
|
||||
|
||||
import { ImapMessagesImportErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-messages-import-error-handler.service';
|
||||
|
||||
export type MessageFetchResult = {
|
||||
uid: number;
|
||||
parsed: ParsedMail | null;
|
||||
processingTimeMs?: number;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class ImapMessageProcessorService {
|
||||
private readonly logger = new Logger(ImapMessageProcessorService.name);
|
||||
|
||||
constructor(
|
||||
private readonly imapMessagesImportErrorHandler: ImapMessagesImportErrorHandler,
|
||||
) {}
|
||||
|
||||
async processMessagesByUidsInFolder(
|
||||
uids: number[],
|
||||
folder: string,
|
||||
client: ImapFlow,
|
||||
): Promise<MessageFetchResult[]> {
|
||||
if (!uids.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
try {
|
||||
const lock = await client.getMailboxLock(folder);
|
||||
|
||||
try {
|
||||
return await this.fetchMessages(uids, client);
|
||||
} finally {
|
||||
lock.release();
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Failed to fetch messages from folder ${folder}: ${error.message}`,
|
||||
);
|
||||
|
||||
return uids.map((uid) =>
|
||||
this.createErrorResult(uid, error as Error, Date.now()),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchMessages(
|
||||
uids: number[],
|
||||
client: ImapFlow,
|
||||
): Promise<MessageFetchResult[]> {
|
||||
const startTime = Date.now();
|
||||
const results: MessageFetchResult[] = [];
|
||||
|
||||
try {
|
||||
const uidSet = uids.join(',');
|
||||
|
||||
const fetchResults = client.fetch(
|
||||
uidSet,
|
||||
{
|
||||
uid: true,
|
||||
source: true,
|
||||
},
|
||||
{ uid: true },
|
||||
);
|
||||
|
||||
const processedMessages = new Map<number, MessageFetchResult>();
|
||||
|
||||
for await (const message of fetchResults) {
|
||||
const result = await this.processMessageData(
|
||||
message.uid,
|
||||
message,
|
||||
startTime,
|
||||
);
|
||||
|
||||
processedMessages.set(message.uid, result);
|
||||
}
|
||||
|
||||
for (const uid of uids) {
|
||||
const processedMessage = processedMessages.get(uid);
|
||||
|
||||
if (processedMessage) {
|
||||
results.push(processedMessage);
|
||||
} else {
|
||||
results.push({
|
||||
uid,
|
||||
parsed: null,
|
||||
processingTimeMs: Date.now() - startTime,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Batch fetch failed: ${error.message}`);
|
||||
|
||||
return uids.map((uid) =>
|
||||
this.createErrorResult(uid, error as Error, startTime),
|
||||
);
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
private async processMessageData(
|
||||
uid: number,
|
||||
messageData: FetchMessageObject,
|
||||
startTime: number,
|
||||
): Promise<MessageFetchResult> {
|
||||
try {
|
||||
if (!messageData.source) {
|
||||
this.logger.debug(`No source content for message UID ${uid}`);
|
||||
|
||||
return {
|
||||
uid,
|
||||
parsed: null,
|
||||
processingTimeMs: Date.now() - startTime,
|
||||
};
|
||||
}
|
||||
|
||||
const parsed = await this.parseMessageFromBuffer(messageData.source, uid);
|
||||
const processingTime = Date.now() - startTime;
|
||||
|
||||
this.logger.debug(`Processed message UID ${uid} in ${processingTime}ms`);
|
||||
|
||||
return {
|
||||
uid,
|
||||
parsed,
|
||||
processingTimeMs: processingTime,
|
||||
};
|
||||
} catch (error) {
|
||||
return this.createErrorResult(uid, error as Error, startTime);
|
||||
}
|
||||
}
|
||||
|
||||
private async parseMessageFromBuffer(
|
||||
source: Buffer,
|
||||
uid: number,
|
||||
): Promise<ParsedMail> {
|
||||
try {
|
||||
return await simpleParser(source, {
|
||||
skipTextToHtml: true,
|
||||
skipImageLinks: true,
|
||||
skipTextLinks: true,
|
||||
keepCidLinks: false,
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to parse message UID ${uid}: ${error.message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
createErrorResult(
|
||||
uid: number,
|
||||
error: Error,
|
||||
startTime: number,
|
||||
): MessageFetchResult {
|
||||
const processingTime = Date.now() - startTime;
|
||||
|
||||
this.logger.error(`Failed to fetch message UID ${uid}: ${error.message}`);
|
||||
|
||||
return {
|
||||
uid,
|
||||
parsed: null,
|
||||
processingTimeMs: processingTime,
|
||||
};
|
||||
}
|
||||
|
||||
createErrorResults(
|
||||
uids: number[],
|
||||
folder: string,
|
||||
error: Error,
|
||||
): MessageFetchResult[] {
|
||||
return uids.map((uid) => {
|
||||
this.logger.error(`Failed to fetch message UID ${uid}: ${error.message}`);
|
||||
|
||||
this.imapMessagesImportErrorHandler.handleError(
|
||||
error,
|
||||
`${folder}:${uid}`,
|
||||
);
|
||||
|
||||
return {
|
||||
uid,
|
||||
parsed: null,
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -3,8 +3,8 @@ import { Injectable } from '@nestjs/common';
|
|||
import DOMPurify from 'dompurify';
|
||||
import { convert } from 'html-to-text';
|
||||
import { JSDOM } from 'jsdom';
|
||||
import { type ParsedMail } from 'mailparser';
|
||||
import * as planer from 'planer';
|
||||
import { type Email as ParsedEmail } from 'postal-mime';
|
||||
|
||||
import { safeDecodeURIComponent } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/safe-decode-uri-component.util';
|
||||
|
||||
|
|
@ -18,7 +18,7 @@ export class ImapMessageTextExtractorService {
|
|||
this.purify = DOMPurify(this.jsdomInstance.window);
|
||||
}
|
||||
|
||||
extractTextWithoutReplyQuotations(parsed: ParsedMail): string {
|
||||
extractTextWithoutReplyQuotations(parsed: ParsedEmail): string {
|
||||
if (parsed.text) {
|
||||
const extractedText = planer.extractFrom(parsed.text, 'text/plain');
|
||||
|
||||
|
|
@ -28,8 +28,6 @@ export class ImapMessageTextExtractorService {
|
|||
if (parsed.html) {
|
||||
const sanitizedHtml = this.purify.sanitize(parsed.html);
|
||||
|
||||
this.jsdomInstance.window.document.documentElement.innerHTML = `<html><body>${sanitizedHtml}</body></html>`;
|
||||
|
||||
const cleanedHtml = planer.extractFromHtml(
|
||||
sanitizedHtml,
|
||||
this.jsdomInstance.window.document,
|
||||
|
|
|
|||
|
|
@ -1,16 +1,18 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { parseImapError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-error.util';
|
||||
import { parseImapMessagesImportError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-messages-import-error.util';
|
||||
import { ImapNetworkErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-network-error-handler.service';
|
||||
|
||||
@Injectable()
|
||||
export class ImapMessagesImportErrorHandler {
|
||||
constructor(
|
||||
private readonly imapNetworkErrorHandler: ImapNetworkErrorHandler,
|
||||
) {}
|
||||
private readonly logger = new Logger(ImapMessagesImportErrorHandler.name);
|
||||
|
||||
public handleError(error: Error, messageExternalId: string): void {
|
||||
const networkError = this.imapNetworkErrorHandler.handleError(error);
|
||||
this.logger.error(
|
||||
`IMAP: Error importing message ${messageExternalId}: ${error.message}`,
|
||||
);
|
||||
|
||||
const networkError = parseImapError(error, { cause: error });
|
||||
|
||||
if (networkError) {
|
||||
throw networkError;
|
||||
|
|
|
|||
|
|
@ -1,11 +0,0 @@
|
|||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { type MessageImportDriverException } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
|
||||
import { parseImapError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-error.util';
|
||||
|
||||
@Injectable()
|
||||
export class ImapNetworkErrorHandler {
|
||||
public handleError(error: Error): MessageImportDriverException | null {
|
||||
return parseImapError(error, { cause: error });
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,129 @@
|
|||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { type ImapFlow } from 'imapflow';
|
||||
|
||||
import {
|
||||
MessageImportDriverException,
|
||||
MessageImportDriverExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
|
||||
import { canUseQresync } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/can-use-qresync.util';
|
||||
import { type MailboxState } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/extract-mailbox-state.util';
|
||||
import { type ImapSyncCursor } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-sync-cursor.util';
|
||||
|
||||
type SyncResult = {
|
||||
messageUids: number[];
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class ImapSyncService {
|
||||
private readonly logger = new Logger(ImapSyncService.name);
|
||||
|
||||
async syncFolder(
|
||||
client: ImapFlow,
|
||||
folderPath: string,
|
||||
previousCursor: ImapSyncCursor | null,
|
||||
mailboxState: MailboxState,
|
||||
): Promise<SyncResult> {
|
||||
this.validateUidValidity(previousCursor, mailboxState, folderPath);
|
||||
|
||||
const messageUids = await this.fetchNewMessageUids(
|
||||
client,
|
||||
previousCursor,
|
||||
mailboxState,
|
||||
folderPath,
|
||||
);
|
||||
|
||||
return { messageUids };
|
||||
}
|
||||
|
||||
private validateUidValidity(
|
||||
previousCursor: ImapSyncCursor | null,
|
||||
mailboxState: MailboxState,
|
||||
folderPath: string,
|
||||
): void {
|
||||
const previousUidValidity = previousCursor?.uidValidity ?? 0;
|
||||
const { uidValidity } = mailboxState;
|
||||
|
||||
if (previousUidValidity !== 0 && previousUidValidity !== uidValidity) {
|
||||
this.logger.warn(
|
||||
`UID validity changed from ${previousUidValidity} to ${uidValidity} in ${folderPath}. Full resync required.`,
|
||||
);
|
||||
|
||||
throw new MessageImportDriverException(
|
||||
`IMAP UID validity changed for folder ${folderPath}`,
|
||||
MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchNewMessageUids(
|
||||
client: ImapFlow,
|
||||
previousCursor: ImapSyncCursor | null,
|
||||
mailboxState: MailboxState,
|
||||
folderPath: string,
|
||||
): Promise<number[]> {
|
||||
const lastSyncedUid = previousCursor?.highestUid ?? 0;
|
||||
const { maxUid } = mailboxState;
|
||||
|
||||
if (canUseQresync(client, previousCursor, mailboxState)) {
|
||||
this.logger.log(`Using QRESYNC for folder ${folderPath}`);
|
||||
|
||||
try {
|
||||
return await this.fetchWithQresync(
|
||||
client,
|
||||
lastSyncedUid,
|
||||
BigInt(previousCursor!.modSeq!),
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`QRESYNC failed for ${folderPath}, falling back to UID range: ${error.message}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`Using UID range fetch for folder ${folderPath}`);
|
||||
|
||||
return this.fetchWithUidRange(client, lastSyncedUid, maxUid);
|
||||
}
|
||||
|
||||
private async fetchWithUidRange(
|
||||
client: ImapFlow,
|
||||
lastSyncedUid: number,
|
||||
highestAvailableUid: number,
|
||||
): Promise<number[]> {
|
||||
if (lastSyncedUid >= highestAvailableUid) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const uidRange = `${lastSyncedUid + 1}:${highestAvailableUid}`;
|
||||
const uids = await client.search({ uid: uidRange }, { uid: true });
|
||||
|
||||
if (!uids || !Array.isArray(uids)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return uids;
|
||||
}
|
||||
|
||||
private async fetchWithQresync(
|
||||
client: ImapFlow,
|
||||
lastSyncedUid: number,
|
||||
lastModSeq: bigint,
|
||||
): Promise<number[]> {
|
||||
const uids = await client.search(
|
||||
{
|
||||
modseq: lastModSeq + BigInt(1),
|
||||
uid: `${lastSyncedUid + 1}:*`,
|
||||
},
|
||||
{ uid: true },
|
||||
);
|
||||
|
||||
if (!uids || !Array.isArray(uids) || !uids.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
this.logger.log(`QRESYNC found ${uids.length} new/modified messages`);
|
||||
|
||||
return uids;
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
import { type ParsedMail } from 'mailparser';
|
||||
import { type Email as ParsedMail } from 'postal-mime';
|
||||
|
||||
import { ImapMessageTextExtractorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-text-extractor.service';
|
||||
|
||||
|
|
@ -97,9 +97,7 @@ Developer Support
|
|||
>>> -John
|
||||
`,
|
||||
attachments: [],
|
||||
headers: new Map(),
|
||||
headerLines: [],
|
||||
html: false,
|
||||
headers: [],
|
||||
};
|
||||
|
||||
const result = service.extractTextWithoutReplyQuotations(parsed);
|
||||
|
|
@ -133,9 +131,7 @@ Developer Support`);
|
|||
|
||||
`,
|
||||
attachments: [],
|
||||
headers: new Map(),
|
||||
headerLines: [],
|
||||
html: false,
|
||||
headers: [],
|
||||
};
|
||||
|
||||
const result = service.extractTextWithoutReplyQuotations(parsed);
|
||||
|
|
@ -156,8 +152,7 @@ Developer Support`);
|
|||
</blockquote>
|
||||
</div>`,
|
||||
attachments: [],
|
||||
headers: new Map(),
|
||||
headerLines: [],
|
||||
headers: [],
|
||||
};
|
||||
|
||||
const result = service.extractTextWithoutReplyQuotations(parsed);
|
||||
|
|
@ -168,9 +163,7 @@ Developer Support`);
|
|||
it('should return empty string when no text or html content', () => {
|
||||
const parsed: ParsedMail = {
|
||||
attachments: [],
|
||||
headers: new Map(),
|
||||
headerLines: [],
|
||||
html: false,
|
||||
headers: [],
|
||||
};
|
||||
|
||||
const result = service.extractTextWithoutReplyQuotations(parsed);
|
||||
|
|
@ -181,8 +174,7 @@ Developer Support`);
|
|||
it('should preserve new lines in html email', () => {
|
||||
const parsed: ParsedMail = {
|
||||
attachments: [],
|
||||
headers: new Map(),
|
||||
headerLines: [],
|
||||
headers: [],
|
||||
html: `<html><head><style>
|
||||
html, body {
|
||||
font-size: 14.5px;
|
||||
|
|
@ -323,8 +315,7 @@ John`);
|
|||
text: 'Plain text content\n\nOn 2023-01-01, user@example.com wrote:\n> Reply',
|
||||
html: '<html><body><p>HTML content</p></body></html>',
|
||||
attachments: [],
|
||||
headers: new Map(),
|
||||
headerLines: [],
|
||||
headers: [],
|
||||
};
|
||||
|
||||
const result = service.extractTextWithoutReplyQuotations(parsed);
|
||||
|
|
|
|||
|
|
@ -1,21 +1,19 @@
|
|||
import { type ImapFlow } from 'imapflow';
|
||||
|
||||
import { type MailboxState } from './extract-mailbox-state.util';
|
||||
import { type ImapSyncCursor } from './parse-sync-cursor.util';
|
||||
|
||||
export const canUseQresync = (
|
||||
supportsQresync: boolean,
|
||||
client: ImapFlow,
|
||||
previousCursor: ImapSyncCursor | null,
|
||||
mailboxState: MailboxState,
|
||||
): boolean => {
|
||||
const lastModSeq = previousCursor?.modSeq
|
||||
? BigInt(previousCursor.modSeq)
|
||||
: undefined;
|
||||
const lastUidValidity = previousCursor?.uidValidity ?? 0;
|
||||
const { uidValidity, highestModSeq } = mailboxState;
|
||||
const supportsQresync = client.capabilities.has('QRESYNC');
|
||||
const hasModSeq = previousCursor?.modSeq !== undefined;
|
||||
const hasServerModSeq = mailboxState.highestModSeq !== undefined;
|
||||
const uidValidityMatches =
|
||||
(previousCursor?.uidValidity ?? 0) === mailboxState.uidValidity ||
|
||||
previousCursor?.uidValidity === 0;
|
||||
|
||||
return (
|
||||
supportsQresync &&
|
||||
lastModSeq !== undefined &&
|
||||
highestModSeq !== undefined &&
|
||||
lastUidValidity === uidValidity
|
||||
);
|
||||
return supportsQresync && hasModSeq && hasServerModSeq && uidValidityMatches;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -1,5 +1,13 @@
|
|||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { type ImapFlowError } from 'src/modules/messaging/message-import-manager/drivers/imap/types/imap-error.type';
|
||||
|
||||
export const isImapFlowError = (error: Error): error is ImapFlowError => {
|
||||
return error !== undefined && error !== null;
|
||||
return (
|
||||
isDefined(error) &&
|
||||
('serverResponseCode' in error ||
|
||||
'responseText' in error ||
|
||||
'executedCommand' in error ||
|
||||
'authenticationFailed' in error)
|
||||
);
|
||||
};
|
||||
|
|
|
|||
|
|
@ -67,18 +67,26 @@ export const parseImapError = (
|
|||
);
|
||||
}
|
||||
|
||||
if (error.message === 'Command failed' && error.responseText) {
|
||||
if (error.responseText.includes('Resource temporarily unavailable')) {
|
||||
if (error.message === 'Command failed') {
|
||||
if (error.responseText) {
|
||||
if (error.responseText.includes('Resource temporarily unavailable')) {
|
||||
return new MessageImportDriverException(
|
||||
`IMAP temporary error: ${error.responseText}`,
|
||||
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
|
||||
{ cause: options?.cause || error },
|
||||
);
|
||||
}
|
||||
|
||||
return new MessageImportDriverException(
|
||||
`IMAP temporary error: ${error.responseText}`,
|
||||
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
|
||||
`IMAP command failed: ${error.responseText}`,
|
||||
MessageImportDriverExceptionCode.UNKNOWN,
|
||||
{ cause: options?.cause || error },
|
||||
);
|
||||
}
|
||||
|
||||
return new MessageImportDriverException(
|
||||
`IMAP command failed: ${error.responseText}`,
|
||||
MessageImportDriverExceptionCode.UNKNOWN,
|
||||
`IMAP command failed: ${error.message}`,
|
||||
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
|
||||
{ cause: options?.cause || error },
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,9 @@ export type ImapSyncCursor = {
|
|||
modSeq?: string;
|
||||
};
|
||||
|
||||
export const parseSyncCursor = (cursor?: string): ImapSyncCursor | null => {
|
||||
export const parseSyncCursor = (
|
||||
cursor: string | null,
|
||||
): ImapSyncCursor | null => {
|
||||
if (!isDefined(cursor)) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
export type GetOneMessageListResponse = {
|
||||
messageExternalIds: string[];
|
||||
messageExternalIdsToDelete: string[];
|
||||
previousSyncCursor: string;
|
||||
previousSyncCursor: string | null;
|
||||
nextSyncCursor: string;
|
||||
folderId: string | undefined;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -48494,6 +48494,13 @@ __metadata:
|
|||
languageName: node
|
||||
linkType: hard
|
||||
|
||||
"postal-mime@npm:^2.6.1":
|
||||
version: 2.6.1
|
||||
resolution: "postal-mime@npm:2.6.1"
|
||||
checksum: 10c0/60818f46612aef7f276034466bebe1fa9209b815fe7a70facbbe023c8d79dc75315f2d372c44f36e73faefb685b7541b45c1403c5dc9cb8c039bf50fdcd1e56d
|
||||
languageName: node
|
||||
linkType: hard
|
||||
|
||||
"postcss-calc@npm:^8.2.3":
|
||||
version: 8.2.4
|
||||
resolution: "postcss-calc@npm:8.2.4"
|
||||
|
|
@ -56302,6 +56309,7 @@ __metadata:
|
|||
pg: "npm:8.12.0"
|
||||
planer: "npm:1.2.0"
|
||||
pluralize: "npm:8.0.0"
|
||||
postal-mime: "npm:^2.6.1"
|
||||
psl: "npm:^1.9.0"
|
||||
react: "npm:18.3.1"
|
||||
react-dom: "npm:18.3.1"
|
||||
|
|
|
|||
Loading…
Reference in a new issue