update crons and sync trigger

This commit is contained in:
bosiraphael 2026-04-21 15:31:57 +02:00
parent 7bde1bae1f
commit 5212d3487c
19 changed files with 331 additions and 245 deletions

View file

@ -0,0 +1,92 @@
import { describe, expect, it } from 'vitest';
import {
RESEND_SYNC_CRON_PATTERNS,
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS,
RESEND_SYNC_SLOT_TIMEOUT_SECONDS,
} from '@modules/resend/constants/sync-config';
const minutesInOneHour = (pattern: string): number[] => {
const [minuteField, hourField, ...rest] = pattern.split(' ');
expect(hourField).toBe('*');
expect(rest).toEqual(['*', '*', '*']);
const stepMatch = minuteField.match(/^(\d+)-59\/(\d+)$/);
if (stepMatch === null) {
throw new Error(
`Unsupported cron minute field for this test helper: ${minuteField}`,
);
}
const start = Number(stepMatch[1]);
const step = Number(stepMatch[2]);
const minutes: number[] = [];
for (let m = start; m < 60; m += step) {
minutes.push(m);
}
return minutes;
};
describe('RESEND_SYNC_CRON_PATTERNS', () => {
it('runs each cron exactly once per 5-minute window with a unique minute offset', () => {
const offsets = {
EMAILS: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.EMAILS).map(
(m) => m % 5,
),
CONTACTS: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.CONTACTS).map(
(m) => m % 5,
),
BROADCASTS: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.BROADCASTS).map(
(m) => m % 5,
),
TEMPLATES: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.TEMPLATES).map(
(m) => m % 5,
),
};
expect(new Set(offsets.EMAILS)).toEqual(new Set([0]));
expect(new Set(offsets.CONTACTS)).toEqual(new Set([1]));
expect(new Set(offsets.BROADCASTS)).toEqual(new Set([2]));
expect(new Set(offsets.TEMPLATES)).toEqual(new Set([3]));
const firstOffsets = [
offsets.EMAILS[0],
offsets.CONTACTS[0],
offsets.BROADCASTS[0],
offsets.TEMPLATES[0],
];
expect(new Set(firstOffsets).size).toBe(4);
});
it('fires every five minutes for each cron', () => {
for (const pattern of Object.values(RESEND_SYNC_CRON_PATTERNS)) {
const minutes = minutesInOneHour(pattern);
expect(minutes).toHaveLength(12);
for (let i = 1; i < minutes.length; i++) {
expect(minutes[i] - minutes[i - 1]).toBe(5);
}
}
});
});
describe('RESEND_SYNC_SLOT_TIMEOUT_SECONDS', () => {
it('keeps every 1-minute slot strictly under 60 seconds and the trailing 2-minute slot under 120 seconds', () => {
expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS).toBeLessThan(60);
expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.CONTACTS).toBeLessThan(60);
expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.BROADCASTS).toBeLessThan(60);
expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.TEMPLATES).toBeLessThan(120);
});
it('leaves enough slack for the deadline check to fire before the hard timeout', () => {
expect(RESEND_SYNC_SLOT_DEADLINE_SLACK_MS).toBeGreaterThan(0);
expect(RESEND_SYNC_SLOT_DEADLINE_SLACK_MS).toBeLessThan(
RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS * 1_000,
);
});
});

View file

@ -13,3 +13,19 @@ export const SYNC_LOOKUP_PROGRESS = 0.1;
export const INITIAL_SYNC_MODE_ENV_VAR_NAME = 'INITIAL_SYNC_MODE';
export const INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS = 7 * 24 * 60 * 60 * 1000;
export const RESEND_SYNC_CRON_PATTERNS = {
EMAILS: '0-59/5 * * * *',
CONTACTS: '1-59/5 * * * *',
BROADCASTS: '2-59/5 * * * *',
TEMPLATES: '3-59/5 * * * *',
} as const;
export const RESEND_SYNC_SLOT_TIMEOUT_SECONDS = {
EMAILS: 55,
CONTACTS: 55,
BROADCASTS: 55,
TEMPLATES: 115,
} as const;
export const RESEND_SYNC_SLOT_DEADLINE_SLACK_MS = 5_000;

View file

@ -1,111 +1,14 @@
import { isDefined } from '@utils/is-defined';
import { CoreApiClient } from 'twenty-client-sdk/core';
import { MetadataApiClient } from 'twenty-client-sdk/metadata';
import { defineFrontComponent } from 'twenty-sdk/define';
import {
Command,
enqueueSnackbar,
updateProgress,
} from 'twenty-sdk/front-component';
import { Command, enqueueSnackbar } from 'twenty-sdk/front-component';
import { APPLICATION_UNIVERSAL_IDENTIFIER } from '@constants/universal-identifiers';
import { INITIAL_SYNC_MODE_ENV_VAR_NAME } from '@modules/resend/constants/sync-config';
import {
RESEND_SYNC_BROADCASTS_AND_DEPENDENCIES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
RESEND_SYNC_CONTACTS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
RESEND_SYNC_EMAILS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
RESEND_SYNC_TEMPLATES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
SYNC_RESEND_DATA_COMMAND_UNIVERSAL_IDENTIFIER,
SYNC_RESEND_DATA_FRONT_COMPONENT_UNIVERSAL_IDENTIFIER,
} from '@modules/resend/constants/universal-identifiers';
import { extractConnection } from '@modules/resend/shared/utils/typed-client';
type SyncSummaryStep = {
name: string;
status: 'ok' | 'failed' | 'skipped';
fetched: number;
created: number;
updated: number;
errorCount: number;
durationMs: number;
};
type SyncFunctionSummary = {
totalDurationMs: number;
steps: SyncSummaryStep[];
};
type CursorRowId = { id: string };
type LogicFunctionDescriptor = {
universalIdentifier: string;
label: string;
};
const SYNC_FUNCTION_DESCRIPTORS: ReadonlyArray<LogicFunctionDescriptor> = [
{
universalIdentifier:
RESEND_SYNC_BROADCASTS_AND_DEPENDENCIES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
label: 'topics + segments + broadcasts',
},
{
universalIdentifier:
RESEND_SYNC_TEMPLATES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
label: 'templates',
},
{
universalIdentifier:
RESEND_SYNC_CONTACTS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
label: 'contacts',
},
{
universalIdentifier:
RESEND_SYNC_EMAILS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER,
label: 'emails',
},
];
const formatStepCounts = (step: SyncSummaryStep): string => {
const verbs: string[] = [];
if (step.created > 0) verbs.push(`${step.created} created`);
if (step.updated > 0) verbs.push(`${step.updated} updated`);
const counts =
verbs.length > 0 ? verbs.join(', ') : `${step.fetched} fetched`;
return `${step.name.toLowerCase()}: ${counts}`;
};
const formatAggregatedSummary = (
summaries: ReadonlyArray<SyncFunctionSummary>,
): string => {
const allSteps = summaries.flatMap((summary) => summary.steps);
const totalDurationMs = summaries.reduce(
(acc, summary) => acc + summary.totalDurationMs,
0,
);
const seconds = (totalDurationMs / 1000).toFixed(1);
const stepLines = allSteps
.filter((step) => step.status === 'ok')
.map(formatStepCounts);
const prefix = `Initial sync triggered (${seconds}s)`;
if (stepLines.length === 0) {
return prefix;
}
return `${prefix}${stepLines.join('; ')}`;
};
const isSyncFunctionSummary = (
value: unknown,
): value is SyncFunctionSummary =>
typeof value === 'object' &&
value !== null &&
Array.isArray((value as SyncFunctionSummary).steps) &&
typeof (value as SyncFunctionSummary).totalDurationMs === 'number';
const resolveApplicationId = async (
metadataClient: MetadataApiClient,
@ -144,136 +47,15 @@ const flipInitialSyncModeOn = async (
});
};
const resetSyncCursors = async (client: CoreApiClient): Promise<void> => {
const result = await client.query({
resendSyncCursors: {
__args: { first: 50 },
edges: {
node: {
id: true,
},
},
},
});
const connection = extractConnection<CursorRowId>(
result,
'resendSyncCursors',
);
for (const edge of connection.edges) {
if (!isDefined(edge.node?.id)) continue;
await client.mutation({
updateResendSyncCursor: {
__args: {
id: edge.node.id,
data: { cursor: null },
},
id: true,
},
});
}
};
const executeSyncFunction = async (
metadataClient: MetadataApiClient,
logicFunctionId: string,
label: string,
): Promise<SyncFunctionSummary> => {
const { executeOneLogicFunction } = await metadataClient.mutation({
executeOneLogicFunction: {
__args: {
input: {
id: logicFunctionId,
payload: {} as Record<string, unknown>,
},
},
status: true,
error: true,
data: true,
},
});
if (executeOneLogicFunction.status !== 'SUCCESS') {
const rawMessage =
typeof executeOneLogicFunction.error?.errorMessage === 'string'
? executeOneLogicFunction.error.errorMessage
: `${label} sync execution failed`;
throw new Error(`Resend ${label} sync failed:\n${rawMessage}`);
}
if (isSyncFunctionSummary(executeOneLogicFunction.data)) {
return executeOneLogicFunction.data;
}
return { totalDurationMs: 0, steps: [] };
};
const execute = async () => {
await updateProgress(0.05);
const metadataClient = new MetadataApiClient();
const coreApiClient = new CoreApiClient();
const applicationId = await resolveApplicationId(metadataClient);
await updateProgress(0.15);
await flipInitialSyncModeOn(metadataClient, applicationId);
await updateProgress(0.25);
await resetSyncCursors(coreApiClient);
await updateProgress(0.35);
const { findManyLogicFunctions } = await metadataClient.query({
findManyLogicFunctions: {
id: true,
universalIdentifier: true,
},
});
const idByUniversalIdentifier = new Map<string, string>();
for (const logicFunction of findManyLogicFunctions) {
if (
typeof logicFunction.universalIdentifier === 'string' &&
typeof logicFunction.id === 'string'
) {
idByUniversalIdentifier.set(
logicFunction.universalIdentifier,
logicFunction.id,
);
}
}
const resolvedFunctions = SYNC_FUNCTION_DESCRIPTORS.map((descriptor) => {
const id = idByUniversalIdentifier.get(descriptor.universalIdentifier);
if (!isDefined(id)) {
throw new Error(
`Resend sync logic function ${descriptor.label} not found`,
);
}
return { id, label: descriptor.label };
});
await updateProgress(0.45);
const summaries = await Promise.all(
resolvedFunctions.map(({ id, label }) =>
executeSyncFunction(metadataClient, id, label),
),
);
await updateProgress(1);
await enqueueSnackbar({
message: formatAggregatedSummary(summaries),
message: 'Initial sync triggered — it will run in the background.',
variant: 'success',
});
};
@ -284,7 +66,7 @@ export default defineFrontComponent({
universalIdentifier: SYNC_RESEND_DATA_FRONT_COMPONENT_UNIVERSAL_IDENTIFIER,
name: 'Sync Resend Data',
description:
'Enters initial sync mode, resets sync cursors, and triggers all per-entity sync functions in parallel.',
'Flips the application into initial sync mode so the scheduled sync handlers pick up a full resumable sync on their next run.',
isHeadless: true,
component: SyncResendData,
command: {

View file

@ -191,4 +191,72 @@ describe('forEachPage', () => {
forEachPage(listFunction, async () => {}, 'segments'),
).rejects.toThrow(/Resend list\[segments\] cursor stuck at a/);
});
it('stops requesting additional pages once deadlineAtMs is reached', async () => {
const { listFunction, calls } = createMockListFunction([
page(['a', 'b'], true),
page(['c', 'd'], true),
page(['e', 'f'], true),
]);
const onCursorAdvance = vi.fn(async () => {});
const seenPages: string[][] = [];
let now = 1_000;
const dateNowSpy = vi.spyOn(Date, 'now').mockImplementation(() => now);
await forEachPage(
listFunction,
async (items) => {
seenPages.push(items.map((item) => item.id));
now += 1_000;
},
'items',
{
onCursorAdvance,
deadlineAtMs: 2_500,
},
);
expect(seenPages).toEqual([
['a', 'b'],
['c', 'd'],
]);
expect(calls).toHaveLength(2);
expect(onCursorAdvance).toHaveBeenCalledTimes(2);
expect(onCursorAdvance).toHaveBeenLastCalledWith('d');
dateNowSpy.mockRestore();
});
it('still processes the first page even if the deadline is already past at start', async () => {
const { listFunction, calls } = createMockListFunction([
page(['a'], true),
page(['b'], false),
]);
const onCursorAdvance = vi.fn(async () => {});
const seenPages: string[][] = [];
const dateNowSpy = vi.spyOn(Date, 'now').mockReturnValue(10_000);
await forEachPage(
listFunction,
async (items) => {
seenPages.push(items.map((item) => item.id));
},
'items',
{
onCursorAdvance,
deadlineAtMs: 0,
},
);
expect(seenPages).toEqual([['a']]);
expect(calls).toHaveLength(1);
expect(onCursorAdvance).toHaveBeenCalledTimes(1);
expect(onCursorAdvance).toHaveBeenCalledWith('a');
dateNowSpy.mockRestore();
});
});

View file

@ -14,6 +14,7 @@ export type ResendListFunction<T> = (paginationParameters: {
export type ForEachPageOptions = {
startCursor?: string;
onCursorAdvance?: (cursor: string) => Promise<void>;
deadlineAtMs?: number;
};
export type OnPageResult = {
@ -93,6 +94,16 @@ export const forEachPage = async <T extends { id: string }>(
throw new Error(`Resend list[${label}] cursor stuck at ${nextCursor}`);
}
if (
isDefined(options?.deadlineAtMs) &&
Date.now() >= options.deadlineAtMs
) {
console.log(
`[resend] reached deadline for ${label} after page ${pageNumber}; stopping early (cursor will resume next tick)`,
);
break;
}
cursor = nextCursor;
}
};

View file

@ -62,6 +62,15 @@ describe('resendSyncBroadcastsAndDependenciesHandler', () => {
expect(broadcastsArgs[2]).toBe(segmentMap);
expect(broadcastsArgs[3]).toBe(topicMap);
expect(broadcastsArgs[4]).toEqual({ deadlineAtMs: expect.any(Number) });
const topicsArgs = mockSyncTopics.mock.calls[0];
expect(topicsArgs[3]).toEqual({ deadlineAtMs: expect.any(Number) });
const segmentsArgs = mockSyncSegments.mock.calls[0];
expect(segmentsArgs[3]).toEqual({ deadlineAtMs: expect.any(Number) });
expect(summary.steps.map((s) => s.name)).toEqual([
'TOPICS',

View file

@ -23,7 +23,7 @@ describe('resendSyncContactsHandler', () => {
(CoreApiClient as unknown as ReturnType<typeof vi.fn>).mockReset();
});
it('invokes syncContacts and reports an ok CONTACTS step', async () => {
it('invokes syncContacts with a deadline and reports an ok CONTACTS step', async () => {
mockSyncContacts.mockResolvedValue({
result: { fetched: 1, created: 1, updated: 0, errors: [] },
value: undefined,
@ -32,6 +32,10 @@ describe('resendSyncContactsHandler', () => {
const summary = await resendSyncContactsHandler();
expect(mockSyncContacts).toHaveBeenCalledTimes(1);
const args = mockSyncContacts.mock.calls[0];
expect(args[3]).toEqual({ deadlineAtMs: expect.any(Number) });
expect(args[3].deadlineAtMs).toBeGreaterThan(Date.now());
expect(summary.steps).toEqual([
expect.objectContaining({ name: 'CONTACTS', status: 'ok', created: 1 }),
]);

View file

@ -43,7 +43,7 @@ describe('resendSyncEmailsHandler', () => {
}
});
it('runs a full resumable sync in initial mode', async () => {
it('runs a full resumable sync in initial mode and forwards a deadline', async () => {
process.env.INITIAL_SYNC_MODE = 'true';
await resendSyncEmailsHandler();
@ -51,11 +51,12 @@ describe('resendSyncEmailsHandler', () => {
expect(mockSyncEmails).toHaveBeenCalledTimes(1);
const args = mockSyncEmails.mock.calls[0];
// resend, client, syncedAt — no options
expect(args).toHaveLength(3);
expect(args).toHaveLength(4);
expect(args[3]).toEqual({ deadlineAtMs: expect.any(Number) });
expect(args[3].deadlineAtMs).toBeGreaterThan(Date.now());
});
it('runs a 7-day non-resumable sync in intermediate mode', async () => {
it('runs a 7-day non-resumable sync in intermediate mode and forwards a deadline', async () => {
process.env.INITIAL_SYNC_MODE = 'false';
await resendSyncEmailsHandler();
@ -66,6 +67,8 @@ describe('resendSyncEmailsHandler', () => {
expect(args[3]).toEqual({
stopBeforeCreatedAtMs: INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS,
resumable: false,
deadlineAtMs: expect.any(Number),
});
expect(args[3].deadlineAtMs).toBeGreaterThan(Date.now());
});
});

View file

@ -23,7 +23,7 @@ describe('resendSyncTemplatesHandler', () => {
(CoreApiClient as unknown as ReturnType<typeof vi.fn>).mockReset();
});
it('reports an ok TEMPLATES step on success', async () => {
it('reports an ok TEMPLATES step on success and forwards a deadline', async () => {
mockSyncTemplates.mockResolvedValue({
result: { fetched: 5, created: 2, updated: 3, errors: [] },
value: undefined,
@ -31,6 +31,11 @@ describe('resendSyncTemplatesHandler', () => {
const summary = await resendSyncTemplatesHandler();
expect(mockSyncTemplates).toHaveBeenCalledTimes(1);
const args = mockSyncTemplates.mock.calls[0];
expect(args[2]).toEqual({ deadlineAtMs: expect.any(Number) });
expect(args[2].deadlineAtMs).toBeGreaterThan(Date.now());
expect(summary.steps).toEqual([
expect.objectContaining({
name: 'TEMPLATES',

View file

@ -1,6 +1,11 @@
import { CoreApiClient } from 'twenty-client-sdk/core';
import { defineLogicFunction } from 'twenty-sdk/define';
import {
RESEND_SYNC_CRON_PATTERNS,
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS,
RESEND_SYNC_SLOT_TIMEOUT_SECONDS,
} from '@modules/resend/constants/sync-config';
import { RESEND_SYNC_BROADCASTS_AND_DEPENDENCIES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers';
import { getResendClient } from '@modules/resend/shared/utils/get-resend-client';
import type { StepOutcome } from '@modules/resend/sync/types/step-outcome';
@ -28,12 +33,17 @@ export const resendSyncBroadcastsAndDependenciesHandler =
const coreApiClient = new CoreApiClient();
const syncedAt = new Date().toISOString();
const deadlineAtMs =
Date.now() +
RESEND_SYNC_SLOT_TIMEOUT_SECONDS.BROADCASTS * 1_000 -
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS;
const topics = await runSyncStep('TOPICS', () =>
syncTopics(resendClient, coreApiClient, syncedAt),
syncTopics(resendClient, coreApiClient, syncedAt, { deadlineAtMs }),
);
const segments = await runSyncStep('SEGMENTS', () =>
syncSegments(resendClient, coreApiClient, syncedAt),
syncSegments(resendClient, coreApiClient, syncedAt, { deadlineAtMs }),
);
const broadcasts =
@ -44,6 +54,7 @@ export const resendSyncBroadcastsAndDependenciesHandler =
coreApiClient,
segments.value,
topics.value,
{ deadlineAtMs },
),
)
: skipDueToFailedDependencies('BROADCASTS', { topics, segments });
@ -69,9 +80,9 @@ export default defineLogicFunction({
name: 'resend-sync-broadcasts-and-dependencies',
description:
'Syncs Resend topics, segments, and broadcasts in sequence. Broadcasts depend on the in-memory topic and segment id maps produced by the first two steps. Each step has its own cursor and resumes from the last advance on the next tick if the function timeouts mid-pagination.',
timeoutSeconds: 300,
timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.BROADCASTS,
handler: resendSyncBroadcastsAndDependenciesHandler,
cronTriggerSettings: {
pattern: '*/5 * * * *',
pattern: RESEND_SYNC_CRON_PATTERNS.BROADCASTS,
},
});

View file

@ -1,6 +1,11 @@
import { CoreApiClient } from 'twenty-client-sdk/core';
import { defineLogicFunction } from 'twenty-sdk/define';
import {
RESEND_SYNC_CRON_PATTERNS,
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS,
RESEND_SYNC_SLOT_TIMEOUT_SECONDS,
} from '@modules/resend/constants/sync-config';
import { RESEND_SYNC_CONTACTS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers';
import { getResendClient } from '@modules/resend/shared/utils/get-resend-client';
import { logStepOutcome } from '@modules/resend/sync/utils/log-step-outcome';
@ -22,8 +27,13 @@ export const resendSyncContactsHandler =
const coreApiClient = new CoreApiClient();
const syncedAt = new Date().toISOString();
const deadlineAtMs =
Date.now() +
RESEND_SYNC_SLOT_TIMEOUT_SECONDS.CONTACTS * 1_000 -
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS;
const contacts = await runSyncStep('CONTACTS', () =>
syncContacts(resendClient, coreApiClient, syncedAt),
syncContacts(resendClient, coreApiClient, syncedAt, { deadlineAtMs }),
);
logStepOutcome(contacts);
@ -38,9 +48,9 @@ export default defineLogicFunction({
name: 'resend-sync-contacts',
description:
'Syncs Resend contacts and links them to existing people by email. Resumes from its own cursor if the function timeouts mid-pagination.',
timeoutSeconds: 300,
timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.CONTACTS,
handler: resendSyncContactsHandler,
cronTriggerSettings: {
pattern: '*/5 * * * *',
pattern: RESEND_SYNC_CRON_PATTERNS.CONTACTS,
},
});

View file

@ -1,7 +1,12 @@
import { CoreApiClient } from 'twenty-client-sdk/core';
import { defineLogicFunction } from 'twenty-sdk/define';
import { INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS } from '@modules/resend/constants/sync-config';
import {
INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS,
RESEND_SYNC_CRON_PATTERNS,
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS,
RESEND_SYNC_SLOT_TIMEOUT_SECONDS,
} from '@modules/resend/constants/sync-config';
import { RESEND_SYNC_EMAILS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers';
import { getResendClient } from '@modules/resend/shared/utils/get-resend-client';
import { logStepOutcome } from '@modules/resend/sync/utils/log-step-outcome';
@ -26,12 +31,18 @@ export const resendSyncEmailsHandler =
const initialMode = isInitialSyncModeOn();
const deadlineAtMs =
Date.now() +
RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS * 1_000 -
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS;
const emails = await runSyncStep('EMAILS', () =>
initialMode
? syncEmails(resendClient, coreApiClient, syncedAt)
? syncEmails(resendClient, coreApiClient, syncedAt, { deadlineAtMs })
: syncEmails(resendClient, coreApiClient, syncedAt, {
stopBeforeCreatedAtMs: INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS,
resumable: false,
deadlineAtMs,
}),
);
@ -47,9 +58,9 @@ export default defineLogicFunction({
name: 'resend-sync-emails',
description:
'Syncs Resend emails and links them to existing people by email. In initial sync mode it does a full resumable pass; in intermediate mode it only fetches emails created in the last 7 days and does not persist a cursor.',
timeoutSeconds: 300,
timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS,
handler: resendSyncEmailsHandler,
cronTriggerSettings: {
pattern: '*/5 * * * *',
pattern: RESEND_SYNC_CRON_PATTERNS.EMAILS,
},
});

View file

@ -1,6 +1,11 @@
import { CoreApiClient } from 'twenty-client-sdk/core';
import { defineLogicFunction } from 'twenty-sdk/define';
import {
RESEND_SYNC_CRON_PATTERNS,
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS,
RESEND_SYNC_SLOT_TIMEOUT_SECONDS,
} from '@modules/resend/constants/sync-config';
import { RESEND_SYNC_TEMPLATES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers';
import { getResendClient } from '@modules/resend/shared/utils/get-resend-client';
import { logStepOutcome } from '@modules/resend/sync/utils/log-step-outcome';
@ -21,8 +26,13 @@ export const resendSyncTemplatesHandler =
const resendClient = getResendClient();
const coreApiClient = new CoreApiClient();
const deadlineAtMs =
Date.now() +
RESEND_SYNC_SLOT_TIMEOUT_SECONDS.TEMPLATES * 1_000 -
RESEND_SYNC_SLOT_DEADLINE_SLACK_MS;
const templates = await runSyncStep('TEMPLATES', () =>
syncTemplates(resendClient, coreApiClient),
syncTemplates(resendClient, coreApiClient, { deadlineAtMs }),
);
logStepOutcome(templates);
@ -37,9 +47,9 @@ export default defineLogicFunction({
name: 'resend-sync-templates',
description:
'Syncs Resend templates. Resumes from its own cursor if the function timeouts mid-pagination.',
timeoutSeconds: 300,
timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.TEMPLATES,
handler: resendSyncTemplatesHandler,
cronTriggerSettings: {
pattern: '*/15 * * * *',
pattern: RESEND_SYNC_CRON_PATTERNS.TEMPLATES,
},
});

View file

@ -54,11 +54,16 @@ const fetchBroadcastDetailsForPage = async (
return detailByResendId;
};
export type SyncBroadcastsOptions = {
deadlineAtMs?: number;
};
export const syncBroadcasts = async (
resend: Resend,
client: CoreApiClient,
segmentMap: SegmentIdMap,
topicMap: TopicIdMap,
options?: SyncBroadcastsOptions,
): Promise<SyncStepResult> => {
const aggregate: SyncResult = {
fetched: 0,
@ -182,7 +187,13 @@ export const syncBroadcasts = async (
return { ok: pageOutcome.ok, errors: pageOutcome.result.errors };
},
'broadcasts',
{ startCursor: resumeCursor, onCursorAdvance },
{
startCursor: resumeCursor,
onCursorAdvance,
...(isDefined(options?.deadlineAtMs) && {
deadlineAtMs: options.deadlineAtMs,
}),
},
);
},
);

View file

@ -37,10 +37,15 @@ const toContactDto = (
...(isDefined(personId) && { personId }),
});
export type SyncContactsOptions = {
deadlineAtMs?: number;
};
export const syncContacts = async (
resend: Resend,
client: CoreApiClient,
syncedAt: string,
options?: SyncContactsOptions,
): Promise<SyncStepResult> => {
const aggregate: SyncResult = {
fetched: 0,
@ -81,7 +86,13 @@ export const syncContacts = async (
return { ok: pageOutcome.ok, errors: pageOutcome.result.errors };
},
'contacts',
{ startCursor: resumeCursor, onCursorAdvance },
{
startCursor: resumeCursor,
onCursorAdvance,
...(isDefined(options?.deadlineAtMs) && {
deadlineAtMs: options.deadlineAtMs,
}),
},
);
});

View file

@ -20,6 +20,7 @@ import { upsertRecords } from '@modules/resend/sync/utils/upsert-records';
export type SyncEmailsOptions = {
stopBeforeCreatedAtMs?: number;
resumable?: boolean;
deadlineAtMs?: number;
};
export const syncEmails = async (
@ -140,6 +141,9 @@ export const syncEmails = async (
{
startCursor: resumable ? resumeCursor : undefined,
...(resumable && { onCursorAdvance }),
...(isDefined(options?.deadlineAtMs) && {
deadlineAtMs: options.deadlineAtMs,
}),
},
);
},

View file

@ -1,5 +1,6 @@
import type { Resend } from 'resend';
import { CoreApiClient } from 'twenty-client-sdk/core';
import { isDefined } from '@utils/is-defined';
import type { SegmentDto } from '@modules/resend/sync/types/segment.dto';
import type { SyncResult } from '@modules/resend/sync/types/sync-result';
@ -23,10 +24,15 @@ const toSegmentDto = (segment: RawSegment, syncedAt: string): SegmentDto => ({
lastSyncedFromResend: syncedAt,
});
export type SyncSegmentsOptions = {
deadlineAtMs?: number;
};
export const syncSegments = async (
resend: Resend,
client: CoreApiClient,
syncedAt: string,
options?: SyncSegmentsOptions,
): Promise<SyncStepResult<SegmentIdMap>> => {
const aggregate: SyncResult = {
fetched: 0,
@ -63,7 +69,13 @@ export const syncSegments = async (
return { ok: pageOutcome.ok, errors: pageOutcome.result.errors };
},
'segments',
{ startCursor: resumeCursor, onCursorAdvance },
{
startCursor: resumeCursor,
onCursorAdvance,
...(isDefined(options?.deadlineAtMs) && {
deadlineAtMs: options.deadlineAtMs,
}),
},
);
});

View file

@ -52,9 +52,14 @@ const fetchTemplateDetailsForPage = async (
return detailByResendId;
};
export type SyncTemplatesOptions = {
deadlineAtMs?: number;
};
export const syncTemplates = async (
resend: Resend,
client: CoreApiClient,
options?: SyncTemplatesOptions,
): Promise<SyncStepResult> => {
const aggregate: SyncResult = {
fetched: 0,
@ -129,7 +134,13 @@ export const syncTemplates = async (
return { ok: pageOutcome.ok, errors: pageOutcome.result.errors };
},
'templates',
{ startCursor: resumeCursor, onCursorAdvance },
{
startCursor: resumeCursor,
onCursorAdvance,
...(isDefined(options?.deadlineAtMs) && {
deadlineAtMs: options.deadlineAtMs,
}),
},
);
},
);

View file

@ -30,10 +30,15 @@ const toTopicDto = (topic: RawTopic, syncedAt: string): TopicDto => ({
lastSyncedFromResend: syncedAt,
});
export type SyncTopicsOptions = {
deadlineAtMs?: number;
};
export const syncTopics = async (
resend: Resend,
client: CoreApiClient,
syncedAt: string,
_options?: SyncTopicsOptions,
): Promise<SyncStepResult<TopicIdMap>> => {
const aggregate: SyncResult = {
fetched: 0,