diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 39adb37374..b90dcd635d 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -4,6 +4,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Realtime; +use Appwrite\Event\StatsUsage; use Exception; use Utopia\CLI\Console; use Utopia\Config\Config; @@ -13,9 +14,14 @@ use Utopia\Database\Exception\Authorization; use Utopia\Database\Exception\Conflict; use Utopia\Database\Exception\Restricted; use Utopia\Database\Exception\Structure; +use Utopia\Database\Validator\Authorization as AuthorizationValidator; use Utopia\Migration\Destination; use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite; use Utopia\Migration\Exception as MigrationException; +use Utopia\Migration\Resource; +use Utopia\Migration\Resources\Database\Database as ResourceDatabase; +use Utopia\Migration\Resources\Database\Row as ResourceRow; +use Utopia\Migration\Resources\Database\Table as ResourceTable; use Utopia\Migration\Source; use Utopia\Migration\Sources\Appwrite as SourceAppwrite; use Utopia\Migration\Sources\CSV; @@ -45,6 +51,7 @@ class Migrations extends Action */ protected array $sourceReport = []; + private string $source; /** * @var callable */ @@ -69,13 +76,14 @@ class Migrations extends Action ->inject('logError') ->inject('queueForRealtime') ->inject('deviceForImports') + ->inject('queueForStatsUsage') // Added ->callback($this->action(...)); } /** * @throws Exception */ - public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime, Device $deviceForImports): void + public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime, Device $deviceForImports, StatsUsage $queueForStatsUsage): void { $payload = $message->getPayload() ?? []; $this->deviceForImports = $deviceForImports; @@ -103,7 +111,7 @@ class Migrations extends Action return; } - $this->processMigration($migration, $queueForRealtime); + $this->processMigration($migration, $queueForRealtime, $queueForStatsUsage); // Updated } /** @@ -267,7 +275,7 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function processMigration(Document $migration, Realtime $queueForRealtime): void + protected function processMigration(Document $migration, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage): void { $project = $this->project; $projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId()); @@ -301,6 +309,7 @@ class Migrations extends Action $destination ); + $aggregatedResources = []; /** Start Transfer */ if (empty($source->getErrors())) { $migration->setAttribute('stage', 'migrating'); @@ -308,9 +317,13 @@ class Migrations extends Action $transfer->run( $migration->getAttribute('resources'), - function () use ($migration, $transfer, $projectDocument, $queueForRealtime) { + function ($resources) use ($migration, $transfer, $projectDocument, $queueForRealtime, &$aggregatedResources) { $migration->setAttribute('resourceData', json_encode($transfer->getCache())); $migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters())); + + if (!empty($resources)) { + $aggregatedResources[] = $resources; + } $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); }, $migration->getAttribute('resourceId'), @@ -412,9 +425,91 @@ class Migrations extends Action } if ($migration->getAttribute('status', '') === 'completed') { + foreach ($aggregatedResources as $resource) { + $this->processMigrationResourceStats( + $resource, + $queueForStatsUsage, + $projectDocument, + $migration->getAttribute('source'), + $migration->getAttribute('resourceId') + ); + } $destination?->success(); $source?->success(); } } } + + private function processMigrationResourceStats(array $resources, StatsUsage $queueForStatsUsage, Document $projectDocument, string $source, ?string $resourceId) + { + /** + * @var Resource $resource + */ + var_dump($resources); + $resource = $resources[0]; + + $databaseInternalId = null; + $tableInternalId = null; + if ($source === CSV::getName() && !empty($resourceId)) { + [$databaseId, $tableId] = explode(':', $resourceId); + $database = AuthorizationValidator::skip(fn () => $this->dbForProject->getDocument('databases', $databaseId)); + $table = AuthorizationValidator::skip(fn () => $this->dbForProject->getDocument('database_' . $database->getSequence(), $tableId)); + $databaseInternalId = (int) $database->getSequence(); + $tableInternalId = (int) $table->getSequence(); + } + + $count = count($resources); + + switch ($resource->getName()) { + case ResourceDatabase::getName(): + $queueForStatsUsage->addMetric(METRIC_DATABASES, $count); + break; + + case ResourceTable::getName(): + /** @var ResourceTable $resource */ + $dbSeq = ($source === CSV::getName() && !empty($databaseInternalId)) + ? (int) $databaseInternalId + : (int) $resource->getDatabase()->getSequence(); + + $queueForStatsUsage + ->addMetric(METRIC_COLLECTIONS, $count) + ->addMetric( + str_replace('{databaseInternalId}', $dbSeq, METRIC_DATABASE_ID_COLLECTIONS), + $count + ); + break; + + case ResourceRow::getName(): + /** @var ResourceRow $resource */ + + $table = $resource->getTable(); + $dbSeq = ($source === CSV::getName() && !empty($databaseInternalId)) + ? (int) $databaseInternalId + : (int) $table->getDatabase()->getSequence(); + $colSeq = ($source === CSV::getName() && !empty($tableInternalId)) + ? (int) $tableInternalId + : (int) $table->getSequence(); + + $queueForStatsUsage + ->addMetric( + str_replace( + ['{databaseInternalId}','{collectionInternalId}'], + [$dbSeq, $colSeq], + METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS + ), + $count + ) + ->addMetric( + str_replace('{databaseInternalId}', $dbSeq, METRIC_DATABASE_ID_DOCUMENTS), + $count + ); + break; + + default: + break; + } + + $queueForStatsUsage->setProject($projectDocument)->trigger(); + $queueForStatsUsage->reset(); + } }