2023-10-01 17:39:26 +00:00
< ? php
namespace Appwrite\Platform\Workers ;
use Appwrite\Event\Event ;
use Appwrite\Messaging\Adapter\Realtime ;
use Appwrite\Permission ;
use Appwrite\Role ;
2024-03-06 17:34:21 +00:00
use Exception ;
2023-10-01 17:39:26 +00:00
use Utopia\CLI\Console ;
use Utopia\Database\Database ;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Document ;
use Utopia\Database\Exception\Authorization ;
use Utopia\Database\Exception\Conflict ;
use Utopia\Database\Exception\Restricted ;
use Utopia\Database\Exception\Structure ;
2023-10-01 17:39:26 +00:00
use Utopia\Database\Helpers\ID ;
2023-11-22 13:50:57 +00:00
use Utopia\Logger\Log ;
2023-10-01 17:39:26 +00:00
use Utopia\Migration\Destinations\Appwrite as DestinationsAppwrite ;
2024-03-06 17:34:21 +00:00
use Utopia\Migration\Exception as MigrationException ;
2023-10-01 17:39:26 +00:00
use Utopia\Migration\Source ;
use Utopia\Migration\Sources\Appwrite ;
use Utopia\Migration\Sources\Firebase ;
use Utopia\Migration\Sources\NHost ;
use Utopia\Migration\Sources\Supabase ;
use Utopia\Migration\Transfer ;
2024-03-06 17:34:21 +00:00
use Utopia\Platform\Action ;
use Utopia\Queue\Message ;
2023-10-01 17:39:26 +00:00
class Migrations extends Action
{
private ? Database $dbForProject = null ;
private ? Database $dbForConsole = null ;
/**
 * Returns the name of this worker action.
 *
 * @return string Always the literal 'migrations'.
 */
public static function getName(): string
{
    return 'migrations';
}
/**
 * Configures the action: sets its description, declares the resources to
 * inject ('message', 'dbForProject', 'dbForConsole', 'log') and registers a
 * callback that forwards them to action().
 *
 * @throws Exception
 */
public function __construct()
{
    // The closure parameters are listed in the same order as the inject() calls below.
    $handler = function (Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void {
        $this->action($message, $dbForProject, $dbForConsole, $log);
    };

    $this
        ->desc('Migrations worker')
        ->inject('message')
        ->inject('dbForProject')
        ->inject('dbForConsole')
        ->inject('log')
        ->callback($handler);
}
/**
 * Entry point for a queued migration message.
 *
 * Reads 'events', 'project' and 'migration' from the message payload, stores
 * the injected databases on the instance and hands the migration over to
 * processMigration(). Nothing is processed for the 'console' project or when
 * the payload carries events.
 *
 * @param Message $message
 * @param Database $dbForProject
 * @param Database $dbForConsole
 * @param Log $log
 * @return void
 * @throws Exception When the message payload is empty.
 */
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void
{
    $payload = $message->getPayload() ?? [];

    if (empty($payload)) {
        throw new Exception('Missing payload');
    }

    $triggeredEvents = $payload['events'] ?? [];
    $projectDoc = new Document($payload['project'] ?? []);
    $migrationDoc = new Document($payload['migration'] ?? []);

    if ($projectDoc->getId() === 'console') {
        return;
    }

    $this->dbForProject = $dbForProject;
    $this->dbForConsole = $dbForConsole;

    /**
     * Handle Event execution: messages that carry events are not migrations to run.
     */
    if (empty($triggeredEvents)) {
        $log->addTag('projectId', $projectDoc->getId());

        $this->processMigration($projectDoc, $migrationDoc, $log);
    }
}
/**
 * Builds the migration source adapter that matches the given source name.
 *
 * The name is compared against each adapter's getName(); the adapter is then
 * constructed from the entries of $credentials that it needs.
 *
 * @param string $source Source name as stored on the migration document.
 * @param array $credentials Credentials for the selected source.
 * @return Source
 * @throws \JsonException When the Firebase 'serviceAccount' is not valid JSON.
 * @throws Exception When $source does not match any known adapter.
 */
protected function processSource(string $source, array $credentials): Source
{
    return match ($source) {
        Firebase::getName() => new Firebase(
            // Fail with a descriptive JsonException instead of silently passing
            // null to the adapter when the service account is malformed.
            json_decode($credentials['serviceAccount'], true, 512, JSON_THROW_ON_ERROR),
        ),
        Supabase::getName() => new Supabase(
            $credentials['endpoint'],
            $credentials['apiKey'],
            $credentials['databaseHost'],
            'postgres',
            $credentials['username'],
            $credentials['password'],
            $credentials['port'],
        ),
        NHost::getName() => new NHost(
            $credentials['subdomain'],
            $credentials['region'],
            $credentials['adminSecret'],
            $credentials['database'],
            $credentials['username'],
            $credentials['password'],
            $credentials['port'],
        ),
        Appwrite::getName() => new Appwrite(
            $credentials['projectId'],
            // An endpoint starting with 'http://localhost/v1' is replaced by
            // 'http://appwrite/v1' (the same host the destination adapter uses).
            str_starts_with($credentials['endpoint'], 'http://localhost/v1')
                ? 'http://appwrite/v1'
                : $credentials['endpoint'],
            $credentials['apiKey'],
        ),
        default => throw new \Exception('Invalid source type'),
    };
}
/**
 * Broadcasts the current state of a migration over Realtime and then persists it.
 *
 * The 'migrations.[migrationId].update' events are sent first to the
 * 'console' project and then to the migration's own project; afterwards the
 * document is written to the project's 'migrations' collection.
 *
 * @param Document $migration
 * @param Document $project
 * @return Document The document returned by updateDocument().
 * @throws Authorization
 * @throws Structure
 * @throws Conflict
 * @throws \Utopia\Database\Exception
 * @throws Exception
 */
protected function updateMigrationDocument(Document $migration, Document $project): Document
{
    $migrationId = $migration->getId();

    /** Trigger Realtime */
    $allEvents = Event::generateEvents('migrations.[migrationId].update', [
        'migrationId' => $migrationId,
    ]);

    $target = Realtime::fromPayload(
        event: $allEvents[0],
        payload: $migration,
        project: $project
    );

    // Same message for both audiences; only the receiving project differs.
    foreach (['console', $project->getId()] as $receiverProjectId) {
        Realtime::send(
            projectId: $receiverProjectId,
            payload: $migration->getArrayCopy(),
            events: $allEvents,
            channels: $target['channels'],
            roles: $target['roles'],
        );
    }

    return $this->dbForProject->updateDocument('migrations', $migrationId, $migration);
}
/**
 * Deletes a temporary API key from the console database.
 *
 * @param Document $apiKey Key document; its 'projectId' attribute is used to purge the project cache.
 * @return void
 * @throws \Utopia\Database\Exception
 * @throws Authorization
 * @throws Conflict
 * @throws Restricted
 * @throws Structure
 */
protected function removeAPIKey(Document $apiKey): void
{
    $this->dbForConsole->deleteDocument('keys', $apiKey->getId());

    // generateAPIKey() purges the cached 'projects' document after creating a
    // key; mirror that after deleting one so the cached project is refreshed.
    // NOTE(review): presumably the cached project document embeds its keys — confirm.
    $projectId = $apiKey->getAttribute('projectId');
    if (!empty($projectId)) {
        $this->dbForConsole->purgeCachedDocument('projects', $projectId);
    }
}
/**
 * Creates an API key named 'Transfer API Key' for the given project.
 *
 * The key gets a random secret and read/write scopes for users, teams,
 * databases, collections, documents, buckets, files and functions. It is
 * stored in the console 'keys' collection, after which the cached 'projects'
 * document of the project is purged.
 *
 * @param Document $project
 * @return Document The key document that was passed to createDocument().
 * @throws Authorization
 * @throws Structure
 * @throws \Utopia\Database\Exception
 * @throws Exception
 */
protected function generateAPIKey(Document $project): Document
{
    // 128 random bytes, hex encoded.
    $secret = bin2hex(\random_bytes(128));

    $scopes = [
        'users.read',
        'users.write',
        'teams.read',
        'teams.write',
        'databases.read',
        'databases.write',
        'collections.read',
        'collections.write',
        'documents.read',
        'documents.write',
        'buckets.read',
        'buckets.write',
        'files.read',
        'files.write',
        'functions.read',
        'functions.write',
    ];

    $apiKey = new Document([
        '$id' => ID::unique(),
        '$permissions' => [
            Permission::read(Role::any()),
            Permission::update(Role::any()),
            Permission::delete(Role::any()),
        ],
        'projectInternalId' => $project->getInternalId(),
        'projectId' => $project->getId(),
        'name' => 'Transfer API Key',
        'scopes' => $scopes,
        'expire' => null,
        'sdks' => [],
        'accessedAt' => null,
        'secret' => $secret,
    ]);

    $console = $this->dbForConsole;
    $console->createDocument('keys', $apiKey);
    $console->purgeCachedDocument('projects', $project->getId());

    return $apiKey;
}
/**
 * Runs a migration from its configured source into the given project.
 *
 * A temporary API key is generated for the destination adapter and is removed
 * again in the `finally` block. The migration document moves through the
 * stages 'processing' -> 'migrating' -> 'finished'; the final state is
 * persisted and broadcast through updateMigrationDocument() exactly once, in
 * the `finally` block.
 *
 * @param Document $project
 * @param Document $migration
 * @param Log $log
 * @return void
 * @throws Authorization
 * @throws Conflict
 * @throws Restricted
 * @throws Structure
 * @throws \Utopia\Database\Exception
 * @throws Exception When the migration ends with status 'failed'; the message lists the collected errors.
 * @throws \Throwable When a failure happens before the migration document could be loaded.
 */
protected function processMigration(Document $project, Document $migration, Log $log): void
{
    /**
     * @var Document|null $migrationDocument
     * @var Source|null $source
     * @var DestinationsAppwrite|null $destination
     */
    $migrationDocument = null;
    $source = null;
    $destination = null;

    $projectDocument = $this->dbForConsole->getDocument('projects', $project->getId());
    $tempAPIKey = $this->generateAPIKey($projectDocument);

    try {
        $migrationDocument = $this->dbForProject->getDocument('migrations', $migration->getId());
        $migrationDocument->setAttribute('stage', 'processing');
        $migrationDocument->setAttribute('status', 'processing');
        $this->updateMigrationDocument($migrationDocument, $projectDocument);

        $log->addTag('type', $migrationDocument->getAttribute('source'));

        $source = $this->processSource(
            $migrationDocument->getAttribute('source'),
            $migrationDocument->getAttribute('credentials')
        );
        $source->report();

        $destination = new DestinationsAppwrite(
            $projectDocument->getId(),
            'http://appwrite/v1',
            $tempAPIKey['secret'],
        );

        $transfer = new Transfer(
            $source,
            $destination
        );

        /** Start Transfer */
        $migrationDocument->setAttribute('stage', 'migrating');
        $this->updateMigrationDocument($migrationDocument, $projectDocument);

        // Persist and broadcast progress every time the transfer invokes the callback.
        $transfer->run($migrationDocument->getAttribute('resources'), function () use ($migrationDocument, $transfer, $projectDocument) {
            $migrationDocument->setAttribute('resourceData', json_encode($transfer->getCache()));
            $migrationDocument->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
            $this->updateMigrationDocument($migrationDocument, $projectDocument);
        });

        $errorMessages = $this->collectTransferErrors($source, $destination);

        $migrationDocument->setAttribute('stage', 'finished');
        if (!empty($errorMessages)) {
            $migrationDocument->setAttribute('status', 'failed');
            $migrationDocument->setAttribute('errors', $errorMessages);
        } else {
            $migrationDocument->setAttribute('status', 'completed');
        }
    } catch (\Throwable $th) {
        Console::error($th->getMessage());
        Console::error($th->getTraceAsString());

        if ($migrationDocument === null) {
            // There is no migration document to record the failure on, so let
            // the error propagate instead of swallowing it. The finally block
            // below still removes the temporary API key.
            throw $th;
        }

        // Report the fatal error first, followed by any per-resource errors
        // the adapters collected before the failure.
        $errorMessages = [$th->getMessage()];
        if ($source !== null && $destination !== null) {
            $errorMessages = array_merge($errorMessages, $this->collectTransferErrors($source, $destination));
        }

        $migrationDocument->setAttribute('status', 'failed');
        $migrationDocument->setAttribute('stage', 'finished');
        $migrationDocument->setAttribute('errors', $errorMessages);
    } finally {
        // generateAPIKey() either returned a key or threw before the try block,
        // so there is always a key to remove here.
        $this->removeAPIKey($tempAPIKey);

        if ($migrationDocument !== null) {
            $this->updateMigrationDocument($migrationDocument, $projectDocument);

            // Surface a failed migration to the caller as an exception.
            if ($migrationDocument->getAttribute('status', '') === 'failed') {
                throw new Exception(implode("\n", $migrationDocument->getAttribute('errors', [])));
            }
        }
    }
}

/**
 * Formats the errors collected by the source and destination adapters into messages.
 *
 * @param Source $source
 * @param DestinationsAppwrite $destination
 * @return array<string> Source errors first, then destination errors; empty when neither reported any.
 */
private function collectTransferErrors(Source $source, DestinationsAppwrite $destination): array
{
    $messages = [];

    foreach ($source->getErrors() as $error) {
        /** @var MigrationException $error */
        $messages[] = "Error occurred while fetching '{$error->getResourceType()}:{$error->getResourceId()}' from source with message: '{$error->getMessage()}'";
    }

    foreach ($destination->getErrors() as $error) {
        /** @var MigrationException $error */
        $messages[] = "Error occurred while pushing '{$error->getResourceType()}:{$error->getResourceId()}' to destination with message: '{$error->getMessage()}'";
    }

    return $messages;
}
}