sync with 1.6.x

This commit is contained in:
shimon 2024-11-24 10:31:21 +02:00
parent e91c762953
commit 9839a4ef36

View file

@ -16,7 +16,6 @@ use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Helpers\ID;
use Utopia\Logger\Log;
use Utopia\Migration\Destination;
use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite;
use Utopia\Migration\Exception as MigrationException;
@ -37,6 +36,8 @@ class Migrations extends Action
protected Document $project;
protected $logError;
public static function getName(): string
{
return 'migrations';
@ -52,14 +53,14 @@ class Migrations extends Action
->inject('message')
->inject('dbForProject')
->inject('dbForConsole')
->inject('log')
->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, Log $log) => $this->action($message, $dbForProject, $dbForConsole, $log));
->inject('logError')
->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, callable $logError) => $this->action($message, $dbForProject, $dbForConsole, $logError));
}
/**
* @throws Exception
*/
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void
public function action(Message $message, Database $dbForProject, Database $dbForConsole, callable $logError): void
{
$payload = $message->getPayload() ?? [];
@ -78,6 +79,7 @@ class Migrations extends Action
$this->dbForProject = $dbForProject;
$this->dbForConsole = $dbForConsole;
$this->project = $project;
$this->logError = $logError;
/**
* Handle Event execution.
@ -86,10 +88,7 @@ class Migrations extends Action
return;
}
$log->addTag('migrationId', $migration->getId());
$log->addTag('projectId', $project->getId());
$this->processMigration($migration, $log);
$this->processMigration($migration);
}
/**
@ -259,7 +258,7 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function processMigration(Document $migration, Log $log): void
protected function processMigration(Document $migration): void
{
$project = $this->project;
$projectDocument = $this->dbForConsole->getDocument('projects', $project->getId());
@ -268,8 +267,6 @@ class Migrations extends Action
$transfer = $source = $destination = null;
try {
$migration = $this->dbForProject->getDocument('migrations', $migration->getId());
if (
$migration->getAttribute('source') === SourceAppwrite::getName() &&
empty($migration->getAttribute('credentials', []))
@ -287,8 +284,6 @@ class Migrations extends Action
$migration->setAttribute('status', 'processing');
$this->updateMigrationDocument($migration, $projectDocument);
$log->addTag('type', $migration->getAttribute('source'));
$source = $this->processSource($migration);
$destination = $this->processDestination($migration, $tempAPIKey->getAttribute('secret'));
@ -326,7 +321,6 @@ class Migrations extends Action
$errorMessages = [];
foreach ($sourceErrors as $error) {
/** @var $sourceErrors $error */
$message = "Error occurred while fetching '{$error->getResourceName()}:{$error->getResourceId()}' from source with message: '{$error->getMessage()}'";
if ($error->getPrevious()) {
$message .= " Message: ".$error->getPrevious()->getMessage() . " File: ".$error->getPrevious()->getFile() . " Line: ".$error->getPrevious()->getLine();
@ -346,7 +340,6 @@ class Migrations extends Action
}
$migration->setAttribute('errors', $errorMessages);
$log->addExtra('migrationErrors', json_encode($errorMessages));
$this->updateMigrationDocument($migration, $projectDocument);
return;
@ -361,7 +354,6 @@ class Migrations extends Action
if (! $migration->isEmpty()) {
$migration->setAttribute('status', 'failed');
$migration->setAttribute('stage', 'finished');
$migration->setAttribute('errors', [$th->getMessage()]);
return;
}
@ -381,7 +373,6 @@ class Migrations extends Action
}
$migration->setAttribute('errors', $errorMessages);
$log->addTag('migrationErrors', json_encode($errorMessages));
}
} finally {
if (! $tempAPIKey->isEmpty()) {
@ -396,7 +387,23 @@ class Migrations extends Action
$destination->error();
$source->error();
throw new Exception('Migration failed');
foreach ($source->getErrors() as $error) {
call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [
'migrationId' => $migration->getId() ?? '',
'source' => $migration->getAttribute('source') ?? '',
'resourceName' => $error->getResourceName(),
'resourceGroup' => $error->getResourceGroup()
]);
}
foreach ($destination->getErrors() as $error) {
call_user_func($this->logError, $error, 'appwrite-worker', 'appwrite-queue-' . self::getName(), [
'migrationId' => $migration->getId() ?? '',
'source' => $migration->getAttribute('source') ?? '',
'resourceName' => $error->getResourceName(),
'resourceGroup' => $error->getResourceGroup()
]);
}
}
if ($migration->getAttribute('status', '') === 'completed') {