2023-10-01 17:39:26 +00:00
< ? php
namespace Appwrite\Platform\Workers ;
2025-02-04 16:56:14 +00:00
use Ahc\Jwt\JWT ;
2023-10-01 17:39:26 +00:00
use Appwrite\Event\Event ;
use Appwrite\Messaging\Adapter\Realtime ;
2024-03-06 17:34:21 +00:00
use Exception ;
2023-10-01 17:39:26 +00:00
use Utopia\CLI\Console ;
2024-07-28 06:27:18 +00:00
use Utopia\Config\Config ;
2023-10-01 17:39:26 +00:00
use Utopia\Database\Database ;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Document ;
use Utopia\Database\Exception\Authorization ;
use Utopia\Database\Exception\Conflict ;
use Utopia\Database\Exception\Restricted ;
use Utopia\Database\Exception\Structure ;
2024-05-28 16:59:54 +00:00
use Utopia\Migration\Destination ;
use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite ;
2024-03-06 17:34:21 +00:00
use Utopia\Migration\Exception as MigrationException ;
2023-10-01 17:39:26 +00:00
use Utopia\Migration\Source ;
2024-06-19 10:26:52 +00:00
use Utopia\Migration\Sources\Appwrite as SourceAppwrite ;
2023-10-01 17:39:26 +00:00
use Utopia\Migration\Sources\Firebase ;
use Utopia\Migration\Sources\NHost ;
use Utopia\Migration\Sources\Supabase ;
use Utopia\Migration\Transfer ;
2024-03-06 17:34:21 +00:00
use Utopia\Platform\Action ;
use Utopia\Queue\Message ;
2025-02-04 16:56:14 +00:00
use Utopia\System\System ;
2023-10-01 17:39:26 +00:00
class Migrations extends Action
{
2024-05-28 17:37:37 +00:00
protected Database $dbForProject ;
2024-07-28 06:29:02 +00:00
2025-02-04 16:56:14 +00:00
protected Database $dbForPlatform ;
2024-07-28 06:29:02 +00:00
2024-05-28 17:37:37 +00:00
protected Document $project ;
2023-10-01 17:39:26 +00:00
2025-02-04 16:56:14 +00:00
protected $logError ;
2023-10-01 17:39:26 +00:00
public static function getName () : string
{
return 'migrations' ;
}
/**
* @ throws Exception
*/
public function __construct ()
{
$this
-> desc ( 'Migrations worker' )
-> inject ( 'message' )
2025-02-04 16:56:14 +00:00
-> inject ( 'project' )
2023-10-17 03:44:36 +00:00
-> inject ( 'dbForProject' )
2025-02-04 16:56:14 +00:00
-> inject ( 'dbForPlatform' )
-> inject ( 'logError' )
-> callback ( fn ( Message $message , Document $project , Database $dbForProject , Database $dbForPlatform , callable $logError ) => $this -> action ( $message , $project , $dbForProject , $dbForPlatform , $logError ));
2023-10-01 17:39:26 +00:00
}
/**
* @ throws Exception
*/
2025-02-04 16:56:14 +00:00
public function action ( Message $message , Document $project , Database $dbForProject , Database $dbForPlatform , callable $logError ) : void
2023-10-01 17:39:26 +00:00
{
$payload = $message -> getPayload () ? ? [];
if ( empty ( $payload )) {
throw new Exception ( 'Missing payload' );
}
$events = $payload [ 'events' ] ? ? [];
$migration = new Document ( $payload [ 'migration' ] ? ? []);
if ( $project -> getId () === 'console' ) {
return ;
}
2023-10-17 03:44:36 +00:00
$this -> dbForProject = $dbForProject ;
2025-02-04 16:56:14 +00:00
$this -> dbForPlatform = $dbForPlatform ;
2024-05-28 17:37:37 +00:00
$this -> project = $project ;
2025-02-04 16:56:14 +00:00
$this -> logError = $logError ;
2023-10-01 17:39:26 +00:00
/**
* Handle Event execution .
*/
if ( ! empty ( $events )) {
return ;
}
2025-02-04 16:56:14 +00:00
$this -> processMigration ( $migration );
2023-10-01 17:39:26 +00:00
}
/**
* @ throws Exception
*/
2024-06-19 08:45:07 +00:00
protected function processSource ( Document $migration ) : Source
2023-10-01 17:39:26 +00:00
{
2024-06-19 08:45:07 +00:00
$source = $migration -> getAttribute ( 'source' );
$credentials = $migration -> getAttribute ( 'credentials' );
2023-10-01 17:39:26 +00:00
return match ( $source ) {
Firebase :: getName () => new Firebase (
json_decode ( $credentials [ 'serviceAccount' ], true ),
),
Supabase :: getName () => new Supabase (
$credentials [ 'endpoint' ],
$credentials [ 'apiKey' ],
$credentials [ 'databaseHost' ],
'postgres' ,
$credentials [ 'username' ],
$credentials [ 'password' ],
$credentials [ 'port' ],
),
NHost :: getName () => new NHost (
$credentials [ 'subdomain' ],
$credentials [ 'region' ],
$credentials [ 'adminSecret' ],
$credentials [ 'database' ],
$credentials [ 'username' ],
$credentials [ 'password' ],
$credentials [ 'port' ],
),
2024-05-28 17:12:58 +00:00
SourceAppwrite :: getName () => new SourceAppwrite (
2024-08-12 08:15:14 +00:00
$credentials [ 'projectId' ],
2025-02-04 16:56:14 +00:00
$credentials [ 'endpoint' ] === 'http://localhost/v1' ? 'http://appwrite/v1' : $credentials [ 'endpoint' ],
2024-08-12 08:15:14 +00:00
$credentials [ 'apiKey' ],
2024-05-28 17:05:36 +00:00
),
2023-10-01 17:39:26 +00:00
default => throw new \Exception ( 'Invalid source type' ),
};
}
2024-05-28 16:59:54 +00:00
/**
* @ throws Exception
*/
2025-02-04 16:56:14 +00:00
protected function processDestination ( Document $migration , string $apiKey ) : Destination
2024-05-28 16:59:54 +00:00
{
2024-06-19 08:45:07 +00:00
$destination = $migration -> getAttribute ( 'destination' );
2024-05-28 16:59:54 +00:00
return match ( $destination ) {
DestinationAppwrite :: getName () => new DestinationAppwrite (
2025-02-04 16:56:14 +00:00
$this -> project -> getId (),
'http://appwrite/v1' ,
$apiKey ,
2024-07-28 06:27:18 +00:00
$this -> dbForProject ,
2024-07-28 06:29:02 +00:00
Config :: getParam ( 'collections' , [])[ 'databases' ][ 'collections' ],
2024-05-28 16:59:54 +00:00
),
default => throw new \Exception ( 'Invalid destination type' ),
};
}
2023-10-01 17:39:26 +00:00
/**
* @ throws Authorization
* @ throws Structure
* @ throws Conflict
* @ throws \Utopia\Database\Exception
* @ throws Exception
*/
protected function updateMigrationDocument ( Document $migration , Document $project ) : Document
{
/** Trigger Realtime */
$allEvents = Event :: generateEvents ( 'migrations.[migrationId].update' , [
'migrationId' => $migration -> getId (),
]);
$target = Realtime :: fromPayload (
event : $allEvents [ 0 ],
payload : $migration ,
project : $project
);
Realtime :: send (
projectId : 'console' ,
payload : $migration -> getArrayCopy (),
events : $allEvents ,
channels : $target [ 'channels' ],
roles : $target [ 'roles' ],
);
Realtime :: send (
projectId : $project -> getId (),
payload : $migration -> getArrayCopy (),
events : $allEvents ,
channels : $target [ 'channels' ],
roles : $target [ 'roles' ],
);
return $this -> dbForProject -> updateDocument ( 'migrations' , $migration -> getId (), $migration );
}
/**
* @ throws \Utopia\Database\Exception
* @ throws Authorization
* @ throws Conflict
* @ throws Restricted
* @ throws Structure
*/
protected function removeAPIKey ( Document $apiKey ) : void
{
2025-02-04 16:56:14 +00:00
$this -> dbForPlatform -> deleteDocument ( 'keys' , $apiKey -> getId ());
2023-10-01 17:39:26 +00:00
}
/**
* @ throws Authorization
* @ throws Structure
* @ throws \Utopia\Database\Exception
* @ throws Exception
*/
2025-02-04 16:56:14 +00:00
protected function generateAPIKey ( Document $project ) : string
2023-10-01 17:39:26 +00:00
{
2025-02-04 16:56:14 +00:00
$jwt = new JWT ( System :: getEnv ( '_APP_OPENSSL_KEY_V1' ), 'HS256' , 86400 , 0 );
$apiKey = $jwt -> encode ([
2023-10-01 17:39:26 +00:00
'projectId' => $project -> getId (),
'scopes' => [
'users.read' ,
'users.write' ,
'teams.read' ,
'teams.write' ,
'buckets.read' ,
'buckets.write' ,
'files.read' ,
'files.write' ,
'functions.read' ,
'functions.write' ,
2025-02-04 16:56:14 +00:00
'databases.read' ,
'databases.write' ,
'collections.read' ,
'collections.write' ,
'documents.read' ,
'documents.write'
]
2023-10-01 17:39:26 +00:00
]);
2025-02-04 16:56:14 +00:00
return API_KEY_DYNAMIC . '_' . $apiKey ;
2023-10-01 17:39:26 +00:00
}
/**
* @ throws Authorization
* @ throws Conflict
* @ throws Restricted
* @ throws Structure
* @ throws \Utopia\Database\Exception
2024-06-19 09:03:32 +00:00
* @ throws Exception
2023-10-01 17:39:26 +00:00
*/
2025-02-04 16:56:14 +00:00
protected function processMigration ( Document $migration ) : void
2023-10-01 17:39:26 +00:00
{
2024-05-28 17:37:37 +00:00
$project = $this -> project ;
2025-02-04 16:56:14 +00:00
$projectDocument = $this -> dbForPlatform -> getDocument ( 'projects' , $project -> getId ());
2023-10-01 17:39:26 +00:00
$tempAPIKey = $this -> generateAPIKey ( $projectDocument );
2024-08-08 16:46:44 +00:00
2024-06-19 09:03:32 +00:00
$transfer = $source = $destination = null ;
2023-10-01 17:39:26 +00:00
try {
2024-08-12 08:15:14 +00:00
if (
2025-02-04 16:56:14 +00:00
$migration -> getAttribute ( 'source' ) === SourceAppwrite :: getName () &&
empty ( $migration -> getAttribute ( 'credentials' , []))
2024-08-12 08:15:14 +00:00
) {
$credentials = $migration -> getAttribute ( 'credentials' , []);
$credentials [ 'projectId' ] = $credentials [ 'projectId' ] ? ? $projectDocument -> getId ();
$credentials [ 'endpoint' ] = $credentials [ 'endpoint' ] ? ? 'http://appwrite/v1' ;
2025-02-04 16:56:14 +00:00
$credentials [ 'apiKey' ] = $credentials [ 'apiKey' ] ? ? $tempAPIKey ;
2024-08-12 08:15:14 +00:00
$migration -> setAttribute ( 'credentials' , $credentials );
}
2024-06-19 09:03:32 +00:00
$migration -> setAttribute ( 'stage' , 'processing' );
$migration -> setAttribute ( 'status' , 'processing' );
$this -> updateMigrationDocument ( $migration , $projectDocument );
2023-10-01 17:39:26 +00:00
2024-08-08 12:14:43 +00:00
$source = $this -> processSource ( $migration );
2025-02-04 16:56:14 +00:00
$destination = $this -> processDestination ( $migration , $tempAPIKey );
2023-10-01 17:39:26 +00:00
$source -> report ();
$transfer = new Transfer (
$source ,
$destination
);
/** Start Transfer */
2024-06-19 09:03:32 +00:00
$migration -> setAttribute ( 'stage' , 'migrating' );
$this -> updateMigrationDocument ( $migration , $projectDocument );
2024-05-28 17:52:32 +00:00
2024-07-15 14:22:36 +00:00
$transfer -> run (
$migration -> getAttribute ( 'resources' ),
function () use ( $migration , $transfer , $projectDocument ) {
$migration -> setAttribute ( 'resourceData' , json_encode ( $transfer -> getCache ()));
$migration -> setAttribute ( 'statusCounters' , json_encode ( $transfer -> getStatusCounters ()));
$this -> updateMigrationDocument ( $migration , $projectDocument );
},
2024-08-06 09:46:13 +00:00
$migration -> getAttribute ( 'resourceId' ),
$migration -> getAttribute ( 'resourceType' )
2024-07-15 14:22:36 +00:00
);
2023-10-01 17:39:26 +00:00
2024-05-28 17:52:32 +00:00
$destination -> shutDown ();
2024-06-26 07:54:57 +00:00
$source -> shutDown ();
2024-05-28 17:52:32 +00:00
2024-02-24 14:18:55 +00:00
$sourceErrors = $source -> getErrors ();
$destinationErrors = $destination -> getErrors ();
2023-10-01 17:39:26 +00:00
2024-07-28 06:29:02 +00:00
if ( ! empty ( $sourceErrors ) || ! empty ( $destinationErrors )) {
2024-06-19 09:03:32 +00:00
$migration -> setAttribute ( 'status' , 'failed' );
$migration -> setAttribute ( 'stage' , 'finished' );
2023-10-01 17:39:26 +00:00
$errorMessages = [];
2024-02-24 14:18:55 +00:00
foreach ( $sourceErrors as $error ) {
2024-09-05 16:33:05 +00:00
$message = " Error occurred while fetching ' { $error -> getResourceName () } : { $error -> getResourceId () } ' from source with message: ' { $error -> getMessage () } ' " ;
2024-09-30 14:32:50 +00:00
if ( $error -> getPrevious ()) {
2024-09-05 16:40:04 +00:00
$message .= " Message: " . $error -> getPrevious () -> getMessage () . " File: " . $error -> getPrevious () -> getFile () . " Line: " . $error -> getPrevious () -> getLine ();
2024-09-05 16:33:05 +00:00
}
$errorMessages [] = $message ;
2024-02-24 14:18:55 +00:00
}
foreach ( $destinationErrors as $error ) {
2024-09-05 16:33:05 +00:00
$message = " Error occurred while pushing ' { $error -> getResourceName () } : { $error -> getResourceId () } ' to destination with message: ' { $error -> getMessage () } ' " ;
2024-09-30 14:32:50 +00:00
if ( $error -> getPrevious ()) {
2024-09-05 16:40:04 +00:00
$message .= " Message: " . $error -> getPrevious () -> getMessage () . " File: " . $error -> getPrevious () -> getFile () . " Line: " . $error -> getPrevious () -> getLine ();
2024-09-05 16:33:05 +00:00
}
2024-09-05 16:16:15 +00:00
2024-02-24 14:18:55 +00:00
/** @var MigrationException $error */
2024-09-05 16:33:05 +00:00
$errorMessages [] = $message ;
2023-10-01 17:39:26 +00:00
}
2024-06-19 09:03:32 +00:00
$migration -> setAttribute ( 'errors' , $errorMessages );
$this -> updateMigrationDocument ( $migration , $projectDocument );
2024-05-01 17:15:16 +00:00
return ;
2023-10-01 17:39:26 +00:00
}
2024-06-19 09:03:32 +00:00
$migration -> setAttribute ( 'status' , 'completed' );
$migration -> setAttribute ( 'stage' , 'finished' );
2023-10-01 17:39:26 +00:00
} catch ( \Throwable $th ) {
Console :: error ( $th -> getMessage ());
2024-06-19 09:03:32 +00:00
Console :: error ( $th -> getTraceAsString ());
2023-10-01 17:39:26 +00:00
2024-07-28 06:29:02 +00:00
if ( ! $migration -> isEmpty ()) {
2024-06-19 09:03:32 +00:00
$migration -> setAttribute ( 'status' , 'failed' );
$migration -> setAttribute ( 'stage' , 'finished' );
2025-02-04 16:56:14 +00:00
call_user_func ( $this -> logError , $th , 'appwrite-worker' , 'appwrite-queue-' . self :: getName (), [
'migrationId' => $migration -> getId (),
'source' => $migration -> getAttribute ( 'source' ) ? ? '' ,
'destination' => $migration -> getAttribute ( 'destination' ) ? ? '' ,
]);
2023-10-01 17:39:26 +00:00
return ;
}
if ( $transfer ) {
2024-02-24 14:18:55 +00:00
$sourceErrors = $source -> getErrors ();
$destinationErrors = $destination -> getErrors ();
2023-10-01 17:39:26 +00:00
2024-02-24 14:18:55 +00:00
$errorMessages = [];
foreach ( $sourceErrors as $error ) {
/** @var MigrationException $error */
2024-07-25 11:34:19 +00:00
$errorMessages [] = " Error occurred while fetching ' { $error -> getResourceName () } : { $error -> getResourceId () } ' from source with message ' { $error -> getMessage () } ' " ;
2023-10-01 17:39:26 +00:00
}
2024-02-24 14:18:55 +00:00
foreach ( $destinationErrors as $error ) {
/** @var MigrationException $error */
2024-07-25 11:34:19 +00:00
$errorMessages [] = " Error occurred while pushing ' { $error -> getResourceName () } : { $error -> getResourceId () } ' to destination with message ' { $error -> getMessage () } ' " ;
2024-02-24 14:18:55 +00:00
}
2024-06-19 09:03:32 +00:00
$migration -> setAttribute ( 'errors' , $errorMessages );
2023-10-01 17:39:26 +00:00
}
} finally {
2024-06-19 09:03:32 +00:00
$this -> updateMigrationDocument ( $migration , $projectDocument );
2024-09-24 19:02:38 +00:00
if ( $migration -> getAttribute ( 'status' , '' ) === 'failed' ) {
2024-09-30 08:01:01 +00:00
Console :: error ( 'Migration(' . $migration -> getInternalId () . ':' . $migration -> getId () . ') failed, Project(' . $this -> project -> getInternalId () . ':' . $this -> project -> getId () . ')' );
2025-02-04 16:56:14 +00:00
if ( $destination ) {
$destination -> error ();
foreach ( $destination -> getErrors () as $error ) {
/** @var MigrationException $error */
call_user_func ( $this -> logError , $error , 'appwrite-worker' , 'appwrite-queue-' . self :: getName (), [
'migrationId' => $migration -> getId (),
'source' => $migration -> getAttribute ( 'source' ) ? ? '' ,
'destination' => $migration -> getAttribute ( 'destination' ) ? ? '' ,
'resourceName' => $error -> getResourceName (),
'resourceGroup' => $error -> getResourceGroup ()
]);
}
}
2024-08-07 13:58:00 +00:00
2025-02-04 16:56:14 +00:00
if ( $source ) {
$source -> error ();
foreach ( $source -> getErrors () as $error ) {
/** @var MigrationException $error */
call_user_func ( $this -> logError , $error , 'appwrite-worker' , 'appwrite-queue-' . self :: getName (), [
'migrationId' => $migration -> getId (),
'source' => $migration -> getAttribute ( 'source' ) ? ? '' ,
'destination' => $migration -> getAttribute ( 'destination' ) ? ? '' ,
'resourceName' => $error -> getResourceName (),
'resourceGroup' => $error -> getResourceGroup ()
]);
}
}
2023-10-01 17:39:26 +00:00
}
2024-10-10 08:15:18 +00:00
if ( $migration -> getAttribute ( 'status' , '' ) === 'completed' ) {
2025-02-04 16:56:14 +00:00
$destination ? -> success ();
$source ? -> success ();
2024-10-10 08:15:18 +00:00
}
2023-10-01 17:39:26 +00:00
}
}
2024-05-01 16:46:19 +00:00
}