From 47fbb777ed2893e8230316d0386febed76e407a4 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 04:03:02 +0000 Subject: [PATCH] chore: refactor migrations realtime queue --- src/Appwrite/Platform/Workers/Migrations.php | 66 +++++++++----------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index cd567f6fa3..50d6002d28 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -3,8 +3,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; -use Appwrite\Event\Event; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Exception; use Utopia\CLI\Console; use Utopia\Config\Config; @@ -54,13 +53,14 @@ class Migrations extends Action ->inject('dbForProject') ->inject('dbForPlatform') ->inject('logError') - ->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError)); + ->inject('queueForRealtime') + ->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError, $queueForRealtime)); } /** * @throws Exception */ - public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError): void + public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime): void { $payload = $message->getPayload() ?? []; @@ -87,7 +87,7 @@ class Migrations extends Action return; } - $this->processMigration($migration); + $this->processMigration($migration, $queueForRealtime); } /** @@ -155,34 +155,24 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function updateMigrationDocument(Document $migration, Document $project): Document + protected function updateMigrationDocument(Document $migration, Document $project, Realtime $queueForRealtime): Document { - /** Trigger Realtime */ - $allEvents = Event::generateEvents('migrations.[migrationId].update', [ - 'migrationId' => $migration->getId(), - ]); + /** Trigger Realtime Events */ + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent('migrations.[migrationId].update') + ->setParam('migrationId', $migration->getId()) + ->setPayload($migration->getArrayCopy()) + ->trigger(); - $target = Realtime::fromPayload( - event: $allEvents[0], - payload: $migration, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $migration->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - ); - - Realtime::send( - projectId: $project->getId(), - payload: $migration->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - ); + $queueForRealtime + ->setProject($project) + ->setProjectId($project->getId()) + ->setEvent('migrations.[migrationId].update') + ->setParam('migrationId', $migration->getId()) + ->setPayload($migration->getArrayCopy()) + ->trigger(); return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration); } @@ -241,7 +231,7 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function processMigration(Document $migration): void + protected function processMigration(Document $migration, Realtime $queueForRealtime): void { $project = $this->project; $projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId()); @@ -265,7 +255,7 @@ class Migrations extends Action $migration->setAttribute('stage', 'processing'); $migration->setAttribute('status', 'processing'); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); $source = $this->processSource($migration); $destination = $this->processDestination($migration, $tempAPIKey); @@ -279,14 +269,14 @@ class Migrations extends Action /** Start Transfer */ $migration->setAttribute('stage', 'migrating'); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); $transfer->run( $migration->getAttribute('resources'), - function () use ($migration, $transfer, $projectDocument) { + function () use ($migration, $transfer, $projectDocument, $queueForRealtime) { $migration->setAttribute('resourceData', json_encode($transfer->getCache())); $migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters())); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); }, $migration->getAttribute('resourceId'), $migration->getAttribute('resourceType') @@ -323,7 +313,7 @@ class Migrations extends Action } $migration->setAttribute('errors', $errorMessages); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); return; } @@ -364,7 +354,7 @@ class Migrations extends Action $migration->setAttribute('errors', $errorMessages); } } finally { - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); if ($migration->getAttribute('status', '') === 'failed') { Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')');