Update usage dump to upsert documents in batches

This commit is contained in:
Jake Barnby 2025-01-23 01:44:33 +13:00
parent 758d8496c1
commit d533113d86
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C

View file

@ -8,6 +8,7 @@ use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Database\Exception\NotFound;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\System\System;
@ -70,10 +71,11 @@ class UsageDump extends Action
continue;
}
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
try {
$dbForProject = $getProjectDB($project);
foreach ($stats['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
@ -83,109 +85,73 @@ class UsageDump extends Action
try {
$this->handleDatabaseStorage($key, $dbForProject);
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] failed to calculate database storage for key [' . $key . '] ' . $e->getMessage());
Console::error('[' . DateTime::now() . '] failed to calculate database storage for key [' . $key . '] ' . $e->getMessage());
}
continue;
}
$documents = [];
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats',
$id,
'value',
$value
);
}
}
$documents[] = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
}
$dbForProject->createOrUpdateDocumentsWithInplaceIncrease(
collection: 'stats',
attribute: 'value',
documents: $documents
);
}
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
}
}
}
private function handleDatabaseStorage(string $key, Database $dbForProject): void
{
$data = explode('.', $key);
$start = microtime(true);
$data = \explode('.', $key);
$start = \microtime(true);
$updateMetric = function (Database $dbForProject, int $value, string $key, string $period, string|null $time) {
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats',
$id,
'value',
$value
);
}
}
};
$documents = [];
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$time = 'inf' === $period ? null : \date($format, \time());
$id = \md5("{$time}_{$period}_{$key}");
$value = 0;
$previousValue = 0;
try {
$previousValue = ($dbForProject->getDocument('stats', $id))->getAttribute('value', 0);
} catch (\Exception $e) {
$previousValue = $dbForProject
->getDocument('stats', $id)
->getAttribute('value', 0);
} catch (\Exception) {
// No previous value
}
switch (count($data)) {
switch (\count($data)) {
// Collection Level
case METRIC_COLLECTION_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Collection Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
$collectionInternalId = $data[1];
try {
$value = $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collectionInternalId);
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
if (!$e instanceof NotFound) {
throw $e;
}
}
@ -197,18 +163,43 @@ class UsageDump extends Action
break;
}
// Update Collection
$updateMetric($dbForProject, $diff, $key, $period, $time);
$databaseKey = \str_replace(
['{databaseInternalId}'],
[$data[0]],
METRIC_DATABASE_ID_STORAGE
);
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $diff, $databaseKey, $period, $time);
// Database
$documents[] = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $databaseKey,
'value' => $diff,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $diff, $projectKey, $period, $time);
// Collection
$documents[] = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $diff,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
// Project
$documents[] = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => METRIC_DATABASES_STORAGE,
'value' => $diff,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
break;
// Database Level
// Database Level
case METRIC_DATABASE_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Database Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
@ -217,8 +208,7 @@ class UsageDump extends Action
try {
$collections = $dbForProject->find('database_' . $databaseInternalId);
} catch (\Exception $e) {
// Database not found
if ($e->getMessage() !== 'Collection not found') {
if (!$e instanceof NotFound) {
throw $e;
}
}
@ -227,8 +217,7 @@ class UsageDump extends Action
try {
$value += $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
if (!$e instanceof NotFound) {
throw $e;
}
}
@ -240,19 +229,37 @@ class UsageDump extends Action
break;
}
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $diff, $databaseKey, $period, $time);
// Database
$databaseKey = str_replace(
['{databaseInternalId}'],
[$data[0]],
METRIC_DATABASE_ID_STORAGE
);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $diff, $projectKey, $period, $time);
$documents[] = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $databaseKey,
'value' => $diff,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
// Project
$documents[] = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => METRIC_DATABASES_STORAGE,
'value' => $diff,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
break;
// Project Level
// Project Level
case METRIC_PROJECT_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Project Level Storage Calculation [' . $key . ']');
// Get all project databases
$databases = $dbForProject->find('database');
$databases = $dbForProject->find('databases');
// Recalculate all databases
foreach ($databases as $database) {
@ -262,8 +269,7 @@ class UsageDump extends Action
try {
$value += $dbForProject->getSizeOfCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
if (!$e instanceof NotFound) {
throw $e;
}
}
@ -272,15 +278,27 @@ class UsageDump extends Action
$diff = $value - $previousValue;
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $diff, $projectKey, $period, $time);
// Project
$documents[] = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => METRIC_DATABASES_STORAGE,
'value' => $diff,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
break;
}
}
$dbForProject->createOrUpdateDocumentsWithInplaceIncrease(
collection: 'stats',
attribute: 'value',
documents: $documents
);
$end = microtime(true);
console::log('[' . DateTime::now() . '] DB Storage Calculation [' . $key . '] took ' . (($end - $start) * 1000) . ' milliseconds');
Console::log('[' . DateTime::now() . '] DB Storage Calculation [' . $key . '] took ' . (($end - $start) * 1000) . ' milliseconds');
}
}