2023-10-01 17:39:26 +00:00
< ? php
namespace Appwrite\Platform\Workers ;
use Appwrite\Event\Event ;
use Appwrite\Messaging\Adapter\Realtime ;
use Appwrite\Permission ;
use Appwrite\Role ;
2024-03-06 17:34:21 +00:00
use Exception ;
2023-10-01 17:39:26 +00:00
use Utopia\CLI\Console ;
use Utopia\Database\Database ;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Document ;
use Utopia\Database\Exception\Authorization ;
use Utopia\Database\Exception\Conflict ;
use Utopia\Database\Exception\Restricted ;
use Utopia\Database\Exception\Structure ;
2023-10-01 17:39:26 +00:00
use Utopia\Database\Helpers\ID ;
2023-11-22 13:50:57 +00:00
use Utopia\Logger\Log ;
2024-05-01 17:15:16 +00:00
use Utopia\Logger\Log\Breadcrumb ;
2024-05-28 16:59:54 +00:00
use Utopia\Migration\Destination ;
use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite ;
2024-05-28 17:12:58 +00:00
use Utopia\Migration\Sources\Appwrite as SourceAppwrite ;
2024-03-06 17:34:21 +00:00
use Utopia\Migration\Exception as MigrationException ;
2023-10-01 17:39:26 +00:00
use Utopia\Migration\Source ;
use Utopia\Migration\Sources\Firebase ;
use Utopia\Migration\Sources\NHost ;
use Utopia\Migration\Sources\Supabase ;
use Utopia\Migration\Transfer ;
2024-03-06 17:34:21 +00:00
use Utopia\Platform\Action ;
use Utopia\Queue\Message ;
2023-10-01 17:39:26 +00:00
class Migrations extends Action
{
2024-05-28 17:37:37 +00:00
/** Project database handle; assigned in action() and used for the 'migrations' collection. */
protected Database $dbForProject ;
/** Console database handle; assigned in action() and used for the 'projects' and 'keys' collections. */
protected Database $dbForConsole ;
/** Project document built from the 'project' entry of the queue message payload. */
protected Document $project ;
2023-10-01 17:39:26 +00:00
/**
 * Identifier under which this worker is registered.
 *
 * @return string
 */
public static function getName(): string
{
    $name = 'migrations';

    return $name;
}
/**
 * Describe the worker, declare the dependencies it needs injected and
 * register action() as the handler that receives them.
 *
 * @throws Exception
 */
public function __construct()
{
    $handler = function (Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void {
        $this->action($message, $dbForProject, $dbForConsole, $log);
    };

    $this
        ->desc('Migrations worker')
        ->inject('message')
        ->inject('dbForProject')
        ->inject('dbForConsole')
        ->inject('log')
        ->callback($handler);
}
/**
 * Entry point for a queued migration message.
 *
 * Rejects an empty payload, ignores messages addressed to the 'console'
 * project and messages that carry events, and otherwise tags the log and
 * hands the migration over to processMigration().
 *
 * @param Message $message
 * @param Database $dbForProject
 * @param Database $dbForConsole
 * @param Log $log
 * @return void
 * @throws Exception
 */
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void
{
    $data = $message->getPayload() ?? [];

    if (empty($data)) {
        throw new Exception('Missing payload');
    }

    $hasEvents = !empty($data['events'] ?? []);
    $projectDoc = new Document($data['project'] ?? []);
    $migrationDoc = new Document($data['migration'] ?? []);

    // The console project is never migrated.
    if ($projectDoc->getId() === 'console') {
        return;
    }

    $this->dbForProject = $dbForProject;
    $this->dbForConsole = $dbForConsole;
    $this->project = $projectDoc;

    // Event executions are not handled by this worker.
    if ($hasEvents) {
        return;
    }

    $log->addTag('migrationId', $migrationDoc->getId());
    $log->addTag('projectId', $projectDoc->getId());

    $this->processMigration($migrationDoc, $log);
}
/**
 * Build the migration source adapter that matches the given source name.
 *
 * An Appwrite source pointing at 'http://localhost/v1' is redirected to
 * 'http://appwrite/v1'; every other endpoint is passed through unchanged.
 *
 * @param string $source Source adapter name, compared against each adapter's getName()
 * @param array $credentials Adapter-specific credentials read by key below
 * @return Source
 * @throws Exception When the source name is unknown or the Firebase service account is not valid JSON
 */
protected function processSource(string $source, array $credentials): Source
{
    return match ($source) {
        Firebase::getName() => new Firebase(
            $this->decodeServiceAccount($credentials['serviceAccount'] ?? null),
        ),
        Supabase::getName() => new Supabase(
            $credentials['endpoint'],
            $credentials['apiKey'],
            $credentials['databaseHost'],
            'postgres',
            $credentials['username'],
            $credentials['password'],
            $credentials['port'],
        ),
        NHost::getName() => new NHost(
            $credentials['subdomain'],
            $credentials['region'],
            $credentials['adminSecret'],
            $credentials['database'],
            $credentials['username'],
            $credentials['password'],
            $credentials['port'],
        ),
        SourceAppwrite::getName() => new SourceAppwrite(
            $credentials['projectId'],
            str_starts_with($credentials['endpoint'], 'http://localhost/v1') ? 'http://appwrite/v1' : $credentials['endpoint'],
            $credentials['apiKey']
        ),
        default => throw new \Exception('Invalid source type'),
    };
}

/**
 * Decode the JSON-encoded Firebase service account.
 *
 * json_decode() returns null on malformed input; passing that straight to
 * the adapter would fail with an unrelated-looking error, so it is rejected
 * here with an explicit message instead.
 *
 * @param mixed $serviceAccount Raw 'serviceAccount' credential
 * @return array
 * @throws Exception When the value is not a JSON document that decodes to an array
 */
protected function decodeServiceAccount(mixed $serviceAccount): array
{
    $decoded = \is_string($serviceAccount) ? \json_decode($serviceAccount, true) : null;

    if (!\is_array($decoded)) {
        throw new \Exception('Invalid Firebase service account credentials');
    }

    return $decoded;
}
2024-05-28 16:59:54 +00:00
/**
 * Build the migration destination adapter for the given destination name.
 *
 * Only the Appwrite destination is supported. An endpoint starting with
 * 'http://localhost/v1' is replaced by 'http://appwrite/v1'.
 *
 * @param string $destination
 * @param array $credentials Expects 'projectId', 'endpoint' and 'apiKey'
 * @return Destination
 * @throws Exception
 */
protected function processDestination(string $destination, array $credentials): Destination
{
    if ($destination !== DestinationAppwrite::getName()) {
        throw new \Exception('Invalid destination type');
    }

    $projectId = $credentials['projectId'];

    if (str_starts_with($credentials['endpoint'], 'http://localhost/v1')) {
        $endpoint = 'http://appwrite/v1';
    } else {
        $endpoint = $credentials['endpoint'];
    }

    return new DestinationAppwrite($projectId, $endpoint, $credentials['apiKey']);
}
2023-10-01 17:39:26 +00:00
/**
 * Broadcast the migration's current state over Realtime and then persist it.
 *
 * The update event is sent twice with identical payload, events, channels
 * and roles: once to the 'console' project and once to the migrated project.
 * The document is written to the 'migrations' collection afterwards.
 *
 * @param Document $migration
 * @param Document $project
 * @return Document The document returned by the database update
 * @throws Authorization
 * @throws Structure
 * @throws Conflict
 * @throws \Utopia\Database\Exception
 * @throws Exception
 */
protected function updateMigrationDocument(Document $migration, Document $project): Document
{
    $migrationId = $migration->getId();

    $events = Event::generateEvents('migrations.[migrationId].update', [
        'migrationId' => $migrationId,
    ]);

    $target = Realtime::fromPayload(
        event: $events[0],
        payload: $migration,
        project: $project
    );

    /** Trigger Realtime for the console first, then for the project itself */
    foreach (['console', $project->getId()] as $recipient) {
        Realtime::send(
            projectId: $recipient,
            payload: $migration->getArrayCopy(),
            events: $events,
            channels: $target['channels'],
            roles: $target['roles'],
        );
    }

    return $this->dbForProject->updateDocument('migrations', $migrationId, $migration);
}
/**
 * Delete a temporary API key from the console database.
 *
 * generateAPIKey() purges the cached project document after creating the
 * key; the same purge is done here after deleting it so the removed key
 * does not linger in a cached copy of the project.
 *
 * @param Document $apiKey Key document as returned by generateAPIKey()
 * @return void
 * @throws \Utopia\Database\Exception
 * @throws Authorization
 * @throws Conflict
 * @throws Restricted
 * @throws Structure
 */
protected function removeAPIKey(Document $apiKey): void
{
    $this->dbForConsole->deleteDocument('keys', $apiKey->getId());

    // 'projectId' is set on the key document by generateAPIKey().
    $projectId = $apiKey->getAttribute('projectId');

    if (!empty($projectId)) {
        $this->dbForConsole->purgeCachedDocument('projects', $projectId);
    }
}
/**
 * Create a temporary 'Transfer API Key' for the given project.
 *
 * The key gets a random secret (128 random bytes, hex-encoded), read and
 * write scopes for every resource listed below, and no expiry. It is stored
 * in the console 'keys' collection and the cached project document is
 * purged afterwards.
 *
 * @param Document $project
 * @return Document The key document that was passed to the database
 * @throws Authorization
 * @throws Structure
 * @throws \Utopia\Database\Exception
 * @throws Exception
 */
protected function generateAPIKey(Document $project): Document
{
    $secret = bin2hex(\random_bytes(128));

    // Every resource is granted both its read and its write scope, in this order.
    $scopes = [];
    foreach (['users', 'teams', 'databases', 'collections', 'documents', 'buckets', 'files', 'functions'] as $resource) {
        $scopes[] = $resource . '.read';
        $scopes[] = $resource . '.write';
    }

    $attributes = [
        '$id' => ID::unique(),
        '$permissions' => [
            Permission::read(Role::any()),
            Permission::update(Role::any()),
            Permission::delete(Role::any()),
        ],
        'projectInternalId' => $project->getInternalId(),
        'projectId' => $project->getId(),
        'name' => 'Transfer API Key',
        'scopes' => $scopes,
        'expire' => null,
        'sdks' => [],
        'accessedAt' => null,
        'secret' => $secret,
    ];

    $apiKey = new Document($attributes);

    $this->dbForConsole->createDocument('keys', $apiKey);
    $this->dbForConsole->purgeCachedDocument('projects', $project->getId());

    return $apiKey;
}
/**
 * Run one migration from start to finish and persist its outcome.
 *
 * A temporary API key is generated for the destination project and is
 * always removed again in the finally block. The migration document moves
 * through the stages 'processing' -> 'migrating' -> 'finished' and ends
 * with status 'completed' or 'failed'. Errors reported by the source or
 * destination, and any throwable raised along the way, are written to the
 * document's 'errors' attribute.
 *
 * @param Document $migration Only its ID is used; the stored document is reloaded
 * @param Log $log
 * @return void
 * @throws Authorization
 * @throws Conflict
 * @throws Restricted
 * @throws Structure
 * @throws \Utopia\Database\Exception
 * @throws Exception When the migration ends in status 'failed' or could not be loaded
 */
protected function processMigration(Document $migration, Log $log): void
{
    $projectDocument = $this->dbForConsole->getDocument('projects', $this->project->getId());
    $tempAPIKey = $this->generateAPIKey($projectDocument);

    // Declared up front so catch/finally never read an undefined variable
    // when a failure happens before these are assigned.
    $migrationDocument = null;
    $source = null;
    $destination = null;
    $failure = null;

    try {
        $migrationDocument = $this->dbForProject->getDocument('migrations', $migration->getId());
        $migrationDocument->setAttribute('stage', 'processing');
        $migrationDocument->setAttribute('status', 'processing');
        $log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'processing'", \microtime(true)));
        $this->updateMigrationDocument($migrationDocument, $projectDocument);

        $log->addTag('type', $migrationDocument->getAttribute('source'));

        $source = $this->processSource(
            $migrationDocument->getAttribute('source'),
            $migrationDocument->getAttribute('credentials')
        );

        $destination = $this->processDestination(
            $migrationDocument->getAttribute('destination'),
            [
                'projectId' => $projectDocument->getId(),
                'endpoint' => 'http://appwrite/v1',
                'apiKey' => $tempAPIKey['secret'],
            ]
        );

        $source->report();

        $transfer = new Transfer(
            $source,
            $destination
        );

        /** Start Transfer */
        $migrationDocument->setAttribute('stage', 'migrating');
        $log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'migrating'", \microtime(true)));
        $this->updateMigrationDocument($migrationDocument, $projectDocument);

        // Progress callback: persist the transfer cache and counters each time it is invoked.
        $transfer->run($migrationDocument->getAttribute('resources'), function () use ($migrationDocument, $transfer, $projectDocument) {
            $migrationDocument->setAttribute('resourceData', json_encode($transfer->getCache()));
            $migrationDocument->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
            $this->updateMigrationDocument($migrationDocument, $projectDocument);
        });

        $errorMessages = $this->collectTransferErrors($source, $destination);

        if (!empty($errorMessages)) {
            $migrationDocument->setAttribute('status', 'failed');
            $migrationDocument->setAttribute('stage', 'finished');
            $log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'finished' and failed", \microtime(true)));

            $migrationDocument->setAttribute('errors', $errorMessages);
            $log->addExtra('migrationErrors', json_encode($errorMessages));
            $this->updateMigrationDocument($migrationDocument, $projectDocument);

            return;
        }

        $migrationDocument->setAttribute('status', 'completed');
        $migrationDocument->setAttribute('stage', 'finished');
        $log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'finished' and succeeded", \microtime(true)));
    } catch (\Throwable $th) {
        // Kept so the final exception can carry the real cause.
        $failure = $th;

        Console::error($th->getMessage());
        Console::error($th->getTraceAsString());

        if ($migrationDocument !== null) {
            // Record the throwable plus whatever the adapters had collected
            // before the failure interrupted the transfer.
            $errorMessages = [
                $th->getMessage(),
                ...$this->collectTransferErrors($source, $destination),
            ];

            $migrationDocument->setAttribute('status', 'failed');
            $migrationDocument->setAttribute('stage', 'finished');
            $migrationDocument->setAttribute('errors', $errorMessages);
            $log->addExtra('migrationErrors', json_encode($errorMessages));
        }
    } finally {
        $this->removeAPIKey($tempAPIKey);

        if ($migrationDocument !== null) {
            $this->updateMigrationDocument($migrationDocument, $projectDocument);

            if ($migrationDocument->getAttribute('status', '') === 'failed') {
                throw new Exception("Migration failed", previous: $failure);
            }
        } elseif ($failure !== null) {
            // The migration document could not even be loaded; surface the
            // failure instead of finishing silently.
            throw new Exception("Migration failed", previous: $failure);
        }
    }
}

/**
 * Turn the errors collected by the source and destination adapters into
 * human-readable messages, source errors first.
 *
 * @param Source|null $source Null when the source was never created
 * @param Destination|null $destination Null when the destination was never created
 * @return array<int, string>
 */
protected function collectTransferErrors(?Source $source, ?Destination $destination): array
{
    $messages = [];

    foreach ($source?->getErrors() ?? [] as $error) {
        /** @var MigrationException $error */
        $messages[] = "Error occurred while fetching '{$error->getResourceType()}:{$error->getResourceId()}' from source with message: '{$error->getMessage()}'";
    }

    foreach ($destination?->getErrors() ?? [] as $error) {
        /** @var MigrationException $error */
        $messages[] = "Error occurred while pushing '{$error->getResourceType()}:{$error->getResourceId()}' to destination with message: '{$error->getMessage()}'";
    }

    return $messages;
}
2024-05-01 16:46:19 +00:00
}