chore: refactor migrations realtime queue

This commit is contained in:
Chirag Aggarwal 2025-02-11 04:03:02 +00:00
parent 2c4c42de05
commit 47fbb777ed

View file

@ -3,8 +3,7 @@
namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Realtime;
use Exception;
use Utopia\CLI\Console;
use Utopia\Config\Config;
@ -54,13 +53,14 @@ class Migrations extends Action
->inject('dbForProject')
->inject('dbForPlatform')
->inject('logError')
->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError));
->inject('queueForRealtime')
->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError, $queueForRealtime));
}
/**
* @throws Exception
*/
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError): void
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime): void
{
$payload = $message->getPayload() ?? [];
@ -87,7 +87,7 @@ class Migrations extends Action
return;
}
$this->processMigration($migration);
$this->processMigration($migration, $queueForRealtime);
}
/**
@ -155,34 +155,24 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function updateMigrationDocument(Document $migration, Document $project): Document
protected function updateMigrationDocument(Document $migration, Document $project, Realtime $queueForRealtime): Document
{
/** Trigger Realtime */
$allEvents = Event::generateEvents('migrations.[migrationId].update', [
'migrationId' => $migration->getId(),
]);
/** Trigger Realtime Events */
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent('migrations.[migrationId].update')
->setParam('migrationId', $migration->getId())
->setPayload($migration->getArrayCopy())
->trigger();
$target = Realtime::fromPayload(
event: $allEvents[0],
payload: $migration,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
Realtime::send(
projectId: $project->getId(),
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
$queueForRealtime
->setProject($project)
->setProjectId($project->getId())
->setEvent('migrations.[migrationId].update')
->setParam('migrationId', $migration->getId())
->setPayload($migration->getArrayCopy())
->trigger();
return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration);
}
@ -241,7 +231,7 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function processMigration(Document $migration): void
protected function processMigration(Document $migration, Realtime $queueForRealtime): void
{
$project = $this->project;
$projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId());
@ -265,7 +255,7 @@ class Migrations extends Action
$migration->setAttribute('stage', 'processing');
$migration->setAttribute('status', 'processing');
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$source = $this->processSource($migration);
$destination = $this->processDestination($migration, $tempAPIKey);
@ -279,14 +269,14 @@ class Migrations extends Action
/** Start Transfer */
$migration->setAttribute('stage', 'migrating');
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$transfer->run(
$migration->getAttribute('resources'),
function () use ($migration, $transfer, $projectDocument) {
function () use ($migration, $transfer, $projectDocument, $queueForRealtime) {
$migration->setAttribute('resourceData', json_encode($transfer->getCache()));
$migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
},
$migration->getAttribute('resourceId'),
$migration->getAttribute('resourceType')
@ -323,7 +313,7 @@ class Migrations extends Action
}
$migration->setAttribute('errors', $errorMessages);
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
return;
}
@ -364,7 +354,7 @@ class Migrations extends Action
$migration->setAttribute('errors', $errorMessages);
}
} finally {
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
if ($migration->getAttribute('status', '') === 'failed') {
Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')');