appwrite/src/Appwrite/Platform/Workers/Migrations.php

394 lines
14 KiB
PHP
Raw Normal View History

2023-10-01 17:39:26 +00:00
<?php
namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
2023-10-01 17:39:26 +00:00
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
2024-03-06 17:34:21 +00:00
use Exception;
2023-10-01 17:39:26 +00:00
use Utopia\CLI\Console;
2024-07-28 06:27:18 +00:00
use Utopia\Config\Config;
2023-10-01 17:39:26 +00:00
use Utopia\Database\Database;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Document;
use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
2024-05-28 16:59:54 +00:00
use Utopia\Migration\Destination;
use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite;
2024-03-06 17:34:21 +00:00
use Utopia\Migration\Exception as MigrationException;
2023-10-01 17:39:26 +00:00
use Utopia\Migration\Source;
2024-06-19 10:26:52 +00:00
use Utopia\Migration\Sources\Appwrite as SourceAppwrite;
2023-10-01 17:39:26 +00:00
use Utopia\Migration\Sources\Firebase;
use Utopia\Migration\Sources\NHost;
use Utopia\Migration\Sources\Supabase;
use Utopia\Migration\Transfer;
2024-03-06 17:34:21 +00:00
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\System\System;
2023-10-01 17:39:26 +00:00
class Migrations extends Action
{
2024-05-28 17:37:37 +00:00
protected Database $dbForProject;
2024-07-28 06:29:02 +00:00
protected Database $dbForPlatform;
2024-07-28 06:29:02 +00:00
2024-05-28 17:37:37 +00:00
protected Document $project;
2023-10-01 17:39:26 +00:00
2024-11-13 01:29:43 +00:00
protected $logError;
2023-10-01 17:39:26 +00:00
public static function getName(): string
{
return 'migrations';
}
/**
* @throws Exception
*/
public function __construct()
{
$this
->desc('Migrations worker')
->inject('message')
2025-01-16 06:05:22 +00:00
->inject('project')
2023-10-17 03:44:36 +00:00
->inject('dbForProject')
->inject('dbForPlatform')
2024-11-13 01:29:43 +00:00
->inject('logError')
2025-01-16 06:05:22 +00:00
->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError));
2023-10-01 17:39:26 +00:00
}
/**
* @throws Exception
*/
2025-01-16 06:05:22 +00:00
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError): void
2023-10-01 17:39:26 +00:00
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$events = $payload['events'] ?? [];
$migration = new Document($payload['migration'] ?? []);
if ($project->getId() === 'console') {
return;
}
2023-10-17 03:44:36 +00:00
$this->dbForProject = $dbForProject;
$this->dbForPlatform = $dbForPlatform;
2024-05-28 17:37:37 +00:00
$this->project = $project;
2024-11-13 01:29:43 +00:00
$this->logError = $logError;
2023-10-01 17:39:26 +00:00
/**
* Handle Event execution.
*/
if (! empty($events)) {
return;
}
2024-11-13 00:13:00 +00:00
$this->processMigration($migration);
2023-10-01 17:39:26 +00:00
}
/**
* @throws Exception
*/
protected function processSource(Document $migration): Source
2023-10-01 17:39:26 +00:00
{
$source = $migration->getAttribute('source');
$credentials = $migration->getAttribute('credentials');
2023-10-01 17:39:26 +00:00
return match ($source) {
Firebase::getName() => new Firebase(
json_decode($credentials['serviceAccount'], true),
),
Supabase::getName() => new Supabase(
$credentials['endpoint'],
$credentials['apiKey'],
$credentials['databaseHost'],
'postgres',
$credentials['username'],
$credentials['password'],
$credentials['port'],
),
NHost::getName() => new NHost(
$credentials['subdomain'],
$credentials['region'],
$credentials['adminSecret'],
$credentials['database'],
$credentials['username'],
$credentials['password'],
$credentials['port'],
),
2024-05-28 17:12:58 +00:00
SourceAppwrite::getName() => new SourceAppwrite(
2024-08-12 08:15:14 +00:00
$credentials['projectId'],
$credentials['endpoint'] === 'http://localhost/v1' ? 'http://appwrite/v1' : $credentials['endpoint'],
2024-08-12 08:15:14 +00:00
$credentials['apiKey'],
2024-05-28 17:05:36 +00:00
),
2023-10-01 17:39:26 +00:00
default => throw new \Exception('Invalid source type'),
};
}
2024-05-28 16:59:54 +00:00
/**
* @throws Exception
*/
protected function processDestination(Document $migration, string $apiKey): Destination
2024-05-28 16:59:54 +00:00
{
$destination = $migration->getAttribute('destination');
2024-05-28 16:59:54 +00:00
return match ($destination) {
DestinationAppwrite::getName() => new DestinationAppwrite(
$this->project->getId(),
'http://appwrite/v1',
$apiKey,
2024-07-28 06:27:18 +00:00
$this->dbForProject,
2024-07-28 06:29:02 +00:00
Config::getParam('collections', [])['databases']['collections'],
2024-05-28 16:59:54 +00:00
),
default => throw new \Exception('Invalid destination type'),
};
}
2023-10-01 17:39:26 +00:00
/**
* @throws Authorization
* @throws Structure
* @throws Conflict
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function updateMigrationDocument(Document $migration, Document $project): Document
{
/** Trigger Realtime */
$allEvents = Event::generateEvents('migrations.[migrationId].update', [
'migrationId' => $migration->getId(),
]);
$target = Realtime::fromPayload(
event: $allEvents[0],
payload: $migration,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
Realtime::send(
projectId: $project->getId(),
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration);
}
/**
* @throws Exception
*/
protected function generateAPIKey(Document $project): string
2023-10-01 17:39:26 +00:00
{
$jwt = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', 86400, 0);
2025-02-11 09:02:20 +00:00
$apiKey = $jwt->encode([
2023-10-01 17:39:26 +00:00
'projectId' => $project->getId(),
'usage' => false,
2023-10-01 17:39:26 +00:00
'scopes' => [
'users.read',
'users.write',
'teams.read',
'teams.write',
'buckets.read',
'buckets.write',
'files.read',
'files.write',
'functions.read',
'functions.write',
2025-01-30 03:38:19 +00:00
'databases.read',
'collections.read',
'documents.read',
],
2023-10-01 17:39:26 +00:00
]);
return API_KEY_DYNAMIC . '_' . $apiKey;
2023-10-01 17:39:26 +00:00
}
/**
* @throws Authorization
* @throws Conflict
* @throws Restricted
* @throws Structure
* @throws \Utopia\Database\Exception
2024-06-19 09:03:32 +00:00
* @throws Exception
2023-10-01 17:39:26 +00:00
*/
2024-11-13 00:13:00 +00:00
protected function processMigration(Document $migration): void
2023-10-01 17:39:26 +00:00
{
2024-05-28 17:37:37 +00:00
$project = $this->project;
$projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId());
2023-10-01 17:39:26 +00:00
$tempAPIKey = $this->generateAPIKey($projectDocument);
2024-08-08 16:46:44 +00:00
2024-06-19 09:03:32 +00:00
$transfer = $source = $destination = null;
2023-10-01 17:39:26 +00:00
try {
2024-08-12 08:15:14 +00:00
if (
$migration->getAttribute('source') === SourceAppwrite::getName() &&
empty($migration->getAttribute('credentials', []))
2024-08-12 08:15:14 +00:00
) {
$credentials = $migration->getAttribute('credentials', []);
$credentials['projectId'] = $credentials['projectId'] ?? $projectDocument->getId();
$credentials['endpoint'] = $credentials['endpoint'] ?? 'http://appwrite/v1';
$credentials['apiKey'] = $credentials['apiKey'] ?? $tempAPIKey;
2024-08-12 08:15:14 +00:00
$migration->setAttribute('credentials', $credentials);
}
2024-06-19 09:03:32 +00:00
$migration->setAttribute('stage', 'processing');
$migration->setAttribute('status', 'processing');
$this->updateMigrationDocument($migration, $projectDocument);
2023-10-01 17:39:26 +00:00
2024-08-08 12:14:43 +00:00
$source = $this->processSource($migration);
$destination = $this->processDestination($migration, $tempAPIKey);
2023-10-01 17:39:26 +00:00
$source->report();
$transfer = new Transfer(
$source,
$destination
);
/** Start Transfer */
2024-06-19 09:03:32 +00:00
$migration->setAttribute('stage', 'migrating');
$this->updateMigrationDocument($migration, $projectDocument);
2024-05-28 17:52:32 +00:00
$transfer->run(
$migration->getAttribute('resources'),
function () use ($migration, $transfer, $projectDocument) {
$migration->setAttribute('resourceData', json_encode($transfer->getCache()));
$migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
$this->updateMigrationDocument($migration, $projectDocument);
},
2024-08-06 09:46:13 +00:00
$migration->getAttribute('resourceId'),
$migration->getAttribute('resourceType')
);
2023-10-01 17:39:26 +00:00
2024-05-28 17:52:32 +00:00
$destination->shutDown();
2024-06-26 07:54:57 +00:00
$source->shutDown();
2024-05-28 17:52:32 +00:00
2024-02-24 14:18:55 +00:00
$sourceErrors = $source->getErrors();
$destinationErrors = $destination->getErrors();
2023-10-01 17:39:26 +00:00
2024-07-28 06:29:02 +00:00
if (! empty($sourceErrors) || ! empty($destinationErrors)) {
2024-06-19 09:03:32 +00:00
$migration->setAttribute('status', 'failed');
$migration->setAttribute('stage', 'finished');
2023-10-01 17:39:26 +00:00
$errorMessages = [];
2024-02-24 14:18:55 +00:00
foreach ($sourceErrors as $error) {
2024-09-05 16:33:05 +00:00
$message = "Error occurred while fetching '{$error->getResourceName()}:{$error->getResourceId()}' from source with message: '{$error->getMessage()}'";
2024-09-30 14:32:50 +00:00
if ($error->getPrevious()) {
2024-09-05 16:40:04 +00:00
$message .= " Message: ".$error->getPrevious()->getMessage() . " File: ".$error->getPrevious()->getFile() . " Line: ".$error->getPrevious()->getLine();
2024-09-05 16:33:05 +00:00
}
$errorMessages[] = $message;
2024-02-24 14:18:55 +00:00
}
foreach ($destinationErrors as $error) {
2024-09-05 16:33:05 +00:00
$message = "Error occurred while pushing '{$error->getResourceName()}:{$error->getResourceId()}' to destination with message: '{$error->getMessage()}'";
2024-09-30 14:32:50 +00:00
if ($error->getPrevious()) {
2024-09-05 16:40:04 +00:00
$message .= " Message: ".$error->getPrevious()->getMessage() . " File: ".$error->getPrevious()->getFile() . " Line: ".$error->getPrevious()->getLine();
2024-09-05 16:33:05 +00:00
}
2024-09-05 16:16:15 +00:00
2024-02-24 14:18:55 +00:00
/** @var MigrationException $error */
2024-09-05 16:33:05 +00:00
$errorMessages[] = $message;
2023-10-01 17:39:26 +00:00
}
2024-06-19 09:03:32 +00:00
$migration->setAttribute('errors', $errorMessages);
$this->updateMigrationDocument($migration, $projectDocument);
return;
2023-10-01 17:39:26 +00:00
}
2024-06-19 09:03:32 +00:00
$migration->setAttribute('status', 'completed');
$migration->setAttribute('stage', 'finished');
2023-10-01 17:39:26 +00:00
} catch (\Throwable $th) {
Console::error($th->getMessage());
2024-06-19 09:03:32 +00:00
Console::error($th->getTraceAsString());
2023-10-01 17:39:26 +00:00
2024-07-28 06:29:02 +00:00
if (! $migration->isEmpty()) {
2024-06-19 09:03:32 +00:00
$migration->setAttribute('status', 'failed');
$migration->setAttribute('stage', 'finished');
2023-10-01 17:39:26 +00:00
2024-12-11 10:57:25 +00:00
call_user_func($this->logError, $th, 'appwrite-worker', 'appwrite-queue-'.self::getName(), [
'migrationId' => $migration->getId(),
'source' => $migration->getAttribute('source') ?? '',
'destination' => $migration->getAttribute('destination') ?? '',
]);
2023-10-01 17:39:26 +00:00
return;
}
if ($transfer) {
2024-02-24 14:18:55 +00:00
$sourceErrors = $source->getErrors();
$destinationErrors = $destination->getErrors();
2023-10-01 17:39:26 +00:00
2024-02-24 14:18:55 +00:00
$errorMessages = [];
foreach ($sourceErrors as $error) {
/** @var MigrationException $error */
2024-07-25 11:34:19 +00:00
$errorMessages[] = "Error occurred while fetching '{$error->getResourceName()}:{$error->getResourceId()}' from source with message '{$error->getMessage()}'";
2023-10-01 17:39:26 +00:00
}
2024-02-24 14:18:55 +00:00
foreach ($destinationErrors as $error) {
/** @var MigrationException $error */
2024-07-25 11:34:19 +00:00
$errorMessages[] = "Error occurred while pushing '{$error->getResourceName()}:{$error->getResourceId()}' to destination with message '{$error->getMessage()}'";
2024-02-24 14:18:55 +00:00
}
2024-06-19 09:03:32 +00:00
$migration->setAttribute('errors', $errorMessages);
2023-10-01 17:39:26 +00:00
}
} finally {
2024-06-19 09:03:32 +00:00
$this->updateMigrationDocument($migration, $projectDocument);
2024-09-24 19:02:38 +00:00
if ($migration->getAttribute('status', '') === 'failed') {
2024-09-30 08:01:01 +00:00
Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')');
2024-12-11 11:31:17 +00:00
if ($destination) {
$destination->error();
foreach ($destination->getErrors() as $error) {
/** @var MigrationException $error */
call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [
'migrationId' => $migration->getId(),
'source' => $migration->getAttribute('source') ?? '',
'destination' => $migration->getAttribute('destination') ?? '',
'resourceName' => $error->getResourceName(),
'resourceGroup' => $error->getResourceGroup()
]);
}
}
2024-12-11 11:31:17 +00:00
if ($source) {
$source->error();
foreach ($source->getErrors() as $error) {
/** @var MigrationException $error */
call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [
'migrationId' => $migration->getId(),
'source' => $migration->getAttribute('source') ?? '',
'destination' => $migration->getAttribute('destination') ?? '',
'resourceName' => $error->getResourceName(),
'resourceGroup' => $error->getResourceGroup()
]);
}
}
2023-10-01 17:39:26 +00:00
}
2024-10-10 08:15:18 +00:00
if ($migration->getAttribute('status', '') === 'completed') {
2024-12-11 11:31:17 +00:00
$destination?->success();
$source?->success();
2024-10-10 08:15:18 +00:00
}
2023-10-01 17:39:26 +00:00
}
}
2024-05-01 16:46:19 +00:00
}