diff --git a/packages/twenty-server/src/database/commands/data-seed-dev-workspace.command.ts b/packages/twenty-server/src/database/commands/data-seed-dev-workspace.command.ts index 7106fbb72a2..9c505946bf8 100644 --- a/packages/twenty-server/src/database/commands/data-seed-dev-workspace.command.ts +++ b/packages/twenty-server/src/database/commands/data-seed-dev-workspace.command.ts @@ -4,7 +4,10 @@ import { Command, CommandRunner, Option } from 'nest-commander'; import { SEED_APPLE_WORKSPACE_ID, + SEED_EMPTY_WORKSPACE_3_ID, + SEED_EMPTY_WORKSPACE_4_ID, SEED_YCOMBINATOR_WORKSPACE_ID, + SeededEmptyWorkspacesIds, SeededWorkspacesIds, } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant'; import { DevSeederService } from 'src/engine/workspace-manager/dev-seeder/services/dev-seeder.service'; @@ -42,12 +45,21 @@ export class DataSeedWorkspaceCommand extends CommandRunner { ? [SEED_APPLE_WORKSPACE_ID] : [SEED_APPLE_WORKSPACE_ID, SEED_YCOMBINATOR_WORKSPACE_ID]; + const emptyWorkspaceIds: SeededEmptyWorkspacesIds[] = [ + SEED_EMPTY_WORKSPACE_3_ID, + SEED_EMPTY_WORKSPACE_4_ID, + ]; + try { for (const workspaceId of workspaceIds) { await this.devSeederService.seedDev(workspaceId, { light: options.light, }); } + + for (const workspaceId of emptyWorkspaceIds) { + await this.devSeederService.seedEmptyWorkspace(workspaceId); + } } catch (error) { this.logger.error(error); this.logger.error(error.stack); diff --git a/packages/twenty-server/src/database/commands/run-instance-commands.command.ts b/packages/twenty-server/src/database/commands/run-instance-commands.command.ts index 8b3d533b5c1..cb241d64908 100644 --- a/packages/twenty-server/src/database/commands/run-instance-commands.command.ts +++ b/packages/twenty-server/src/database/commands/run-instance-commands.command.ts @@ -62,6 +62,9 @@ export class RunInstanceCommandsCommand extends CommandRunner { await this.checkWorkspaceVersionSafety(options); await this.runLegacyPendingTypeOrmMigrations(); + const activeOrSuspendedWorkspaceIds = + await this.workspaceVersionService.getActiveOrSuspendedWorkspaceIds(); + for (const { command, name, @@ -79,9 +82,6 @@ export class RunInstanceCommandsCommand extends CommandRunner { } if (options.includeSlow) { - const hasWorkspaces = - await this.workspaceVersionService.hasActiveOrSuspendedWorkspaces(); - for (const { command, name, @@ -90,7 +90,7 @@ export class RunInstanceCommandsCommand extends CommandRunner { await this.instanceUpgradeService.runSlowInstanceCommand({ command, name, - skipDataMigration: !hasWorkspaces, + skipDataMigration: activeOrSuspendedWorkspaceIds.length === 0, }); if (result.status === 'failed') { @@ -119,10 +119,10 @@ export class RunInstanceCommandsCommand extends CommandRunner { return; } - const activeWorkspaceIds = + const activeOrSuspendedWorkspaceIds = await this.workspaceVersionService.getActiveOrSuspendedWorkspaceIds(); - if (activeWorkspaceIds.length === 0) { + if (activeOrSuspendedWorkspaceIds.length === 0) { return; } @@ -141,7 +141,7 @@ export class RunInstanceCommandsCommand extends CommandRunner { const allAtPreviousVersion = await this.upgradeMigrationService.areAllWorkspacesAtCommand({ commandName: lastWorkspaceCommand.name, - workspaceIds: activeWorkspaceIds, + workspaceIds: activeOrSuspendedWorkspaceIds, }); if (!allAtPreviousVersion) { diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/upgrade-sequence-reader.service.spec.ts b/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/upgrade-sequence-reader.service.spec.ts new file mode 100644 index 00000000000..08ab58770bf --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/upgrade-sequence-reader.service.spec.ts @@ -0,0 +1,299 @@ +import 'reflect-metadata'; + +import { Test } from '@nestjs/testing'; +import { DiscoveryService } from '@nestjs/core'; + +import { + type UpgradeStep, + UpgradeSequenceReaderService, +} from 'src/engine/core-modules/upgrade/services/upgrade-sequence-reader.service'; +import { UpgradeCommandRegistryService } from 'src/engine/core-modules/upgrade/services/upgrade-command-registry.service'; +import { RegisteredWorkspaceCommand } from 'src/engine/core-modules/upgrade/decorators/registered-workspace-command.decorator'; +import { TWENTY_CURRENT_VERSION } from 'src/engine/core-modules/upgrade/constants/twenty-current-version.constant'; + +const VERSION = TWENTY_CURRENT_VERSION; + +@RegisteredWorkspaceCommand(VERSION, 1770000000000) +class MinimalWorkspaceCommand { + async runOnWorkspace(): Promise {} +} + +const buildProviderWrapper = (instance: object) => ({ + instance, + metatype: instance.constructor, +}); + +const buildServiceWithMockedSequence = async ( + mockSequence: UpgradeStep[], +): Promise => { + const module = await Test.createTestingModule({ + providers: [ + UpgradeSequenceReaderService, + UpgradeCommandRegistryService, + { + provide: DiscoveryService, + useValue: { + getProviders: () => + [new MinimalWorkspaceCommand()].map(buildProviderWrapper), + }, + }, + ], + }).compile(); + + const registryService = module.get(UpgradeCommandRegistryService); + + registryService.onModuleInit(); + + const service = module.get(UpgradeSequenceReaderService); + + jest.spyOn(service, 'getUpgradeSequence').mockReturnValue(mockSequence); + + return service; +}; + +const noopAsync = async () => {}; + +const makeStep = (kind: UpgradeStep['kind'], name: string): UpgradeStep => { + const command = + kind === 'workspace' + ? { runOnWorkspace: noopAsync } + : kind === 'slow-instance' + ? { up: noopAsync, down: noopAsync, runDataMigration: noopAsync } + : { up: noopAsync, down: noopAsync }; + + return { + kind, + name, + command, + version: VERSION, + timestamp: 0, + } as unknown as UpgradeStep; +}; + +const makeFastInstance = (name: string) => makeStep('fast-instance', name); +const makeWorkspace = (name: string) => makeStep('workspace', name); + +describe('UpgradeSequenceReaderService', () => { + describe('getInitialCursorForNewWorkspace', () => { + it('should return last workspace command of segment following completed instance command', async () => { + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + makeWorkspace('Wc2'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic0', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Wc2', status: 'completed' }); + }); + + it('should return the instance command itself when next step is another instance command', async () => { + const sequence = [ + makeWorkspace('Wc-1'), + makeFastInstance('Ic0'), + makeFastInstance('Ic1'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic0', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Ic0', status: 'completed' }); + }); + + it('should return last workspace command when all instance commands in batch are completed', async () => { + const sequence = [ + makeWorkspace('Wc-1'), + makeFastInstance('Ic0'), + makeFastInstance('Ic1'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic1', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Wc1', status: 'completed' }); + }); + + it('should stop at next instance command boundary', async () => { + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + makeFastInstance('Ic1'), + makeWorkspace('Wc2'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic0', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Wc1', status: 'completed' }); + }); + + it('should return the instance command itself when at end of sequence', async () => { + const sequence = [makeWorkspace('Wc0'), makeFastInstance('Ic0')]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic0', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Ic0', status: 'completed' }); + }); + + it('should return the instance command itself when no workspace command exists before it', async () => { + const sequence = [ + makeFastInstance('Ic0'), + makeFastInstance('Ic1'), + makeWorkspace('Wc0'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic0', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Ic0', status: 'completed' }); + }); + + it('should return final segment when last instance command is completed', async () => { + const sequence = [ + makeWorkspace('Wc0'), + makeFastInstance('Ic0'), + makeWorkspace('Wc1'), + makeFastInstance('Ic1'), + makeWorkspace('Wc2'), + makeWorkspace('Wc3'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic1', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Wc3', status: 'completed' }); + }); + + it('should handle single workspace command in segment', async () => { + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeFastInstance('Ic1'), + makeWorkspace('Wc1'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic0', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Wc0', status: 'completed' }); + }); + + it('should return the instance command itself when sequence ends with instance commands batch', async () => { + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + makeFastInstance('Ic1'), + makeFastInstance('Ic2'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic2', + status: 'completed', + }); + + expect(result).toEqual({ name: 'Ic2', status: 'completed' }); + }); + + it('should return the failed instance command when IC failed — not skip forward to WC segment', async () => { + // Sequence: Ic0 → Ic1 → Wc0 → Wc1 + // Ic1 failed → cursor stays at Ic1:failed (does NOT skip to Wc1) + const sequence = [ + makeFastInstance('Ic0'), + makeFastInstance('Ic1'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic1', + status: 'failed', + }); + + expect(result).toEqual({ name: 'Ic1', status: 'failed' }); + }); + + it('should return the failed instance command even when next step is a workspace command', async () => { + // Sequence: Ic0 → Wc0 → Wc1 + // Ic0 failed → cursor stays at Ic0:failed + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic0', + status: 'failed', + }); + + expect(result).toEqual({ name: 'Ic0', status: 'failed' }); + }); + + it('should return the failed mid-segment instance command', async () => { + // Sequence: Ic0 → Ic1 → Ic2 → Wc0 + // Ic1 failed (Ic0 completed but Ic1 is the last attempted) → cursor at Ic1:failed + const sequence = [ + makeFastInstance('Ic0'), + makeFastInstance('Ic1'), + makeFastInstance('Ic2'), + makeWorkspace('Wc0'), + ]; + + const service = await buildServiceWithMockedSequence(sequence); + + const result = service.getInitialCursorForNewWorkspace({ + name: 'Ic1', + status: 'failed', + }); + + expect(result).toEqual({ name: 'Ic1', status: 'failed' }); + }); + }); +}); diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/instance-command-runner.service.ts b/packages/twenty-server/src/engine/core-modules/upgrade/services/instance-command-runner.service.ts index 65e25113f35..dd56ff26ce0 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/instance-command-runner.service.ts +++ b/packages/twenty-server/src/engine/core-modules/upgrade/services/instance-command-runner.service.ts @@ -7,6 +7,7 @@ import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twent import { type FastInstanceCommand } from 'src/engine/core-modules/upgrade/interfaces/fast-instance-command.interface'; import { type SlowInstanceCommand } from 'src/engine/core-modules/upgrade/interfaces/slow-instance-command.interface'; import { UpgradeMigrationService } from 'src/engine/core-modules/upgrade/services/upgrade-migration.service'; +import { WorkspaceVersionService } from 'src/engine/workspace-manager/workspace-version/services/workspace-version.service'; type RunSingleMigrationResult = | { status: 'success' } @@ -22,6 +23,7 @@ export class InstanceCommandRunnerService { private readonly dataSource: DataSource, private readonly twentyConfigService: TwentyConfigService, private readonly upgradeMigrationService: UpgradeMigrationService, + private readonly workspaceVersionService: WorkspaceVersionService, ) {} async runFastInstanceCommand({ @@ -54,9 +56,16 @@ export class InstanceCommandRunnerService { await command.up(queryRunner); - await this.upgradeMigrationService.markAsCompleted({ + const workspaceIds = + await this.workspaceVersionService.getActiveOrSuspendedWorkspaceIds({ + queryRunner, + }); + + await this.upgradeMigrationService.recordUpgradeMigration({ name, - workspaceId: null, + workspaceIds, + isInstance: true, + status: 'completed', executedByVersion, queryRunner, }); @@ -67,9 +76,14 @@ export class InstanceCommandRunnerService { await queryRunner.rollbackTransaction(); } - await this.upgradeMigrationService.markAsFailed({ + const workspaceIds = + await this.workspaceVersionService.getActiveOrSuspendedWorkspaceIds(); + + await this.upgradeMigrationService.recordUpgradeMigration({ name, - workspaceId: null, + workspaceIds, + isInstance: true, + status: 'failed', executedByVersion, error, }); @@ -117,9 +131,14 @@ export class InstanceCommandRunnerService { try { await command.runDataMigration(this.dataSource); } catch (error) { - await this.upgradeMigrationService.markAsFailed({ + const workspaceIds = + await this.workspaceVersionService.getActiveOrSuspendedWorkspaceIds(); + + await this.upgradeMigrationService.recordUpgradeMigration({ name, - workspaceId: null, + workspaceIds, + isInstance: true, + status: 'failed', executedByVersion, error, }); @@ -133,6 +152,9 @@ export class InstanceCommandRunnerService { } } - return this.runFastInstanceCommand({ command, name }); + return this.runFastInstanceCommand({ + command, + name, + }); } } diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-migration.service.ts b/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-migration.service.ts index daa711326dd..fe6f4ba0924 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-migration.service.ts +++ b/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-migration.service.ts @@ -10,6 +10,12 @@ import { } from 'src/engine/core-modules/upgrade/upgrade-migration.entity'; import { formatUpgradeErrorForStorage } from 'src/engine/core-modules/upgrade/utils/format-upgrade-error-for-storage.util'; +export type WorkspaceCursor = { + name: string; + status: UpgradeMigrationStatus; + isInitial: boolean; +}; + @Injectable() export class UpgradeMigrationService { constructor( @@ -35,73 +41,96 @@ export class UpgradeMigrationService { return isDefined(latestAttempt) && latestAttempt.status === 'completed'; } - async markAsCompleted({ - name, - workspaceId, - executedByVersion, - queryRunner, - }: { - name: string; - workspaceId: string | null; - executedByVersion: string; - queryRunner?: QueryRunner; - }): Promise { - const repository = queryRunner - ? queryRunner.manager.getRepository(UpgradeMigrationEntity) + async recordUpgradeMigration( + params: + | { + name: string; + workspaceIds: string[]; + isInstance: boolean; + status: 'completed'; + executedByVersion: string; + queryRunner?: QueryRunner; + } + | { + name: string; + workspaceIds: string[]; + isInstance: boolean; + status: 'failed'; + executedByVersion: string; + error: unknown; + queryRunner?: QueryRunner; + }, + ): Promise { + const { name, workspaceIds, isInstance, status, executedByVersion } = + params; + + const repository = params.queryRunner + ? params.queryRunner.manager.getRepository(UpgradeMigrationEntity) : this.upgradeMigrationRepository; - const previousAttempts = await repository.count({ - where: { - name, - workspaceId: workspaceId === null ? IsNull() : workspaceId, - }, - }); - await repository.save({ - name, - status: 'completed', - attempt: previousAttempts + 1, - executedByVersion, - workspaceId, - }); + const errorMessage = + params.status === 'failed' + ? formatUpgradeErrorForStorage(params.error) + : null; + + if (isInstance) { + const previousAttempts = await repository.count({ + where: { name, workspaceId: IsNull() }, + }); + + await repository.save([ + { + name, + status, + attempt: previousAttempts + 1, + executedByVersion, + workspaceId: null, + errorMessage, + }, + ...workspaceIds.map((workspaceId) => ({ + name, + status, + attempt: previousAttempts + 1, + executedByVersion, + workspaceId, + errorMessage, + })), + ]); + + return; + } + + const rows = []; + + for (const workspaceId of workspaceIds) { + const previousAttempts = await repository.count({ + where: { name, workspaceId }, + }); + + rows.push({ + name, + status, + attempt: previousAttempts + 1, + executedByVersion, + workspaceId, + errorMessage, + }); + } + + await repository.save(rows); } - async markAsFailed({ - name, - workspaceId, - executedByVersion, - error, - }: { - name: string; - workspaceId: string | null; - executedByVersion: string; - error: unknown; - }): Promise { - const previousAttempts = await this.upgradeMigrationRepository.count({ - where: { - name, - workspaceId: workspaceId === null ? IsNull() : workspaceId, - }, - }); - - await this.upgradeMigrationRepository.save({ - name, - status: 'failed', - attempt: previousAttempts + 1, - executedByVersion, - workspaceId, - errorMessage: formatUpgradeErrorForStorage(error), - }); - } - - async markAsInitial({ + async markAsWorkspaceInitial({ name, workspaceId, executedByVersion, + status, queryRunner, }: { name: string; workspaceId: string; executedByVersion: string; + status: UpgradeMigrationStatus; queryRunner?: QueryRunner; }): Promise { const repository = queryRunner @@ -110,7 +139,7 @@ export class UpgradeMigrationService { await repository.save({ name, - status: 'completed', + status, isInitial: true, attempt: 1, executedByVersion, @@ -120,8 +149,8 @@ export class UpgradeMigrationService { // Returns the most recently attempted command (by createdAt) // across instance and active-workspace scopes, with its status. - // Workspace-scoped records from inactive/deleted workspaces are - // excluded so they cannot incorrectly influence the global cursor. + // isInitial records are excluded — they represent activation + // state, not execution progress. async getLastAttemptedCommandNameOrThrow( allActiveOrSuspendedWorkspaceIds: string[], ): Promise<{ @@ -131,6 +160,7 @@ export class UpgradeMigrationService { const queryBuilder = this.upgradeMigrationRepository .createQueryBuilder('migration') .select(['migration.name', 'migration.status']) + .andWhere('migration."isInitial" = false') .andWhere( `migration.attempt = ( SELECT MAX(sub.attempt) @@ -167,7 +197,7 @@ export class UpgradeMigrationService { async getWorkspaceLastAttemptedCommandNameOrThrow( workspaceIds: string[], - ): Promise> { + ): Promise> { if (workspaceIds.length === 0) { return new Map(); } @@ -177,6 +207,7 @@ export class UpgradeMigrationService { .select('migration.workspaceId', 'workspaceId') .addSelect('migration.name', 'name') .addSelect('migration.status', 'status') + .addSelect('migration.isInitial', 'isInitial') .where({ workspaceId: In(workspaceIds), }) @@ -195,15 +226,17 @@ export class UpgradeMigrationService { workspaceId: string; name: string; status: UpgradeMigrationStatus; + isInitial: boolean; }>(); - const cursors = new Map< - string, - { name: string; status: UpgradeMigrationStatus } - >(); + const cursors = new Map(); for (const row of results) { - cursors.set(row.workspaceId, { name: row.name, status: row.status }); + cursors.set(row.workspaceId, { + name: row.name, + status: row.status, + isInitial: row.isInitial, + }); } const missingWorkspaceIds = workspaceIds.filter( @@ -249,4 +282,33 @@ export class UpgradeMigrationService { return completedCount === workspaceIds.length; } + + async getLastAttemptedInstanceCommandOrThrow(): Promise<{ + name: string; + status: UpgradeMigrationStatus; + }> { + const migration = await this.upgradeMigrationRepository + .createQueryBuilder('migration') + .select(['migration.name', 'migration.status']) + .where('migration."workspaceId" IS NULL') + .andWhere('migration."isInitial" = false') + .andWhere( + `migration.attempt = ( + SELECT MAX(sub.attempt) + FROM core."upgradeMigration" sub + WHERE sub.name = migration.name + AND sub."workspaceId" IS NULL + )`, + ) + .orderBy('migration.createdAt', 'DESC') + .getOne(); + + if (!migration) { + throw new Error( + 'No instance command found — the database may not have been initialized', + ); + } + + return { name: migration.name, status: migration.status }; + } } diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-reader.service.ts b/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-reader.service.ts index 56075e95630..846c15cf163 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-reader.service.ts +++ b/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-reader.service.ts @@ -7,6 +7,8 @@ import { type RegisteredWorkspaceCommand, UpgradeCommandRegistryService, } from 'src/engine/core-modules/upgrade/services/upgrade-command-registry.service'; +import { type UpgradeMigrationStatus } from 'src/engine/core-modules/upgrade/upgrade-migration.entity'; +import { isDefined } from 'twenty-shared/utils'; export type FastInstanceUpgradeStep = { kind: 'fast-instance'; @@ -78,7 +80,7 @@ export class UpgradeSequenceReaderService { return cursor; } - getWorkspaceCommandsSliceBounds({ + getWorkspaceSegmentBounds({ sequence, workspaceCommand, }: { @@ -108,7 +110,7 @@ export class UpgradeSequenceReaderService { return { startCursor, endCursor }; } - collectContiguousWorkspaceSteps({ + collectWorkspaceCommandsStartingFrom({ sequence, fromWorkspaceCommand, }: { @@ -160,19 +162,46 @@ export class UpgradeSequenceReaderService { : workspaceCommands.slice(cursorIndex); } - getLastWorkspaceCommand(): RegisteredWorkspaceCommand { + getInitialCursorForNewWorkspace(lastAttemptedInstanceCommand: { + name: string; + status: UpgradeMigrationStatus; + }): { + name: string; + status: UpgradeMigrationStatus; + } { + const { name, status } = lastAttemptedInstanceCommand; const sequence = this.getUpgradeSequence(); - for (let index = sequence.length - 1; index >= 0; index--) { - const step = sequence[index]; + const instanceCursor = this.locateStepInSequenceOrThrow({ + sequence, + stepName: name, + }); - if (step.kind === 'workspace') { - return step; + if (status === 'completed') { + const nextStep = sequence[instanceCursor + 1]; + + if (isDefined(nextStep) && nextStep.kind === 'workspace') { + const lastWc = this.findLastWorkspaceCommandInSegmentStartingAt( + sequence, + nextStep, + ); + + return { name: lastWc.name, status: 'completed' }; } } - throw new Error( - 'No workspace commands found in upgrade sequence — this should have been caught at startup', - ); + return { name, status }; + } + + private findLastWorkspaceCommandInSegmentStartingAt( + sequence: UpgradeStep[], + firstWorkspaceCommand: WorkspaceUpgradeStep, + ): RegisteredWorkspaceCommand { + const segment = this.collectWorkspaceCommandsStartingFrom({ + sequence, + fromWorkspaceCommand: firstWorkspaceCommand, + }); + + return segment[segment.length - 1]; } } diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-runner.service.ts b/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-runner.service.ts index 5c32f434ba3..8c6404140f1 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-runner.service.ts +++ b/packages/twenty-server/src/engine/core-modules/upgrade/services/upgrade-sequence-runner.service.ts @@ -6,7 +6,10 @@ import { } from 'src/database/commands/command-runners/workspace-iterator.service'; import { type ParsedUpgradeCommandOptions } from 'src/database/commands/upgrade-version-command/upgrade.command'; import { InstanceCommandRunnerService } from 'src/engine/core-modules/upgrade/services/instance-command-runner.service'; -import { UpgradeMigrationService } from 'src/engine/core-modules/upgrade/services/upgrade-migration.service'; +import { + type WorkspaceCursor, + UpgradeMigrationService, +} from 'src/engine/core-modules/upgrade/services/upgrade-migration.service'; import { type InstanceUpgradeStep, type UpgradeStep, @@ -57,16 +60,21 @@ export class UpgradeSequenceRunnerService { let totalSuccesses = 0; let totalFailures = 0; let cursor = startCursor; + let workspaceCursors = await this.fetchWorkspaceCursors( + allActiveOrSuspendedWorkspaceIds, + ); while (cursor < sequence.length) { const step = sequence[cursor]; if (step.kind === 'fast-instance' || step.kind === 'slow-instance') { const previousStep = cursor > 0 ? sequence[cursor - 1] : undefined; + if (previousStep?.kind === 'workspace') { - await this.enforceWorkspaceSyncBarrier({ + this.enforceWorkspacesCompletedPreviousWorkspaceSegment({ + sequence, previousWorkspaceStep: previousStep, - allActiveOrSuspendedWorkspaceIds, + workspaceCursors, }); } @@ -74,18 +82,20 @@ export class UpgradeSequenceRunnerService { instanceStep: step, skipDataMigration: allActiveOrSuspendedWorkspaceIds.length === 0, }); + cursor++; continue; } - const contiguousWorkspaceSteps = - this.upgradeSequenceReaderService.collectContiguousWorkspaceSteps({ + const workspaceCommandsSegment = + this.upgradeSequenceReaderService.collectWorkspaceCommandsStartingFrom({ sequence, fromWorkspaceCommand: step, }); const report = await this.resumeWorkspaceCommandsFromCursors({ - contiguousWorkspaceSteps, + workspaceCommandsSegment, + workspaceCursors, allActiveOrSuspendedWorkspaceIds, options, }); @@ -102,11 +112,16 @@ export class UpgradeSequenceRunnerService { return { totalSuccesses, totalFailures }; } - cursor += contiguousWorkspaceSteps.length; + cursor += workspaceCommandsSegment.length; + + workspaceCursors = await this.fetchWorkspaceCursors( + allActiveOrSuspendedWorkspaceIds, + ); } return { totalSuccesses, totalFailures }; } + private async resolveStartCursor({ sequence, allActiveOrSuspendedWorkspaceIds, @@ -136,12 +151,12 @@ export class UpgradeSequenceRunnerService { } case 'workspace': { const workspaceSliceBounds = - this.upgradeSequenceReaderService.getWorkspaceCommandsSliceBounds({ + this.upgradeSequenceReaderService.getWorkspaceSegmentBounds({ sequence, workspaceCommand: lastAttemptedStep, }); - await this.validateWorkspaceCursorsAreInSameWorkspaceStepsSlice({ + await this.validateWorkspaceCursorsAreInWorkspaceSegment({ sequence, allActiveOrSuspendedWorkspaceIds, workspaceSliceBounds, @@ -154,7 +169,7 @@ export class UpgradeSequenceRunnerService { } } - private async validateWorkspaceCursorsAreInSameWorkspaceStepsSlice({ + private async validateWorkspaceCursorsAreInWorkspaceSegment({ allActiveOrSuspendedWorkspaceIds, sequence, workspaceSliceBounds: { startCursor, endCursor }, @@ -167,22 +182,61 @@ export class UpgradeSequenceRunnerService { await this.upgradeMigrationService.getWorkspaceLastAttemptedCommandNameOrThrow( allActiveOrSuspendedWorkspaceIds, ); + const precedingStep = + startCursor > 0 ? sequence[startCursor - 1] : undefined; + + const invalidWorkspaces: Array<{ + workspaceId: string; + cursorName: string; + cursorStatus: string; + }> = []; for (const [workspaceId, workspaceCursor] of workspaceCursors) { - const cursor = + const cursorPosition = this.upgradeSequenceReaderService.locateStepInSequenceOrThrow({ sequence, stepName: workspaceCursor.name, }); - if (cursor < startCursor || cursor > endCursor) { - throw new Error( - `Workspace ${workspaceId} cursor "${workspaceCursor.name}" is outside the ` + - `current workspace slice [${startCursor}..${endCursor}] — ` + - 'workspaces are not aligned', - ); + const isWithinSegment = + cursorPosition >= startCursor && cursorPosition <= endCursor; + + const isAtPrecedingInstanceCommandCompleted = + isDefined(precedingStep) && + precedingStep.kind !== 'workspace' && + cursorPosition === startCursor - 1 && + workspaceCursor.status === 'completed'; + + if (!isWithinSegment && !isAtPrecedingInstanceCommandCompleted) { + invalidWorkspaces.push({ + workspaceId, + cursorName: workspaceCursor.name, + cursorStatus: workspaceCursor.status, + }); } } + + if (invalidWorkspaces.length > 0) { + const details = invalidWorkspaces + .map( + ({ workspaceId, cursorName, cursorStatus }) => + `${workspaceId} at "${cursorName}" (${cursorStatus})`, + ) + .join(', '); + + throw new Error( + `${invalidWorkspaces.length} workspace(s) have invalid cursors for ` + + `workspace segment [${startCursor}..${endCursor}]: ${details}`, + ); + } + } + + private async fetchWorkspaceCursors( + allActiveOrSuspendedWorkspaceIds: string[], + ): Promise> { + return this.upgradeMigrationService.getWorkspaceLastAttemptedCommandNameOrThrow( + allActiveOrSuspendedWorkspaceIds, + ); } private async runInstanceStep({ @@ -226,24 +280,23 @@ export class UpgradeSequenceRunnerService { } private async resumeWorkspaceCommandsFromCursors({ - contiguousWorkspaceSteps, + workspaceCommandsSegment, + workspaceCursors, allActiveOrSuspendedWorkspaceIds, options, }: { - contiguousWorkspaceSteps: WorkspaceUpgradeStep[]; + workspaceCommandsSegment: WorkspaceUpgradeStep[]; + workspaceCursors: Map; allActiveOrSuspendedWorkspaceIds: string[]; options: ParsedUpgradeCommandOptions; }): Promise { - const workspaceCursors = - await this.upgradeMigrationService.getWorkspaceLastAttemptedCommandNameOrThrow( - allActiveOrSuspendedWorkspaceIds, - ); + const workspaceIds = + isDefined(options.workspaceIds) && options.workspaceIds.length > 0 + ? options.workspaceIds + : allActiveOrSuspendedWorkspaceIds; return this.workspaceIteratorService.iterate({ - workspaceIds: - isDefined(options.workspaceIds) && options.workspaceIds.length > 0 - ? options.workspaceIds - : allActiveOrSuspendedWorkspaceIds, + workspaceIds, startFromWorkspaceId: options.startFromWorkspaceId, workspaceCountLimit: options.workspaceCountLimit, dryRun: options.dryRun, @@ -258,7 +311,7 @@ export class UpgradeSequenceRunnerService { const pendingCommands = this.upgradeSequenceReaderService.getPendingWorkspaceCommands({ - workspaceCommands: contiguousWorkspaceSteps, + workspaceCommands: workspaceCommandsSegment, workspaceCursor, }); @@ -271,24 +324,39 @@ export class UpgradeSequenceRunnerService { }); } - private async enforceWorkspaceSyncBarrier({ + private enforceWorkspacesCompletedPreviousWorkspaceSegment({ + sequence, previousWorkspaceStep, - allActiveOrSuspendedWorkspaceIds, + workspaceCursors, }: { + sequence: UpgradeStep[]; previousWorkspaceStep: WorkspaceUpgradeStep; - allActiveOrSuspendedWorkspaceIds: string[]; - }): Promise { - const allWorkspacesReady = - await this.upgradeMigrationService.areAllWorkspacesAtCommand({ - commandName: previousWorkspaceStep.name, - workspaceIds: allActiveOrSuspendedWorkspaceIds, + workspaceCursors: Map; + }): void { + const barrierCursor = + this.upgradeSequenceReaderService.locateStepInSequenceOrThrow({ + sequence, + stepName: previousWorkspaceStep.name, }); - if (!allWorkspacesReady) { - throw new Error( - 'Cannot run instance step: not all workspaces have completed ' + - `"${previousWorkspaceStep.name}"`, - ); + for (const [workspaceId, workspaceCursor] of workspaceCursors) { + const cursorPosition = + this.upgradeSequenceReaderService.locateStepInSequenceOrThrow({ + sequence, + stepName: workspaceCursor.name, + }); + + const isAtBarrierAndCompleted = + cursorPosition === barrierCursor && + workspaceCursor.status === 'completed'; + + if (!isAtBarrierAndCompleted) { + throw new Error( + `Cannot run instance step: workspace ${workspaceId} ` + + `has not completed "${previousWorkspaceStep.name}" ` + + `(cursor: "${workspaceCursor.name}", status: "${workspaceCursor.status}")`, + ); + } } } } diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/workspace-command-runner.service.ts b/packages/twenty-server/src/engine/core-modules/upgrade/services/workspace-command-runner.service.ts index 68d531e3d90..9203aed4547 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/workspace-command-runner.service.ts +++ b/packages/twenty-server/src/engine/core-modules/upgrade/services/workspace-command-runner.service.ts @@ -78,17 +78,21 @@ export class WorkspaceCommandRunnerService { }); if (!options.dryRun) { - await this.upgradeMigrationService.markAsCompleted({ + await this.upgradeMigrationService.recordUpgradeMigration({ name, - workspaceId, + workspaceIds: [workspaceId], + isInstance: false, + status: 'completed', executedByVersion, }); } } catch (error) { if (!options.dryRun) { - await this.upgradeMigrationService.markAsFailed({ + await this.upgradeMigrationService.recordUpgradeMigration({ name, - workspaceId, + workspaceIds: [workspaceId], + isInstance: false, + status: 'failed', executedByVersion, error, }); diff --git a/packages/twenty-server/src/engine/core-modules/workspace/services/workspace.service.ts b/packages/twenty-server/src/engine/core-modules/workspace/services/workspace.service.ts index aef479fd845..2b0fe8d3466 100644 --- a/packages/twenty-server/src/engine/core-modules/workspace/services/workspace.service.ts +++ b/packages/twenty-server/src/engine/core-modules/workspace/services/workspace.service.ts @@ -384,8 +384,13 @@ export class WorkspaceService extends TypeOrmQueryService { workspaceId: string; displayName: string; }): Promise { - const lastWorkspaceCommand = - this.upgradeSequenceReaderService.getLastWorkspaceCommand(); + const lastAttemptedInstanceCommand = + await this.upgradeMigrationService.getLastAttemptedInstanceCommandOrThrow(); + + const initialCursor = + this.upgradeSequenceReaderService.getInitialCursorForNewWorkspace( + lastAttemptedInstanceCommand, + ); const appVersion = this.twentyConfigService.get('APP_VERSION'); @@ -401,10 +406,11 @@ export class WorkspaceService extends TypeOrmQueryService { version: extractVersionMajorMinorPatch(appVersion), }); - await this.upgradeMigrationService.markAsInitial({ - name: lastWorkspaceCommand.name, + await this.upgradeMigrationService.markAsWorkspaceInitial({ + name: initialCursor.name, workspaceId, executedByVersion: appVersion ?? 'unknown', + status: initialCursor.status, queryRunner, }); diff --git a/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant.ts b/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant.ts index 6da9d7085cb..50f3c925cea 100644 --- a/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant.ts +++ b/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant.ts @@ -21,11 +21,17 @@ export type CreateWorkspaceInput = Pick< export const SEED_APPLE_WORKSPACE_ID = '20202020-1c25-4d02-bf25-6aeccf7ea419'; export const SEED_YCOMBINATOR_WORKSPACE_ID = '3b8e6458-5fc1-4e63-8563-008ccddaa6db'; +export const SEED_EMPTY_WORKSPACE_3_ID = '506915ec-21ca-431b-a04a-257eb216865e'; +export const SEED_EMPTY_WORKSPACE_4_ID = 'aa8fdcb1-8ee1-4012-98af-44a97caa7411'; export type SeededWorkspacesIds = | typeof SEED_APPLE_WORKSPACE_ID | typeof SEED_YCOMBINATOR_WORKSPACE_ID; +export type SeededEmptyWorkspacesIds = + | typeof SEED_EMPTY_WORKSPACE_3_ID + | typeof SEED_EMPTY_WORKSPACE_4_ID; + export const SEEDER_CREATE_WORKSPACE_INPUT = { [SEED_APPLE_WORKSPACE_ID]: { id: SEED_APPLE_WORKSPACE_ID, @@ -49,3 +55,29 @@ export const SEEDER_CREATE_WORKSPACE_INPUT = { SeededWorkspacesIds, Omit >; + +// Empty workspaces with no users, metadata, or data — used by integration tests +// that need more than 2 workspaces (e.g. upgrade sequence runner tests). +export const SEEDER_CREATE_EMPTY_WORKSPACE_INPUT = { + [SEED_EMPTY_WORKSPACE_3_ID]: { + id: SEED_EMPTY_WORKSPACE_3_ID, + displayName: 'Empty3', + subdomain: 'empty3', + inviteHash: 'empty3.dev-invite-hash', + logo: '', + activationStatus: WorkspaceActivationStatus.PENDING_CREATION, + isTwoFactorAuthenticationEnforced: false, + }, + [SEED_EMPTY_WORKSPACE_4_ID]: { + id: SEED_EMPTY_WORKSPACE_4_ID, + displayName: 'Empty4', + subdomain: 'empty4', + inviteHash: 'empty4.dev-invite-hash', + logo: '', + activationStatus: WorkspaceActivationStatus.PENDING_CREATION, + isTwoFactorAuthenticationEnforced: false, + }, +} as const satisfies Record< + SeededEmptyWorkspacesIds, + Omit +>; diff --git a/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/services/dev-seeder-permissions.service.ts b/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/services/dev-seeder-permissions.service.ts index 147e5e786ea..b8d193669af 100644 --- a/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/services/dev-seeder-permissions.service.ts +++ b/packages/twenty-server/src/engine/workspace-manager/dev-seeder/core/services/dev-seeder-permissions.service.ts @@ -10,6 +10,7 @@ import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadat import { FieldPermissionService } from 'src/engine/metadata-modules/object-permission/field-permission/field-permission.service'; import { ObjectPermissionService } from 'src/engine/metadata-modules/object-permission/object-permission.service'; import { RoleTargetService } from 'src/engine/metadata-modules/role-target/services/role-target.service'; +import { RoleDTO } from 'src/engine/metadata-modules/role/dtos/role.dto'; import { RoleEntity } from 'src/engine/metadata-modules/role/role.entity'; import { RoleService } from 'src/engine/metadata-modules/role/role.service'; import { UserRoleService } from 'src/engine/metadata-modules/user-role/user-role.service'; @@ -146,6 +147,27 @@ export class DevSeederPermissionsService { roleId: adminRole.id, }); + const memberRole = await this.initMinimalPermissionsAndActivateWorkspace({ + workspaceId, + workspaceCustomFlatApplication, + }); + + if (memberUserWorkspaceIds.length > 0) { + await this.userRoleService.assignRoleToManyUserWorkspace({ + workspaceId, + userWorkspaceIds: memberUserWorkspaceIds, + roleId: memberRole.id, + }); + } + } + + public async initMinimalPermissionsAndActivateWorkspace({ + workspaceId, + workspaceCustomFlatApplication, + }: { + workspaceId: string; + workspaceCustomFlatApplication: FlatApplication; + }): Promise { const memberRole = await this.roleService.createMemberRole({ workspaceId, ownerFlatApplication: workspaceCustomFlatApplication, @@ -158,13 +180,7 @@ export class DevSeederPermissionsService { activationStatus: WorkspaceActivationStatus.ACTIVE, }); - if (memberUserWorkspaceIds.length > 0) { - await this.userRoleService.assignRoleToManyUserWorkspace({ - workspaceId, - userWorkspaceIds: memberUserWorkspaceIds, - roleId: memberRole.id, - }); - } + return memberRole; } private async createLimitedRoleForSeedWorkspace({ diff --git a/packages/twenty-server/src/engine/workspace-manager/dev-seeder/services/dev-seeder.service.ts b/packages/twenty-server/src/engine/workspace-manager/dev-seeder/services/dev-seeder.service.ts index d284fe1b031..e6b63134a2b 100644 --- a/packages/twenty-server/src/engine/workspace-manager/dev-seeder/services/dev-seeder.service.ts +++ b/packages/twenty-server/src/engine/workspace-manager/dev-seeder/services/dev-seeder.service.ts @@ -11,6 +11,7 @@ import { SdkClientGenerationService } from 'src/engine/core-modules/sdk-client/s import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { UpgradeMigrationService } from 'src/engine/core-modules/upgrade/services/upgrade-migration.service'; import { UpgradeSequenceReaderService } from 'src/engine/core-modules/upgrade/services/upgrade-sequence-reader.service'; +import { type UpgradeMigrationStatus } from 'src/engine/core-modules/upgrade/upgrade-migration.entity'; import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; import { getMetadataFlatEntityMapsKey } from 'src/engine/metadata-modules/flat-entity/utils/get-metadata-flat-entity-maps-key.util'; import { getMetadataRelatedMetadataNames } from 'src/engine/metadata-modules/flat-entity/utils/get-metadata-related-metadata-names.util'; @@ -19,7 +20,9 @@ import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage import { WorkspaceCacheService } from 'src/engine/workspace-cache/services/workspace-cache.service'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { + type SeededEmptyWorkspacesIds, type SeededWorkspacesIds, + SEEDER_CREATE_EMPTY_WORKSPACE_INPUT, SEEDER_CREATE_WORKSPACE_INPUT, } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant'; import { seedBillingCustomers } from 'src/engine/workspace-manager/dev-seeder/core/billing/utils/seed-billing-customers.util'; @@ -72,14 +75,18 @@ export class DevSeederService { const isBillingEnabled = this.twentyConfigService.get('IS_BILLING_ENABLED'); const appVersion = this.twentyConfigService.get('APP_VERSION') ?? 'unknown'; - const lastWorkspaceCommand = - this.upgradeSequenceReaderService.getLastWorkspaceCommand(); + const lastAttemptedInstanceCommand = + await this.upgradeMigrationService.getLastAttemptedInstanceCommandOrThrow(); + const initialCursor = + this.upgradeSequenceReaderService.getInitialCursorForNewWorkspace( + lastAttemptedInstanceCommand, + ); await this.seedCoreSchema({ workspaceId, seedBilling: isBillingEnabled, appVersion, - lastUpgradeStepName: lastWorkspaceCommand.name, + initialCursor, }); await this.applicationRegistrationService.createCliRegistrationIfNotExists(); @@ -191,15 +198,93 @@ export class DevSeederService { await this.workspaceCacheStorageService.flush(workspaceId, undefined); } + public async seedEmptyWorkspace( + workspaceId: SeededEmptyWorkspacesIds, + ): Promise { + const appVersion = this.twentyConfigService.get('APP_VERSION') ?? 'unknown'; + const lastAttemptedInstanceCommand = + await this.upgradeMigrationService.getLastAttemptedInstanceCommandOrThrow(); + const initialCursor = + this.upgradeSequenceReaderService.getInitialCursorForNewWorkspace( + lastAttemptedInstanceCommand, + ); + + const createWorkspaceStaticInput = + SEEDER_CREATE_EMPTY_WORKSPACE_INPUT[workspaceId]; + const queryRunner = this.coreDataSource.createQueryRunner(); + + await queryRunner.connect(); + await queryRunner.startTransaction(); + + try { + const workspaceCustomApplicationId = v4(); + + await createWorkspace({ + queryRunner, + schemaName: 'core', + createWorkspaceInput: { + ...createWorkspaceStaticInput, + workspaceCustomApplicationId, + }, + }); + + await this.applicationService.createWorkspaceCustomApplication( + { + workspaceId, + applicationId: workspaceCustomApplicationId, + }, + queryRunner, + ); + + await this.applicationService.createTwentyStandardApplication( + { + workspaceId, + skipCacheInvalidation: true, + }, + queryRunner, + ); + + await queryRunner.commitTransaction(); + } catch (error) { + await queryRunner.rollbackTransaction(); + throw error; + } finally { + await queryRunner.release(); + } + + const { workspaceCustomFlatApplication } = + await this.applicationService.findWorkspaceTwentyStandardAndCustomApplicationOrThrow( + { + workspaceId, + }, + ); + + await this.devSeederPermissionsService.initMinimalPermissionsAndActivateWorkspace( + { + workspaceId, + workspaceCustomFlatApplication, + }, + ); + + await this.upgradeMigrationService.markAsWorkspaceInitial({ + name: initialCursor.name, + workspaceId, + executedByVersion: appVersion, + status: initialCursor.status, + }); + + await this.workspaceCacheStorageService.flush(workspaceId, undefined); + } + private async seedCoreSchema({ workspaceId, appVersion, - lastUpgradeStepName, + initialCursor, seedBilling = true, }: { workspaceId: SeededWorkspacesIds; appVersion: string; - lastUpgradeStepName: string; + initialCursor: { name: string; status: UpgradeMigrationStatus }; seedBilling?: boolean; }): Promise { const schemaName = 'core'; @@ -257,10 +342,11 @@ export class DevSeederService { await seedMetadataEntities({ queryRunner, schemaName, workspaceId }); - await this.upgradeMigrationService.markAsInitial({ - name: lastUpgradeStepName, + await this.upgradeMigrationService.markAsWorkspaceInitial({ + name: initialCursor.name, workspaceId, executedByVersion: appVersion, + status: initialCursor.status, queryRunner, }); diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-version/services/workspace-version.service.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-version/services/workspace-version.service.ts index d1ac5c841ac..c5bd1874bfe 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-version/services/workspace-version.service.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-version/services/workspace-version.service.ts @@ -2,7 +2,7 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; -import { In, MoreThanOrEqual, Repository } from 'typeorm'; +import { In, MoreThanOrEqual, QueryRunner, Repository } from 'typeorm'; import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity'; @@ -27,11 +27,17 @@ export class WorkspaceVersionService { async getActiveOrSuspendedWorkspaceIds({ startFromWorkspaceId, workspaceCountLimit, + queryRunner, }: { startFromWorkspaceId?: string; workspaceCountLimit?: number; + queryRunner?: QueryRunner; } = {}): Promise { - const workspaces = await this.workspaceRepository.find({ + const repository = queryRunner + ? queryRunner.manager.getRepository(WorkspaceEntity) + : this.workspaceRepository; + + const workspaces = await repository.find({ select: ['id'], where: { activationStatus: In([ diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/__snapshots__/failing-sequence-runner.integration-spec.ts.snap b/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/__snapshots__/failing-sequence-runner.integration-spec.ts.snap similarity index 100% rename from packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/__snapshots__/failing-sequence-runner.integration-spec.ts.snap rename to packages/twenty-server/test/integration/upgrade/suites/sequence-runner/__snapshots__/failing-sequence-runner.integration-spec.ts.snap diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/failing-sequence-runner.integration-spec.ts b/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/failing-sequence-runner.integration-spec.ts similarity index 87% rename from packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/failing-sequence-runner.integration-spec.ts rename to packages/twenty-server/test/integration/upgrade/suites/sequence-runner/failing-sequence-runner.integration-spec.ts index e211cc076ac..74203a69cea 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/failing-sequence-runner.integration-spec.ts +++ b/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/failing-sequence-runner.integration-spec.ts @@ -11,12 +11,13 @@ import { makeStep, makeWorkspace, resetSeedSequenceCounter, - seedMigration, + seedInstanceMigration, + seedWorkspaceMigration, setMockActiveWorkspaceIds, testGetLatestMigrationForCommand, WS_1, WS_2, -} from './utils/upgrade-sequence-runner-integration-test.util'; +} from 'test/integration/upgrade/utils/upgrade-sequence-runner-integration-test.util'; const makeFailingFastInstance = (name: string, error: Error): UpgradeStep => ({ @@ -94,7 +95,7 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = it('should throw when cursor command is not found in the sequence', async () => { const sequence = [makeFastInstance('Ic1'), makeFastInstance('Ic2')]; - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'RemovedCommand', status: 'completed', }); @@ -118,31 +119,32 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = setMockActiveWorkspaceIds([WS_1, WS_2]); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc2', status: 'completed', workspaceId: WS_1, }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_2, }); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1], }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc3', status: 'completed', workspaceId: WS_1, }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc4', status: 'completed', workspaceId: WS_1, @@ -153,7 +155,9 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = sequence, options: DEFAULT_OPTIONS, }), - ).rejects.toThrow('workspaces are not aligned'); + ).rejects.toThrow( + 'workspace(s) have invalid cursors for workspace segment', + ); }); it('should throw when an active workspace has no migration history', async () => { @@ -161,7 +165,7 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = setMockActiveWorkspaceIds([WS_1, WS_2]); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, @@ -182,7 +186,7 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = makeFailingFastInstance('Ic2', error), ]; - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', }); @@ -221,9 +225,10 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1], }); await expect( @@ -251,7 +256,7 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'failed', workspaceId: WS_1, @@ -295,26 +300,27 @@ describe('UpgradeSequenceRunnerService — failing sequence (integration)', () = setMockActiveWorkspaceIds([WS_1, WS_2]); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc0', status: 'completed', workspaceId: WS_1, }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc0', status: 'completed', workspaceId: WS_2, }); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1, WS_2], }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_2, diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/successful-sequence-runner.integration-spec.ts b/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/successful-sequence-runner.integration-spec.ts similarity index 88% rename from packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/successful-sequence-runner.integration-spec.ts rename to packages/twenty-server/test/integration/upgrade/suites/sequence-runner/successful-sequence-runner.integration-spec.ts index e522236c77c..d1346bdff86 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/successful-sequence-runner.integration-spec.ts +++ b/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/successful-sequence-runner.integration-spec.ts @@ -8,12 +8,13 @@ import { makeSlowInstance, makeWorkspace, resetSeedSequenceCounter, - seedMigration, + seedInstanceMigration, + seedWorkspaceMigration, setMockActiveWorkspaceIds, testGetLatestMigrationForCommand, WS_1, WS_2, -} from './utils/upgrade-sequence-runner-integration-test.util'; +} from 'test/integration/upgrade/utils/upgrade-sequence-runner-integration-test.util'; describe('UpgradeSequenceRunnerService — execution (integration)', () => { let context: IntegrationTestContext; @@ -51,11 +52,11 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { makeSlowInstance('Ic3'), ]; - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', }); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic2', status: 'completed', }); @@ -87,11 +88,11 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { it('should retry a failed instance command', async () => { const sequence = [makeFastInstance('Ic1'), makeFastInstance('Ic2')]; - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', }); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic2', status: 'failed', }); @@ -119,11 +120,12 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1], }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, @@ -149,7 +151,7 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, @@ -172,7 +174,7 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { it('should skip data migration for slow instance commands when no workspaces exist', async () => { const sequence = [makeFastInstance('Ic1'), makeSlowInstance('Ic2')]; - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', }); @@ -208,9 +210,10 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic0', status: 'completed', + workspaceIds: [WS_1], }); const instanceCommandRunnerService = context.module.get( @@ -248,16 +251,17 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1, WS_2]); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1, WS_2], }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_2, @@ -300,14 +304,15 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc0', status: 'completed', workspaceId: WS_1, }); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1], }); await context.runner.run({ @@ -340,11 +345,12 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1], }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'failed', workspaceId: WS_1, @@ -388,11 +394,12 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1], }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, @@ -430,18 +437,19 @@ describe('UpgradeSequenceRunnerService — execution (integration)', () => { setMockActiveWorkspaceIds([WS_1]); - await seedMigration(context.dataSource, { + await seedInstanceMigration(context.dataSource, { name: 'Ic1', status: 'completed', + workspaceIds: [WS_1], }); - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc1', status: 'completed', workspaceId: WS_1, }); // WS_2 is inactive — its record is more recent (seeded later) // but should not influence the global cursor - await seedMigration(context.dataSource, { + await seedWorkspaceMigration(context.dataSource, { name: 'Wc2', status: 'completed', workspaceId: WS_2, diff --git a/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/workspace-segment-alignment.integration-spec.ts b/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/workspace-segment-alignment.integration-spec.ts new file mode 100644 index 00000000000..f1d745006b2 --- /dev/null +++ b/packages/twenty-server/test/integration/upgrade/suites/sequence-runner/workspace-segment-alignment.integration-spec.ts @@ -0,0 +1,587 @@ +import { + type IntegrationTestContext, + createUpgradeSequenceRunnerIntegrationTestModule, + DEFAULT_OPTIONS, + makeFastInstance, + makeSlowInstance, + makeWorkspace, + migrationRecordToKey, + resetSeedSequenceCounter, + seedInstanceMigration, + seedWorkspaceMigration, + setMockActiveWorkspaceIds, + testGetExecutedMigrationsInOrder, + WS_1, + WS_2, + WS_3, +} from 'test/integration/upgrade/utils/upgrade-sequence-runner-integration-test.util'; + +describe('UpgradeSequenceRunnerService — workspace segment alignment (integration)', () => { + let context: IntegrationTestContext; + + beforeAll(async () => { + context = await createUpgradeSequenceRunnerIntegrationTestModule(); + }, 30000); + + afterAll(async () => { + await context.dataSource.query('DELETE FROM core."upgradeMigration"'); + await context.module?.close(); + await context.dataSource?.destroy(); + }, 15000); + + beforeEach(async () => { + await context.dataSource.query('DELETE FROM core."upgradeMigration"'); + resetSeedSequenceCounter(); + setMockActiveWorkspaceIds([]); + jest.restoreAllMocks(); + }); + + it('should upgrade all workspaces when they are in the same segment at different positions', async () => { + // Sequence: Ic0 → Wc0 → Wc1 → Wc2 → Ic1 → Ic2 → Wc3 → Wc4 + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + makeWorkspace('Wc2'), + makeFastInstance('Ic1'), + makeSlowInstance('Ic2'), + makeWorkspace('Wc3'), + makeWorkspace('Wc4'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2, WS_3]); + + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1, WS_2, WS_3], + }); + + // WS_1: at Wc0, completed + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_1, + }); + + // WS_2: at Wc1, failed (needs retry) + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_2, + }); + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc1', + status: 'failed', + workspaceId: WS_2, + }); + + // WS_3: at Wc2, completed (most advanced in segment) + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_3, + }); + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc1', + status: 'completed', + workspaceId: WS_3, + }); + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc2', + status: 'completed', + workspaceId: WS_3, + }); + + const report = await context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2, WS_3], + }, + }); + + expect(report.totalFailures).toBe(0); + + const executed = await testGetExecutedMigrationsInOrder(context.dataSource); + + expect(executed.map(migrationRecordToKey)).toStrictEqual([ + // Seeds (Ic0 global + workspace rows inserted at same timestamp) + 'Ic0:instance:completed:1', + `Ic0:${WS_1}:completed:1`, + `Ic0:${WS_2}:completed:1`, + `Ic0:${WS_3}:completed:1`, + `Wc0:${WS_1}:completed:1`, + `Wc0:${WS_2}:completed:1`, + `Wc1:${WS_2}:failed:1`, + `Wc0:${WS_3}:completed:1`, + `Wc1:${WS_3}:completed:1`, + `Wc2:${WS_3}:completed:1`, + + // Segment A: WS_1 runs Wc1, Wc2; WS_2 retries Wc1, runs Wc2; WS_3 already done + `Wc1:${WS_1}:completed:1`, + `Wc2:${WS_1}:completed:1`, + `Wc1:${WS_2}:completed:2`, + `Wc2:${WS_2}:completed:1`, + + // Sync barrier → instance steps (with workspace-level rows) + 'Ic1:instance:completed:1', + `Ic1:${WS_1}:completed:1`, + `Ic1:${WS_2}:completed:1`, + `Ic1:${WS_3}:completed:1`, + 'Ic2:instance:completed:1', + `Ic2:${WS_1}:completed:1`, + `Ic2:${WS_2}:completed:1`, + `Ic2:${WS_3}:completed:1`, + + // Segment B: all workspaces run Wc3, Wc4 + `Wc3:${WS_1}:completed:1`, + `Wc4:${WS_1}:completed:1`, + `Wc3:${WS_2}:completed:1`, + `Wc4:${WS_2}:completed:1`, + `Wc3:${WS_3}:completed:1`, + `Wc4:${WS_3}:completed:1`, + ]); + }); + + it('should reject workspaces with cursors ahead of the current segment', async () => { + // Sequence: Ic0 → Wc0 → Wc1 → Ic1 → Wc2 → Wc3 + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + makeFastInstance('Ic1'), + makeWorkspace('Wc2'), + makeWorkspace('Wc3'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2]); + + // WS_1 existed when Ic0 ran; WS_2 was created later (initial at Wc2) + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1], + }); + + // WS_1: at Wc0 (in segment A) - correct + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_1, + }); + + // WS_2: at Wc2 (in segment B) - WRONG! Ahead of current segment + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc2', + status: 'completed', + workspaceId: WS_2, + isInitial: true, + }); + + await expect( + context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2], + }, + }), + ).rejects.toThrow( + /workspace\(s\) have invalid cursors for workspace segment/, + ); + }); + + it('should reject workspaces with cursors behind the current segment', async () => { + // Sequence: Wc-1 → Ic0 → Wc0 → Wc1 → Ic1 → Wc2 + const sequence = [ + makeWorkspace('Wc-1'), + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + makeFastInstance('Ic1'), + makeWorkspace('Wc2'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2]); + + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1, WS_2], + }); + + // WS_1: at Wc0 (in segment after Ic0) - correct + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_1, + }); + + // WS_2: at Wc-1 (in segment before Ic0) - WRONG! Behind current segment + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc-1', + status: 'completed', + workspaceId: WS_2, + }); + + await expect( + context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2], + }, + }), + ).rejects.toThrow( + /workspace\(s\) have invalid cursors for workspace segment/, + ); + }); + + it('should handle workspaces created at correct segment via isInitial', async () => { + // Sequence: Wc0 → Ic0 → Ic1 → Wc1 → Wc2 + // WS_1 existed from the start, WS_2 was created after Ic1 + const sequence = [ + makeWorkspace('Wc0'), + makeFastInstance('Ic0'), + makeFastInstance('Ic1'), + makeWorkspace('Wc1'), + makeWorkspace('Wc2'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2]); + + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1], + }); + await seedInstanceMigration(context.dataSource, { + name: 'Ic1', + status: 'completed', + workspaceIds: [WS_1], + }); + + // WS_1: at Wc1 (in correct segment after Ic1) + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc1', + status: 'completed', + workspaceId: WS_1, + }); + + // WS_2: newly created with isInitial at Wc2 (correct segment after Ic1) + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc2', + status: 'completed', + workspaceId: WS_2, + isInitial: true, + }); + + const report = await context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2], + }, + }); + + expect(report.totalFailures).toBe(0); + + const executed = await testGetExecutedMigrationsInOrder(context.dataSource); + + expect(executed.map(migrationRecordToKey)).toStrictEqual([ + // Seeds + 'Ic0:instance:completed:1', + `Ic0:${WS_1}:completed:1`, + 'Ic1:instance:completed:1', + `Ic1:${WS_1}:completed:1`, + `Wc1:${WS_1}:completed:1`, + `Wc2:${WS_2}:completed:1:initial`, + + // WS_1 runs Wc2, WS_2 already at Wc2 (no new commands needed) + `Wc2:${WS_1}:completed:1`, + ]); + }); + + it('should accept workspaces at the preceding completed IC when others are in the WC segment (-w scenario)', async () => { + // Sequence: Ic0 → Ic1 → Wc0 → Wc1 + // WS_1 was upgraded via -w and is at Wc1:completed + // WS_2 is still at Ic1:completed (preceding IC) + const sequence = [ + makeFastInstance('Ic0'), + makeFastInstance('Ic1'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2]); + + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1, WS_2], + }); + await seedInstanceMigration(context.dataSource, { + name: 'Ic1', + status: 'completed', + workspaceIds: [WS_1, WS_2], + }); + + // WS_1 was upgraded ahead via -w + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_1, + }); + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc1', + status: 'completed', + workspaceId: WS_1, + }); + + const report = await context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2], + }, + }); + + expect(report.totalFailures).toBe(0); + + const executed = await testGetExecutedMigrationsInOrder(context.dataSource); + + expect(executed.map(migrationRecordToKey)).toStrictEqual([ + // Seeds + 'Ic0:instance:completed:1', + `Ic0:${WS_1}:completed:1`, + `Ic0:${WS_2}:completed:1`, + 'Ic1:instance:completed:1', + `Ic1:${WS_1}:completed:1`, + `Ic1:${WS_2}:completed:1`, + `Wc0:${WS_1}:completed:1`, + `Wc1:${WS_1}:completed:1`, + + // WS_2 runs full segment, WS_1 already done + `Wc0:${WS_2}:completed:1`, + `Wc1:${WS_2}:completed:1`, + ]); + }); + + it('should reject workspace at preceding IC with failed status', async () => { + // Sequence: Ic0 → Wc0 → Wc1 + // WS_1 is in the WC segment, WS_2 is at Ic0:failed + const sequence = [ + makeFastInstance('Ic0'), + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2]); + + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1], + }); + + // WS_2 at Ic0:failed — should be rejected + await seedWorkspaceMigration(context.dataSource, { + name: 'Ic0', + status: 'failed', + workspaceId: WS_2, + }); + + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_1, + }); + + await expect( + context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2], + }, + }), + ).rejects.toThrow( + /workspace\(s\) have invalid cursors for workspace segment/, + ); + }); + + it('should reject workspace stuck in a previous WC segment (corrupted state)', async () => { + // Sequence: Wc0 → Wc1 → Ic0 → Wc2 → Wc3 + // WS_1 is in segment [Wc2..Wc3], WS_2 is stuck at Wc0 (previous WC segment) + const sequence = [ + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + makeFastInstance('Ic0'), + makeWorkspace('Wc2'), + makeWorkspace('Wc3'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2]); + + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_1, + }); + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc1', + status: 'completed', + workspaceId: WS_1, + }); + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1], + }); + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc2', + status: 'completed', + workspaceId: WS_1, + }); + + // WS_2 stuck at Wc0 in previous WC segment — corrupted + await seedWorkspaceMigration(context.dataSource, { + name: 'Wc0', + status: 'completed', + workspaceId: WS_2, + }); + + await expect( + context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2], + }, + }), + ).rejects.toThrow( + /workspace\(s\) have invalid cursors for workspace segment/, + ); + }); + + it('should write workspace rows for IC failure, accept workspace created mid-failure, and succeed on restart', async () => { + // Sequence: Ic0 → Ic1 → Wc0 → Wc1 + // First run: Ic1 fails → global + workspace rows written as failed + // WS_3 is created while Ic1 is failed → initial cursor at Ic1:failed + // Second run: Ic1 retries and succeeds → WC segment runs for all 3 workspaces + const failOnce = { shouldFail: true }; + + const ic1Command = { + up: async () => { + if (failOnce.shouldFail) { + failOnce.shouldFail = false; + throw new Error('Ic1 temporary failure'); + } + }, + down: async () => {}, + }; + + const sequence = [ + makeFastInstance('Ic0'), + { + ...makeFastInstance('Ic1'), + command: ic1Command, + } as unknown as ReturnType, + makeWorkspace('Wc0'), + makeWorkspace('Wc1'), + ]; + + setMockActiveWorkspaceIds([WS_1, WS_2]); + + await seedInstanceMigration(context.dataSource, { + name: 'Ic0', + status: 'completed', + workspaceIds: [WS_1, WS_2], + }); + + // First run — Ic1 fails + await expect( + context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2], + }, + }), + ).rejects.toThrow('Ic1 temporary failure'); + + const afterFailure = await testGetExecutedMigrationsInOrder( + context.dataSource, + ); + + expect(afterFailure.map(migrationRecordToKey)).toStrictEqual([ + // Seeds + 'Ic0:instance:completed:1', + `Ic0:${WS_1}:completed:1`, + `Ic0:${WS_2}:completed:1`, + + // Ic1 failed globally + workspace rows + 'Ic1:instance:failed:1', + `Ic1:${WS_1}:failed:1`, + `Ic1:${WS_2}:failed:1`, + ]); + + // WS_3 created while Ic1 is failed — + // getInitialCursorForNewWorkspace returns { name: 'Ic1', status: 'failed' } + await seedWorkspaceMigration(context.dataSource, { + name: 'Ic1', + status: 'failed', + workspaceId: WS_3, + isInitial: true, + useCurrentTimestamp: true, + }); + + setMockActiveWorkspaceIds([WS_1, WS_2, WS_3]); + + // Second run — Ic1 retries and succeeds, then WC segment runs for all 3 + const report = await context.runner.run({ + sequence, + options: { + ...DEFAULT_OPTIONS, + workspaceIds: [WS_1, WS_2, WS_3], + }, + }); + + expect(report.totalFailures).toBe(0); + + const afterRetry = await testGetExecutedMigrationsInOrder( + context.dataSource, + ); + + expect(afterRetry.map(migrationRecordToKey)).toStrictEqual([ + // Seeds + 'Ic0:instance:completed:1', + `Ic0:${WS_1}:completed:1`, + `Ic0:${WS_2}:completed:1`, + + // First run — Ic1 failed + 'Ic1:instance:failed:1', + `Ic1:${WS_1}:failed:1`, + `Ic1:${WS_2}:failed:1`, + + // WS_3 created after Ic1 failure, initial cursor at Ic1:failed + `Ic1:${WS_3}:failed:1:initial`, + + // Second run — Ic1 retry succeeds + 'Ic1:instance:completed:2', + `Ic1:${WS_1}:completed:2`, + `Ic1:${WS_2}:completed:2`, + `Ic1:${WS_3}:completed:2`, + + // WC segment runs for all 3 workspaces + `Wc0:${WS_1}:completed:1`, + `Wc1:${WS_1}:completed:1`, + `Wc0:${WS_2}:completed:1`, + `Wc1:${WS_2}:completed:1`, + `Wc0:${WS_3}:completed:1`, + `Wc1:${WS_3}:completed:1`, + ]); + }); + +}); diff --git a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/utils/upgrade-sequence-runner-integration-test.util.ts b/packages/twenty-server/test/integration/upgrade/utils/upgrade-sequence-runner-integration-test.util.ts similarity index 72% rename from packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/utils/upgrade-sequence-runner-integration-test.util.ts rename to packages/twenty-server/test/integration/upgrade/utils/upgrade-sequence-runner-integration-test.util.ts index 3b22e19d2fe..3f07649d945 100644 --- a/packages/twenty-server/src/engine/core-modules/upgrade/services/__tests__/utils/upgrade-sequence-runner-integration-test.util.ts +++ b/packages/twenty-server/test/integration/upgrade/utils/upgrade-sequence-runner-integration-test.util.ts @@ -18,6 +18,8 @@ import { WorkspaceCommandRunnerService } from 'src/engine/core-modules/upgrade/s import { UpgradeMigrationEntity } from 'src/engine/core-modules/upgrade/upgrade-migration.entity'; import { SEED_APPLE_WORKSPACE_ID, + SEED_EMPTY_WORKSPACE_3_ID, + SEED_EMPTY_WORKSPACE_4_ID, SEED_YCOMBINATOR_WORKSPACE_ID, } from 'src/engine/workspace-manager/dev-seeder/core/constants/seeder-workspaces.constant'; import { WorkspaceVersionService } from 'src/engine/workspace-manager/workspace-version/services/workspace-version.service'; @@ -31,6 +33,8 @@ config({ export const WS_1 = SEED_APPLE_WORKSPACE_ID; export const WS_2 = SEED_YCOMBINATOR_WORKSPACE_ID; +export const WS_3 = SEED_EMPTY_WORKSPACE_3_ID; +export const WS_4 = SEED_EMPTY_WORKSPACE_4_ID; const EXECUTED_BY_VERSION = '42.42.42'; @@ -208,31 +212,88 @@ export const resetSeedSequenceCounter = () => { seedSequenceCounter = 0; }; -export const seedMigration = async ( +export const seedInstanceMigration = async ( dataSource: DataSource, { name, status, - workspaceId = null, + workspaceIds = [], attempt = 1, }: { name: string; status: 'completed' | 'failed'; - workspaceId?: string | null; + workspaceIds?: string[]; attempt?: number; }, ) => { + // Seeds must have past timestamps so the runner's NOW()-based records + // always sort after them in createdAt order. const createdAt = new Date( - Date.now() + seedSequenceCounter * 1000, + Date.now() - (1000000 - seedSequenceCounter * 1000), ).toISOString(); seedSequenceCounter++; - await dataSource.query( - `INSERT INTO core."upgradeMigration" (name, status, attempt, "executedByVersion", "workspaceId", "createdAt") - VALUES ($1, $2, $3, $4, $5, $6)`, - [name, status, attempt, EXECUTED_BY_VERSION, workspaceId, createdAt], + const values: string[] = []; + const args: unknown[] = []; + let paramIndex = 1; + + values.push( + `($${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, NULL, $${paramIndex++}, false)`, ); + args.push(name, status, attempt, EXECUTED_BY_VERSION, createdAt); + + for (const workspaceId of workspaceIds) { + values.push( + `($${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, false)`, + ); + args.push(name, status, attempt, EXECUTED_BY_VERSION, workspaceId, createdAt); + } + + await dataSource.query( + `INSERT INTO core."upgradeMigration" (name, status, attempt, "executedByVersion", "workspaceId", "createdAt", "isInitial") + VALUES ${values.join(', ')}`, + args, + ); +}; + +export const seedWorkspaceMigration = async ( + dataSource: DataSource, + { + name, + status, + workspaceId, + attempt = 1, + isInitial = false, + useCurrentTimestamp = false, + }: { + name: string; + status: 'completed' | 'failed'; + workspaceId: string; + attempt?: number; + isInitial?: boolean; + useCurrentTimestamp?: boolean; + }, +) => { + if (useCurrentTimestamp) { + await dataSource.query( + `INSERT INTO core."upgradeMigration" (name, status, attempt, "executedByVersion", "workspaceId", "isInitial") + VALUES ($1, $2, $3, $4, $5, $6)`, + [name, status, attempt, EXECUTED_BY_VERSION, workspaceId, isInitial], + ); + } else { + const createdAt = new Date( + Date.now() - (1000000 - seedSequenceCounter * 1000), + ).toISOString(); + + seedSequenceCounter++; + + await dataSource.query( + `INSERT INTO core."upgradeMigration" (name, status, attempt, "executedByVersion", "workspaceId", "createdAt", "isInitial") + VALUES ($1, $2, $3, $4, $5, $6, $7)`, + [name, status, attempt, EXECUTED_BY_VERSION, workspaceId, createdAt, isInitial], + ); + } }; export const testCountMigrationsForCommand = async ( @@ -273,3 +334,34 @@ export const testGetLatestMigrationForCommand = async ( return rows.length > 0 ? rows[0] : null; }; + +export type ExecutedMigrationRecord = { + name: string; + status: string; + attempt: number; + workspaceId: string | null; + isInitial: boolean; +}; + +export const testGetExecutedMigrationsInOrder = async ( + dataSource: DataSource, +): Promise => { + return dataSource.query( + `SELECT name, status, attempt, "workspaceId", "isInitial" + FROM core."upgradeMigration" + ORDER BY "createdAt" ASC, "workspaceId" ASC NULLS FIRST, attempt ASC`, + ); +}; + +export const migrationRecordToKey = ({ + name, + workspaceId, + status, + attempt, + isInitial, +}: ExecutedMigrationRecord): string => { + const scope = workspaceId ?? 'instance'; + const initial = isInitial ? ':initial' : ''; + + return `${name}:${scope}:${status}:${attempt}${initial}`; +};