added processing of migration resource stats

This commit is contained in:
ArnabChatterjee20k 2025-08-28 19:06:18 +05:30
parent 8a76979ba8
commit 0737c78cb5

View file

@ -4,6 +4,7 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Exception;
use Utopia\CLI\Console;
use Utopia\Config\Config;
@ -13,9 +14,14 @@ use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Conflict;
use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Validator\Authorization as AuthorizationValidator;
use Utopia\Migration\Destination;
use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite;
use Utopia\Migration\Exception as MigrationException;
use Utopia\Migration\Resource;
use Utopia\Migration\Resources\Database\Database as ResourceDatabase;
use Utopia\Migration\Resources\Database\Row as ResourceRow;
use Utopia\Migration\Resources\Database\Table as ResourceTable;
use Utopia\Migration\Source;
use Utopia\Migration\Sources\Appwrite as SourceAppwrite;
use Utopia\Migration\Sources\CSV;
@ -45,6 +51,7 @@ class Migrations extends Action
*/
protected array $sourceReport = [];
private string $source;
/**
* @var callable
*/
@ -69,13 +76,14 @@ class Migrations extends Action
->inject('logError')
->inject('queueForRealtime')
->inject('deviceForImports')
->inject('queueForStatsUsage') // Added
->callback($this->action(...));
}
/**
* @throws Exception
*/
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime, Device $deviceForImports): void
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime, Device $deviceForImports, StatsUsage $queueForStatsUsage): void
{
$payload = $message->getPayload() ?? [];
$this->deviceForImports = $deviceForImports;
@ -103,7 +111,7 @@ class Migrations extends Action
return;
}
$this->processMigration($migration, $queueForRealtime);
$this->processMigration($migration, $queueForRealtime, $queueForStatsUsage); // Updated
}
/**
@ -267,7 +275,7 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function processMigration(Document $migration, Realtime $queueForRealtime): void
protected function processMigration(Document $migration, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage): void
{
$project = $this->project;
$projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId());
@ -301,6 +309,7 @@ class Migrations extends Action
$destination
);
$aggregatedResources = [];
/** Start Transfer */
if (empty($source->getErrors())) {
$migration->setAttribute('stage', 'migrating');
@ -308,9 +317,13 @@ class Migrations extends Action
$transfer->run(
$migration->getAttribute('resources'),
function () use ($migration, $transfer, $projectDocument, $queueForRealtime) {
function ($resources) use ($migration, $transfer, $projectDocument, $queueForRealtime, &$aggregatedResources) {
$migration->setAttribute('resourceData', json_encode($transfer->getCache()));
$migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
if (!empty($resources)) {
$aggregatedResources[] = $resources;
}
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
},
$migration->getAttribute('resourceId'),
@ -412,9 +425,91 @@ class Migrations extends Action
}
if ($migration->getAttribute('status', '') === 'completed') {
foreach ($aggregatedResources as $resource) {
$this->processMigrationResourceStats(
$resource,
$queueForStatsUsage,
$projectDocument,
$migration->getAttribute('source'),
$migration->getAttribute('resourceId')
);
}
$destination?->success();
$source?->success();
}
}
}
private function processMigrationResourceStats(array $resources, StatsUsage $queueForStatsUsage, Document $projectDocument, string $source, ?string $resourceId)
{
/**
* @var Resource $resource
*/
var_dump($resources);
$resource = $resources[0];
$databaseInternalId = null;
$tableInternalId = null;
if ($source === CSV::getName() && !empty($resourceId)) {
[$databaseId, $tableId] = explode(':', $resourceId);
$database = AuthorizationValidator::skip(fn () => $this->dbForProject->getDocument('databases', $databaseId));
$table = AuthorizationValidator::skip(fn () => $this->dbForProject->getDocument('database_' . $database->getSequence(), $tableId));
$databaseInternalId = (int) $database->getSequence();
$tableInternalId = (int) $table->getSequence();
}
$count = count($resources);
switch ($resource->getName()) {
case ResourceDatabase::getName():
$queueForStatsUsage->addMetric(METRIC_DATABASES, $count);
break;
case ResourceTable::getName():
/** @var ResourceTable $resource */
$dbSeq = ($source === CSV::getName() && !empty($databaseInternalId))
? (int) $databaseInternalId
: (int) $resource->getDatabase()->getSequence();
$queueForStatsUsage
->addMetric(METRIC_COLLECTIONS, $count)
->addMetric(
str_replace('{databaseInternalId}', $dbSeq, METRIC_DATABASE_ID_COLLECTIONS),
$count
);
break;
case ResourceRow::getName():
/** @var ResourceRow $resource */
$table = $resource->getTable();
$dbSeq = ($source === CSV::getName() && !empty($databaseInternalId))
? (int) $databaseInternalId
: (int) $table->getDatabase()->getSequence();
$colSeq = ($source === CSV::getName() && !empty($tableInternalId))
? (int) $tableInternalId
: (int) $table->getSequence();
$queueForStatsUsage
->addMetric(
str_replace(
['{databaseInternalId}','{collectionInternalId}'],
[$dbSeq, $colSeq],
METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS
),
$count
)
->addMetric(
str_replace('{databaseInternalId}', $dbSeq, METRIC_DATABASE_ID_DOCUMENTS),
$count
);
break;
default:
break;
}
$queueForStatsUsage->setProject($projectDocument)->trigger();
$queueForStatsUsage->reset();
}
}