diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/constants/__tests__/sync-config.test.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/constants/__tests__/sync-config.test.ts new file mode 100644 index 00000000000..942a1c95b86 --- /dev/null +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/constants/__tests__/sync-config.test.ts @@ -0,0 +1,92 @@ +import { describe, expect, it } from 'vitest'; + +import { + RESEND_SYNC_CRON_PATTERNS, + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS, + RESEND_SYNC_SLOT_TIMEOUT_SECONDS, +} from '@modules/resend/constants/sync-config'; + +const minutesInOneHour = (pattern: string): number[] => { + const [minuteField, hourField, ...rest] = pattern.split(' '); + + expect(hourField).toBe('*'); + expect(rest).toEqual(['*', '*', '*']); + + const stepMatch = minuteField.match(/^(\d+)-59\/(\d+)$/); + + if (stepMatch === null) { + throw new Error( + `Unsupported cron minute field for this test helper: ${minuteField}`, + ); + } + + const start = Number(stepMatch[1]); + const step = Number(stepMatch[2]); + const minutes: number[] = []; + + for (let m = start; m < 60; m += step) { + minutes.push(m); + } + + return minutes; +}; + +describe('RESEND_SYNC_CRON_PATTERNS', () => { + it('runs each cron exactly once per 5-minute window with a unique minute offset', () => { + const offsets = { + EMAILS: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.EMAILS).map( + (m) => m % 5, + ), + CONTACTS: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.CONTACTS).map( + (m) => m % 5, + ), + BROADCASTS: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.BROADCASTS).map( + (m) => m % 5, + ), + TEMPLATES: minutesInOneHour(RESEND_SYNC_CRON_PATTERNS.TEMPLATES).map( + (m) => m % 5, + ), + }; + + expect(new Set(offsets.EMAILS)).toEqual(new Set([0])); + expect(new Set(offsets.CONTACTS)).toEqual(new Set([1])); + expect(new Set(offsets.BROADCASTS)).toEqual(new Set([2])); + expect(new Set(offsets.TEMPLATES)).toEqual(new Set([3])); + + const firstOffsets = [ + offsets.EMAILS[0], + offsets.CONTACTS[0], + offsets.BROADCASTS[0], + offsets.TEMPLATES[0], + ]; + + expect(new Set(firstOffsets).size).toBe(4); + }); + + it('fires every five minutes for each cron', () => { + for (const pattern of Object.values(RESEND_SYNC_CRON_PATTERNS)) { + const minutes = minutesInOneHour(pattern); + + expect(minutes).toHaveLength(12); + for (let i = 1; i < minutes.length; i++) { + expect(minutes[i] - minutes[i - 1]).toBe(5); + } + } + }); +}); + +describe('RESEND_SYNC_SLOT_TIMEOUT_SECONDS', () => { + it('keeps every 1-minute slot strictly under 60 seconds and the trailing 2-minute slot under 120 seconds', () => { + expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS).toBeLessThan(60); + expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.CONTACTS).toBeLessThan(60); + expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.BROADCASTS).toBeLessThan(60); + expect(RESEND_SYNC_SLOT_TIMEOUT_SECONDS.TEMPLATES).toBeLessThan(120); + }); + + it('leaves enough slack for the deadline check to fire before the hard timeout', () => { + expect(RESEND_SYNC_SLOT_DEADLINE_SLACK_MS).toBeGreaterThan(0); + expect(RESEND_SYNC_SLOT_DEADLINE_SLACK_MS).toBeLessThan( + RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS * 1_000, + ); + }); +}); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/constants/sync-config.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/constants/sync-config.ts index 4514f4aa669..4ad48f8e71c 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/constants/sync-config.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/constants/sync-config.ts @@ -13,3 +13,19 @@ export const SYNC_LOOKUP_PROGRESS = 0.1; export const INITIAL_SYNC_MODE_ENV_VAR_NAME = 'INITIAL_SYNC_MODE'; export const INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS = 7 * 24 * 60 * 60 * 1000; + +export const RESEND_SYNC_CRON_PATTERNS = { + EMAILS: '0-59/5 * * * *', + CONTACTS: '1-59/5 * * * *', + BROADCASTS: '2-59/5 * * * *', + TEMPLATES: '3-59/5 * * * *', +} as const; + +export const RESEND_SYNC_SLOT_TIMEOUT_SECONDS = { + EMAILS: 55, + CONTACTS: 55, + BROADCASTS: 55, + TEMPLATES: 115, +} as const; + +export const RESEND_SYNC_SLOT_DEADLINE_SLACK_MS = 5_000; diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/manual-sync/front-components/SyncResendData.front-component.tsx b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/manual-sync/front-components/SyncResendData.front-component.tsx index abd0ff55d49..b0e7fbfb25a 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/manual-sync/front-components/SyncResendData.front-component.tsx +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/manual-sync/front-components/SyncResendData.front-component.tsx @@ -1,111 +1,14 @@ import { isDefined } from '@utils/is-defined'; -import { CoreApiClient } from 'twenty-client-sdk/core'; import { MetadataApiClient } from 'twenty-client-sdk/metadata'; import { defineFrontComponent } from 'twenty-sdk/define'; -import { - Command, - enqueueSnackbar, - updateProgress, -} from 'twenty-sdk/front-component'; +import { Command, enqueueSnackbar } from 'twenty-sdk/front-component'; import { APPLICATION_UNIVERSAL_IDENTIFIER } from '@constants/universal-identifiers'; import { INITIAL_SYNC_MODE_ENV_VAR_NAME } from '@modules/resend/constants/sync-config'; import { - RESEND_SYNC_BROADCASTS_AND_DEPENDENCIES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, - RESEND_SYNC_CONTACTS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, - RESEND_SYNC_EMAILS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, - RESEND_SYNC_TEMPLATES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, SYNC_RESEND_DATA_COMMAND_UNIVERSAL_IDENTIFIER, SYNC_RESEND_DATA_FRONT_COMPONENT_UNIVERSAL_IDENTIFIER, } from '@modules/resend/constants/universal-identifiers'; -import { extractConnection } from '@modules/resend/shared/utils/typed-client'; - -type SyncSummaryStep = { - name: string; - status: 'ok' | 'failed' | 'skipped'; - fetched: number; - created: number; - updated: number; - errorCount: number; - durationMs: number; -}; - -type SyncFunctionSummary = { - totalDurationMs: number; - steps: SyncSummaryStep[]; -}; - -type CursorRowId = { id: string }; - -type LogicFunctionDescriptor = { - universalIdentifier: string; - label: string; -}; - -const SYNC_FUNCTION_DESCRIPTORS: ReadonlyArray = [ - { - universalIdentifier: - RESEND_SYNC_BROADCASTS_AND_DEPENDENCIES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, - label: 'topics + segments + broadcasts', - }, - { - universalIdentifier: - RESEND_SYNC_TEMPLATES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, - label: 'templates', - }, - { - universalIdentifier: - RESEND_SYNC_CONTACTS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, - label: 'contacts', - }, - { - universalIdentifier: - RESEND_SYNC_EMAILS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER, - label: 'emails', - }, -]; - -const formatStepCounts = (step: SyncSummaryStep): string => { - const verbs: string[] = []; - - if (step.created > 0) verbs.push(`${step.created} created`); - if (step.updated > 0) verbs.push(`${step.updated} updated`); - - const counts = - verbs.length > 0 ? verbs.join(', ') : `${step.fetched} fetched`; - - return `${step.name.toLowerCase()}: ${counts}`; -}; - -const formatAggregatedSummary = ( - summaries: ReadonlyArray, -): string => { - const allSteps = summaries.flatMap((summary) => summary.steps); - const totalDurationMs = summaries.reduce( - (acc, summary) => acc + summary.totalDurationMs, - 0, - ); - const seconds = (totalDurationMs / 1000).toFixed(1); - const stepLines = allSteps - .filter((step) => step.status === 'ok') - .map(formatStepCounts); - - const prefix = `Initial sync triggered (${seconds}s)`; - - if (stepLines.length === 0) { - return prefix; - } - - return `${prefix} — ${stepLines.join('; ')}`; -}; - -const isSyncFunctionSummary = ( - value: unknown, -): value is SyncFunctionSummary => - typeof value === 'object' && - value !== null && - Array.isArray((value as SyncFunctionSummary).steps) && - typeof (value as SyncFunctionSummary).totalDurationMs === 'number'; const resolveApplicationId = async ( metadataClient: MetadataApiClient, @@ -144,136 +47,15 @@ const flipInitialSyncModeOn = async ( }); }; -const resetSyncCursors = async (client: CoreApiClient): Promise => { - const result = await client.query({ - resendSyncCursors: { - __args: { first: 50 }, - edges: { - node: { - id: true, - }, - }, - }, - }); - - const connection = extractConnection( - result, - 'resendSyncCursors', - ); - - for (const edge of connection.edges) { - if (!isDefined(edge.node?.id)) continue; - - await client.mutation({ - updateResendSyncCursor: { - __args: { - id: edge.node.id, - data: { cursor: null }, - }, - id: true, - }, - }); - } -}; - -const executeSyncFunction = async ( - metadataClient: MetadataApiClient, - logicFunctionId: string, - label: string, -): Promise => { - const { executeOneLogicFunction } = await metadataClient.mutation({ - executeOneLogicFunction: { - __args: { - input: { - id: logicFunctionId, - payload: {} as Record, - }, - }, - status: true, - error: true, - data: true, - }, - }); - - if (executeOneLogicFunction.status !== 'SUCCESS') { - const rawMessage = - typeof executeOneLogicFunction.error?.errorMessage === 'string' - ? executeOneLogicFunction.error.errorMessage - : `${label} sync execution failed`; - - throw new Error(`Resend ${label} sync failed:\n${rawMessage}`); - } - - if (isSyncFunctionSummary(executeOneLogicFunction.data)) { - return executeOneLogicFunction.data; - } - - return { totalDurationMs: 0, steps: [] }; -}; - const execute = async () => { - await updateProgress(0.05); - const metadataClient = new MetadataApiClient(); - const coreApiClient = new CoreApiClient(); const applicationId = await resolveApplicationId(metadataClient); - await updateProgress(0.15); - await flipInitialSyncModeOn(metadataClient, applicationId); - await updateProgress(0.25); - - await resetSyncCursors(coreApiClient); - - await updateProgress(0.35); - - const { findManyLogicFunctions } = await metadataClient.query({ - findManyLogicFunctions: { - id: true, - universalIdentifier: true, - }, - }); - - const idByUniversalIdentifier = new Map(); - - for (const logicFunction of findManyLogicFunctions) { - if ( - typeof logicFunction.universalIdentifier === 'string' && - typeof logicFunction.id === 'string' - ) { - idByUniversalIdentifier.set( - logicFunction.universalIdentifier, - logicFunction.id, - ); - } - } - - const resolvedFunctions = SYNC_FUNCTION_DESCRIPTORS.map((descriptor) => { - const id = idByUniversalIdentifier.get(descriptor.universalIdentifier); - - if (!isDefined(id)) { - throw new Error( - `Resend sync logic function ${descriptor.label} not found`, - ); - } - - return { id, label: descriptor.label }; - }); - - await updateProgress(0.45); - - const summaries = await Promise.all( - resolvedFunctions.map(({ id, label }) => - executeSyncFunction(metadataClient, id, label), - ), - ); - - await updateProgress(1); - await enqueueSnackbar({ - message: formatAggregatedSummary(summaries), + message: 'Initial sync triggered — it will run in the background.', variant: 'success', }); }; @@ -284,7 +66,7 @@ export default defineFrontComponent({ universalIdentifier: SYNC_RESEND_DATA_FRONT_COMPONENT_UNIVERSAL_IDENTIFIER, name: 'Sync Resend Data', description: - 'Enters initial sync mode, resets sync cursors, and triggers all per-entity sync functions in parallel.', + 'Flips the application into initial sync mode so the scheduled sync handlers pick up a full resumable sync on their next run.', isHeadless: true, component: SyncResendData, command: { diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/__tests__/for-each-page.test.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/__tests__/for-each-page.test.ts index e2bf1b2796c..ad4cafab079 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/__tests__/for-each-page.test.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/__tests__/for-each-page.test.ts @@ -191,4 +191,72 @@ describe('forEachPage', () => { forEachPage(listFunction, async () => {}, 'segments'), ).rejects.toThrow(/Resend list\[segments\] cursor stuck at a/); }); + + it('stops requesting additional pages once deadlineAtMs is reached', async () => { + const { listFunction, calls } = createMockListFunction([ + page(['a', 'b'], true), + page(['c', 'd'], true), + page(['e', 'f'], true), + ]); + + const onCursorAdvance = vi.fn(async () => {}); + const seenPages: string[][] = []; + + let now = 1_000; + const dateNowSpy = vi.spyOn(Date, 'now').mockImplementation(() => now); + + await forEachPage( + listFunction, + async (items) => { + seenPages.push(items.map((item) => item.id)); + now += 1_000; + }, + 'items', + { + onCursorAdvance, + deadlineAtMs: 2_500, + }, + ); + + expect(seenPages).toEqual([ + ['a', 'b'], + ['c', 'd'], + ]); + expect(calls).toHaveLength(2); + expect(onCursorAdvance).toHaveBeenCalledTimes(2); + expect(onCursorAdvance).toHaveBeenLastCalledWith('d'); + + dateNowSpy.mockRestore(); + }); + + it('still processes the first page even if the deadline is already past at start', async () => { + const { listFunction, calls } = createMockListFunction([ + page(['a'], true), + page(['b'], false), + ]); + + const onCursorAdvance = vi.fn(async () => {}); + const seenPages: string[][] = []; + + const dateNowSpy = vi.spyOn(Date, 'now').mockReturnValue(10_000); + + await forEachPage( + listFunction, + async (items) => { + seenPages.push(items.map((item) => item.id)); + }, + 'items', + { + onCursorAdvance, + deadlineAtMs: 0, + }, + ); + + expect(seenPages).toEqual([['a']]); + expect(calls).toHaveLength(1); + expect(onCursorAdvance).toHaveBeenCalledTimes(1); + expect(onCursorAdvance).toHaveBeenCalledWith('a'); + + dateNowSpy.mockRestore(); + }); }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/for-each-page.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/for-each-page.ts index f9b214da81d..5c9f7473de5 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/for-each-page.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/shared/utils/for-each-page.ts @@ -14,6 +14,7 @@ export type ResendListFunction = (paginationParameters: { export type ForEachPageOptions = { startCursor?: string; onCursorAdvance?: (cursor: string) => Promise; + deadlineAtMs?: number; }; export type OnPageResult = { @@ -93,6 +94,16 @@ export const forEachPage = async ( throw new Error(`Resend list[${label}] cursor stuck at ${nextCursor}`); } + if ( + isDefined(options?.deadlineAtMs) && + Date.now() >= options.deadlineAtMs + ) { + console.log( + `[resend] reached deadline for ${label} after page ${pageNumber}; stopping early (cursor will resume next tick)`, + ); + break; + } + cursor = nextCursor; } }; diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-broadcasts-and-dependencies.test.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-broadcasts-and-dependencies.test.ts index 5589043f104..0904820d771 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-broadcasts-and-dependencies.test.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-broadcasts-and-dependencies.test.ts @@ -62,6 +62,15 @@ describe('resendSyncBroadcastsAndDependenciesHandler', () => { expect(broadcastsArgs[2]).toBe(segmentMap); expect(broadcastsArgs[3]).toBe(topicMap); + expect(broadcastsArgs[4]).toEqual({ deadlineAtMs: expect.any(Number) }); + + const topicsArgs = mockSyncTopics.mock.calls[0]; + + expect(topicsArgs[3]).toEqual({ deadlineAtMs: expect.any(Number) }); + + const segmentsArgs = mockSyncSegments.mock.calls[0]; + + expect(segmentsArgs[3]).toEqual({ deadlineAtMs: expect.any(Number) }); expect(summary.steps.map((s) => s.name)).toEqual([ 'TOPICS', diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-contacts.test.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-contacts.test.ts index fc5d69fb4bd..b55a3834743 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-contacts.test.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-contacts.test.ts @@ -23,7 +23,7 @@ describe('resendSyncContactsHandler', () => { (CoreApiClient as unknown as ReturnType).mockReset(); }); - it('invokes syncContacts and reports an ok CONTACTS step', async () => { + it('invokes syncContacts with a deadline and reports an ok CONTACTS step', async () => { mockSyncContacts.mockResolvedValue({ result: { fetched: 1, created: 1, updated: 0, errors: [] }, value: undefined, @@ -32,6 +32,10 @@ describe('resendSyncContactsHandler', () => { const summary = await resendSyncContactsHandler(); expect(mockSyncContacts).toHaveBeenCalledTimes(1); + const args = mockSyncContacts.mock.calls[0]; + + expect(args[3]).toEqual({ deadlineAtMs: expect.any(Number) }); + expect(args[3].deadlineAtMs).toBeGreaterThan(Date.now()); expect(summary.steps).toEqual([ expect.objectContaining({ name: 'CONTACTS', status: 'ok', created: 1 }), ]); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-emails.test.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-emails.test.ts index a02b83f986d..190cae742e2 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-emails.test.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-emails.test.ts @@ -43,7 +43,7 @@ describe('resendSyncEmailsHandler', () => { } }); - it('runs a full resumable sync in initial mode', async () => { + it('runs a full resumable sync in initial mode and forwards a deadline', async () => { process.env.INITIAL_SYNC_MODE = 'true'; await resendSyncEmailsHandler(); @@ -51,11 +51,12 @@ describe('resendSyncEmailsHandler', () => { expect(mockSyncEmails).toHaveBeenCalledTimes(1); const args = mockSyncEmails.mock.calls[0]; - // resend, client, syncedAt — no options - expect(args).toHaveLength(3); + expect(args).toHaveLength(4); + expect(args[3]).toEqual({ deadlineAtMs: expect.any(Number) }); + expect(args[3].deadlineAtMs).toBeGreaterThan(Date.now()); }); - it('runs a 7-day non-resumable sync in intermediate mode', async () => { + it('runs a 7-day non-resumable sync in intermediate mode and forwards a deadline', async () => { process.env.INITIAL_SYNC_MODE = 'false'; await resendSyncEmailsHandler(); @@ -66,6 +67,8 @@ describe('resendSyncEmailsHandler', () => { expect(args[3]).toEqual({ stopBeforeCreatedAtMs: INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS, resumable: false, + deadlineAtMs: expect.any(Number), }); + expect(args[3].deadlineAtMs).toBeGreaterThan(Date.now()); }); }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-templates.test.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-templates.test.ts index 17fd0340342..d9a4ef47dd8 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-templates.test.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/__tests__/resend-sync-templates.test.ts @@ -23,7 +23,7 @@ describe('resendSyncTemplatesHandler', () => { (CoreApiClient as unknown as ReturnType).mockReset(); }); - it('reports an ok TEMPLATES step on success', async () => { + it('reports an ok TEMPLATES step on success and forwards a deadline', async () => { mockSyncTemplates.mockResolvedValue({ result: { fetched: 5, created: 2, updated: 3, errors: [] }, value: undefined, @@ -31,6 +31,11 @@ describe('resendSyncTemplatesHandler', () => { const summary = await resendSyncTemplatesHandler(); + expect(mockSyncTemplates).toHaveBeenCalledTimes(1); + const args = mockSyncTemplates.mock.calls[0]; + + expect(args[2]).toEqual({ deadlineAtMs: expect.any(Number) }); + expect(args[2].deadlineAtMs).toBeGreaterThan(Date.now()); expect(summary.steps).toEqual([ expect.objectContaining({ name: 'TEMPLATES', diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-broadcasts-and-dependencies.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-broadcasts-and-dependencies.ts index 10a125f0a8e..2d52cfd3703 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-broadcasts-and-dependencies.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-broadcasts-and-dependencies.ts @@ -1,6 +1,11 @@ import { CoreApiClient } from 'twenty-client-sdk/core'; import { defineLogicFunction } from 'twenty-sdk/define'; +import { + RESEND_SYNC_CRON_PATTERNS, + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS, + RESEND_SYNC_SLOT_TIMEOUT_SECONDS, +} from '@modules/resend/constants/sync-config'; import { RESEND_SYNC_BROADCASTS_AND_DEPENDENCIES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers'; import { getResendClient } from '@modules/resend/shared/utils/get-resend-client'; import type { StepOutcome } from '@modules/resend/sync/types/step-outcome'; @@ -28,12 +33,17 @@ export const resendSyncBroadcastsAndDependenciesHandler = const coreApiClient = new CoreApiClient(); const syncedAt = new Date().toISOString(); + const deadlineAtMs = + Date.now() + + RESEND_SYNC_SLOT_TIMEOUT_SECONDS.BROADCASTS * 1_000 - + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS; + const topics = await runSyncStep('TOPICS', () => - syncTopics(resendClient, coreApiClient, syncedAt), + syncTopics(resendClient, coreApiClient, syncedAt, { deadlineAtMs }), ); const segments = await runSyncStep('SEGMENTS', () => - syncSegments(resendClient, coreApiClient, syncedAt), + syncSegments(resendClient, coreApiClient, syncedAt, { deadlineAtMs }), ); const broadcasts = @@ -44,6 +54,7 @@ export const resendSyncBroadcastsAndDependenciesHandler = coreApiClient, segments.value, topics.value, + { deadlineAtMs }, ), ) : skipDueToFailedDependencies('BROADCASTS', { topics, segments }); @@ -69,9 +80,9 @@ export default defineLogicFunction({ name: 'resend-sync-broadcasts-and-dependencies', description: 'Syncs Resend topics, segments, and broadcasts in sequence. Broadcasts depend on the in-memory topic and segment id maps produced by the first two steps. Each step has its own cursor and resumes from the last advance on the next tick if the function timeouts mid-pagination.', - timeoutSeconds: 300, + timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.BROADCASTS, handler: resendSyncBroadcastsAndDependenciesHandler, cronTriggerSettings: { - pattern: '*/5 * * * *', + pattern: RESEND_SYNC_CRON_PATTERNS.BROADCASTS, }, }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-contacts.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-contacts.ts index ff8686589a0..30ccaea6376 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-contacts.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-contacts.ts @@ -1,6 +1,11 @@ import { CoreApiClient } from 'twenty-client-sdk/core'; import { defineLogicFunction } from 'twenty-sdk/define'; +import { + RESEND_SYNC_CRON_PATTERNS, + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS, + RESEND_SYNC_SLOT_TIMEOUT_SECONDS, +} from '@modules/resend/constants/sync-config'; import { RESEND_SYNC_CONTACTS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers'; import { getResendClient } from '@modules/resend/shared/utils/get-resend-client'; import { logStepOutcome } from '@modules/resend/sync/utils/log-step-outcome'; @@ -22,8 +27,13 @@ export const resendSyncContactsHandler = const coreApiClient = new CoreApiClient(); const syncedAt = new Date().toISOString(); + const deadlineAtMs = + Date.now() + + RESEND_SYNC_SLOT_TIMEOUT_SECONDS.CONTACTS * 1_000 - + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS; + const contacts = await runSyncStep('CONTACTS', () => - syncContacts(resendClient, coreApiClient, syncedAt), + syncContacts(resendClient, coreApiClient, syncedAt, { deadlineAtMs }), ); logStepOutcome(contacts); @@ -38,9 +48,9 @@ export default defineLogicFunction({ name: 'resend-sync-contacts', description: 'Syncs Resend contacts and links them to existing people by email. Resumes from its own cursor if the function timeouts mid-pagination.', - timeoutSeconds: 300, + timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.CONTACTS, handler: resendSyncContactsHandler, cronTriggerSettings: { - pattern: '*/5 * * * *', + pattern: RESEND_SYNC_CRON_PATTERNS.CONTACTS, }, }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-emails.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-emails.ts index fa6c64f5321..848989a3a5f 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-emails.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-emails.ts @@ -1,7 +1,12 @@ import { CoreApiClient } from 'twenty-client-sdk/core'; import { defineLogicFunction } from 'twenty-sdk/define'; -import { INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS } from '@modules/resend/constants/sync-config'; +import { + INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS, + RESEND_SYNC_CRON_PATTERNS, + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS, + RESEND_SYNC_SLOT_TIMEOUT_SECONDS, +} from '@modules/resend/constants/sync-config'; import { RESEND_SYNC_EMAILS_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers'; import { getResendClient } from '@modules/resend/shared/utils/get-resend-client'; import { logStepOutcome } from '@modules/resend/sync/utils/log-step-outcome'; @@ -26,12 +31,18 @@ export const resendSyncEmailsHandler = const initialMode = isInitialSyncModeOn(); + const deadlineAtMs = + Date.now() + + RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS * 1_000 - + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS; + const emails = await runSyncStep('EMAILS', () => initialMode - ? syncEmails(resendClient, coreApiClient, syncedAt) + ? syncEmails(resendClient, coreApiClient, syncedAt, { deadlineAtMs }) : syncEmails(resendClient, coreApiClient, syncedAt, { stopBeforeCreatedAtMs: INTERMEDIATE_SYNC_EMAILS_MAX_AGE_MS, resumable: false, + deadlineAtMs, }), ); @@ -47,9 +58,9 @@ export default defineLogicFunction({ name: 'resend-sync-emails', description: 'Syncs Resend emails and links them to existing people by email. In initial sync mode it does a full resumable pass; in intermediate mode it only fetches emails created in the last 7 days and does not persist a cursor.', - timeoutSeconds: 300, + timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.EMAILS, handler: resendSyncEmailsHandler, cronTriggerSettings: { - pattern: '*/5 * * * *', + pattern: RESEND_SYNC_CRON_PATTERNS.EMAILS, }, }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-templates.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-templates.ts index 5eb1f4406dd..46eb6f2ef0c 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-templates.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/logic-functions/resend-sync-templates.ts @@ -1,6 +1,11 @@ import { CoreApiClient } from 'twenty-client-sdk/core'; import { defineLogicFunction } from 'twenty-sdk/define'; +import { + RESEND_SYNC_CRON_PATTERNS, + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS, + RESEND_SYNC_SLOT_TIMEOUT_SECONDS, +} from '@modules/resend/constants/sync-config'; import { RESEND_SYNC_TEMPLATES_LOGIC_FUNCTION_UNIVERSAL_IDENTIFIER } from '@modules/resend/constants/universal-identifiers'; import { getResendClient } from '@modules/resend/shared/utils/get-resend-client'; import { logStepOutcome } from '@modules/resend/sync/utils/log-step-outcome'; @@ -21,8 +26,13 @@ export const resendSyncTemplatesHandler = const resendClient = getResendClient(); const coreApiClient = new CoreApiClient(); + const deadlineAtMs = + Date.now() + + RESEND_SYNC_SLOT_TIMEOUT_SECONDS.TEMPLATES * 1_000 - + RESEND_SYNC_SLOT_DEADLINE_SLACK_MS; + const templates = await runSyncStep('TEMPLATES', () => - syncTemplates(resendClient, coreApiClient), + syncTemplates(resendClient, coreApiClient, { deadlineAtMs }), ); logStepOutcome(templates); @@ -37,9 +47,9 @@ export default defineLogicFunction({ name: 'resend-sync-templates', description: 'Syncs Resend templates. Resumes from its own cursor if the function timeouts mid-pagination.', - timeoutSeconds: 300, + timeoutSeconds: RESEND_SYNC_SLOT_TIMEOUT_SECONDS.TEMPLATES, handler: resendSyncTemplatesHandler, cronTriggerSettings: { - pattern: '*/15 * * * *', + pattern: RESEND_SYNC_CRON_PATTERNS.TEMPLATES, }, }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-broadcasts.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-broadcasts.ts index 47988270926..3768bbd2ec2 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-broadcasts.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-broadcasts.ts @@ -54,11 +54,16 @@ const fetchBroadcastDetailsForPage = async ( return detailByResendId; }; +export type SyncBroadcastsOptions = { + deadlineAtMs?: number; +}; + export const syncBroadcasts = async ( resend: Resend, client: CoreApiClient, segmentMap: SegmentIdMap, topicMap: TopicIdMap, + options?: SyncBroadcastsOptions, ): Promise => { const aggregate: SyncResult = { fetched: 0, @@ -182,7 +187,13 @@ export const syncBroadcasts = async ( return { ok: pageOutcome.ok, errors: pageOutcome.result.errors }; }, 'broadcasts', - { startCursor: resumeCursor, onCursorAdvance }, + { + startCursor: resumeCursor, + onCursorAdvance, + ...(isDefined(options?.deadlineAtMs) && { + deadlineAtMs: options.deadlineAtMs, + }), + }, ); }, ); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-contacts.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-contacts.ts index 8d6044a0ff9..6fa21a3ea46 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-contacts.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-contacts.ts @@ -37,10 +37,15 @@ const toContactDto = ( ...(isDefined(personId) && { personId }), }); +export type SyncContactsOptions = { + deadlineAtMs?: number; +}; + export const syncContacts = async ( resend: Resend, client: CoreApiClient, syncedAt: string, + options?: SyncContactsOptions, ): Promise => { const aggregate: SyncResult = { fetched: 0, @@ -81,7 +86,13 @@ export const syncContacts = async ( return { ok: pageOutcome.ok, errors: pageOutcome.result.errors }; }, 'contacts', - { startCursor: resumeCursor, onCursorAdvance }, + { + startCursor: resumeCursor, + onCursorAdvance, + ...(isDefined(options?.deadlineAtMs) && { + deadlineAtMs: options.deadlineAtMs, + }), + }, ); }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-emails.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-emails.ts index 0ec19ce3111..3cce716c252 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-emails.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-emails.ts @@ -20,6 +20,7 @@ import { upsertRecords } from '@modules/resend/sync/utils/upsert-records'; export type SyncEmailsOptions = { stopBeforeCreatedAtMs?: number; resumable?: boolean; + deadlineAtMs?: number; }; export const syncEmails = async ( @@ -140,6 +141,9 @@ export const syncEmails = async ( { startCursor: resumable ? resumeCursor : undefined, ...(resumable && { onCursorAdvance }), + ...(isDefined(options?.deadlineAtMs) && { + deadlineAtMs: options.deadlineAtMs, + }), }, ); }, diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-segments.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-segments.ts index c4907453547..414a4d45e16 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-segments.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-segments.ts @@ -1,5 +1,6 @@ import type { Resend } from 'resend'; import { CoreApiClient } from 'twenty-client-sdk/core'; +import { isDefined } from '@utils/is-defined'; import type { SegmentDto } from '@modules/resend/sync/types/segment.dto'; import type { SyncResult } from '@modules/resend/sync/types/sync-result'; @@ -23,10 +24,15 @@ const toSegmentDto = (segment: RawSegment, syncedAt: string): SegmentDto => ({ lastSyncedFromResend: syncedAt, }); +export type SyncSegmentsOptions = { + deadlineAtMs?: number; +}; + export const syncSegments = async ( resend: Resend, client: CoreApiClient, syncedAt: string, + options?: SyncSegmentsOptions, ): Promise> => { const aggregate: SyncResult = { fetched: 0, @@ -63,7 +69,13 @@ export const syncSegments = async ( return { ok: pageOutcome.ok, errors: pageOutcome.result.errors }; }, 'segments', - { startCursor: resumeCursor, onCursorAdvance }, + { + startCursor: resumeCursor, + onCursorAdvance, + ...(isDefined(options?.deadlineAtMs) && { + deadlineAtMs: options.deadlineAtMs, + }), + }, ); }); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-templates.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-templates.ts index a065d77cb06..55f2ba44d0d 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-templates.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-templates.ts @@ -52,9 +52,14 @@ const fetchTemplateDetailsForPage = async ( return detailByResendId; }; +export type SyncTemplatesOptions = { + deadlineAtMs?: number; +}; + export const syncTemplates = async ( resend: Resend, client: CoreApiClient, + options?: SyncTemplatesOptions, ): Promise => { const aggregate: SyncResult = { fetched: 0, @@ -129,7 +134,13 @@ export const syncTemplates = async ( return { ok: pageOutcome.ok, errors: pageOutcome.result.errors }; }, 'templates', - { startCursor: resumeCursor, onCursorAdvance }, + { + startCursor: resumeCursor, + onCursorAdvance, + ...(isDefined(options?.deadlineAtMs) && { + deadlineAtMs: options.deadlineAtMs, + }), + }, ); }, ); diff --git a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-topics.ts b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-topics.ts index 6a06a3e76b7..5c56f7644c5 100644 --- a/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-topics.ts +++ b/packages/twenty-apps/internal/twenty-for-twenty/src/modules/resend/sync/utils/sync-topics.ts @@ -30,10 +30,15 @@ const toTopicDto = (topic: RawTopic, syncedAt: string): TopicDto => ({ lastSyncedFromResend: syncedAt, }); +export type SyncTopicsOptions = { + deadlineAtMs?: number; +}; + export const syncTopics = async ( resend: Resend, client: CoreApiClient, syncedAt: string, + _options?: SyncTopicsOptions, ): Promise> => { const aggregate: SyncResult = { fetched: 0,