get latest usageDump and dual write

This commit is contained in:
Damodar Lohani 2025-02-05 09:11:03 +00:00
parent 5b91b5cec1
commit 5052fd8528

View file

@ -7,7 +7,6 @@ use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\NotFound;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\System\System;
@ -64,6 +63,11 @@ class StatsUsageDump extends Action
'.builds.storage',
];
/**
* @var callable
*/
protected mixed $getLogsDB;
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
@ -90,59 +94,56 @@ class StatsUsageDump extends Action
/**
* @param Message $message
* @param callable $getProjectDB
* @param callable $getLogsDB
* @return void
* @throws Exception
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, callable $getProjectDB, callable $getLogsDB): void
{
$this->getLogsDB = $getLogsDB;
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
try {
foreach ($payload['stats'] ?? [] as $stats) {
$project = new Document($stats['project'] ?? []);
$numberOfKeys = !empty($stats['keys']) ? \count($stats['keys']) : 0;
$receivedAt = $stats['receivedAt'] ?? 'NONE';
if ($numberOfKeys === 0) {
continue;
}
foreach ($payload['stats'] ?? [] as $stats) {
$project = new Document($stats['project'] ?? []);
/**
* End temp bug fallback
*/
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
$receivedAt = $stats['receivedAt'] ?? 'NONE';
if ($numberOfKeys === 0) {
continue;
}
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
try {
/** @var \Utopia\Database\Database $dbForProject */
$dbForProject = $getProjectDB($project);
$dbForLogs = $getLogsDB($project);
$projectDocuments = [];
$databaseCache = [];
$collectionSizeCache = [];
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys . ' Started');
$start = \microtime(true);
foreach ($stats['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
if (str_contains($key, METRIC_DATABASES_STORAGE)) {
try {
$this->handleDatabaseStorage($key, $dbForProject, $project);
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] failed to calculate database storage for key [' . $key . '] ' . $e->getMessage());
}
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : \date($format, \time());
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
if (\str_contains($key, METRIC_DATABASES_STORAGE)) {
$this->handleDatabaseStorage(
$id,
$key,
$time,
$period,
$dbForProject,
$projectDocuments,
$databaseCache,
$collectionSizeCache
);
continue;
}
$projectDocuments[] = new Document([
$document = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
@ -150,223 +151,192 @@ class StatsUsageDump extends Action
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
$dbForProject->createOrUpdateDocumentsWithIncrease(
'stats',
'value',
[$document]
);
$this->writeToLogsDB($project, $document);
}
}
/**
* Create clone to dual write
* This is required as first request to db
* modifies the document's details like internal ID
* which will conflict in new DB
*/
$clonedProjectDoucments = [];
foreach ($projectDocuments as $document) {
if (array_key_exists($document->getAttribute('metric'), $this->skipBaseMetrics)) {
continue;
}
foreach ($this->skipParentIdMetrics as $skipMetric) {
if (str_ends_with($document->getAttribute('metric'), $skipMetric)) {
continue;
}
}
$clonedProjectDoucments[] = new Document($document->getArrayCopy());
}
$dbForProject->createOrUpdateDocumentsWithIncrease(
collection: 'stats',
attribute: 'value',
documents: $projectDocuments
);
$dbForLogs->createOrUpdateDocumentsWithIncrease(
collection: 'usage',
attribute: 'value',
documents: $clonedProjectDoucments
);
$end = \microtime(true);
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys. ' Time: '.($end - $start).'s');
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
}
} catch (\Exception $e) {
Console::error('[' . DateTime::now() . '] Error processing stats: ' . $e->getMessage());
}
}
private function handleDatabaseStorage(
string $id,
string $key,
?string $time,
string $period,
Database $dbForProject,
array &$projectDocuments,
array &$databaseCache,
array &$collectionSizeCache,
): void {
$data = \explode('.', $key);
$value = 0;
$previousValue = 0;
private function handleDatabaseStorage(string $key, Database $dbForProject, Document $project): void
{
$data = explode('.', $key);
$start = microtime(true);
try {
$previousValue = $dbForProject
->getDocument('stats', $id)
->getAttribute('value', 0);
} catch (\Exception) {
// No previous value
}
$updateMetric = function (Database $dbForProject, Document $project, int $value, string $key, string $period, string|null $time) {
$id = \md5("{$time}_{$period}_{$key}");
switch (\count($data)) {
case METRIC_COLLECTION_LEVEL_STORAGE:
$databaseInternalId = $data[0];
$collectionInternalId = $data[1];
$collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}";
$document = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
$dbForProject->createOrUpdateDocumentsWithIncrease(
'stats',
'value',
[$document]
);
$this->writeToLogsDB($project, $document);
};
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
$value = 0;
$previousValue = 0;
try {
$previousValue = ($dbForProject->getDocument('stats', $id))->getAttribute('value', 0);
} catch (\Exception $e) {
// No previous value
}
switch (count($data)) {
// Collection Level
case METRIC_COLLECTION_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Collection Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
$collectionInternalId = $data[1];
if (!isset($collectionSizeCache[$collectionId])) {
try {
$collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
$value = $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collectionInternalId);
} catch (\Exception $e) {
if (!$e instanceof NotFound) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
$collectionSizeCache[$collectionId] = 0;
}
}
$value = $collectionSizeCache[$collectionId];
// Compare with previous value
$diff = $value - $previousValue;
$diff = $value - $previousValue;
if ($diff === 0) {
if ($diff === 0) {
break;
}
// Update Collection
$updateMetric($dbForProject, $project, $diff, $key, $period, $time);
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
break;
}
// Database Level
case METRIC_DATABASE_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Database Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
$keys = [
$key,
\str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE),
METRIC_DATABASES_STORAGE
];
foreach ($keys as $metric) {
$projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
}
break;
case METRIC_DATABASE_LEVEL_STORAGE:
$databaseInternalId = $data[0];
$databaseId = "database_{$databaseInternalId}";
if (!isset($databaseCache[$databaseId])) {
$collections = [];
try {
$databaseCache[$databaseId] = $dbForProject->find($databaseId);
$collections = $dbForProject->find('database_' . $databaseInternalId);
} catch (\Exception $e) {
if (!$e instanceof NotFound) {
// Database not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
$databaseCache[$databaseId] = [];
}
}
foreach ($databaseCache[$databaseId] as $collection) {
$collectionId = "{$databaseId}_collection_{$collection->getInternalId()}";
if (!isset($collectionSizeCache[$collectionId])) {
foreach ($collections as $collection) {
try {
$collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
$value += $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
if (!$e instanceof NotFound) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
$collectionSizeCache[$collectionId] = 0;
}
}
$value += $collectionSizeCache[$collectionId];
}
$diff = $value - $previousValue;
if ($diff === 0) {
$diff = $value - $previousValue;
if ($diff === 0) {
break;
}
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
break;
}
// Project Level
case METRIC_PROJECT_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Project Level Storage Calculation [' . $key . ']');
// Get all project databases
$databases = $dbForProject->find('database');
$keys = [
\str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE),
METRIC_DATABASES_STORAGE
];
// Recalculate all databases
foreach ($databases as $database) {
$collections = $dbForProject->find('database_' . $database->getInternalId());
foreach ($keys as $metric) {
$projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
}
break;
case METRIC_PROJECT_LEVEL_STORAGE:
if (!isset($databaseCache['*'])) {
try {
$databaseCache['*'] = $dbForProject->find('databases');
} catch (\Exception $e) {
if (!$e instanceof NotFound) {
throw $e;
}
$databaseCache['*'] = [];
}
}
foreach ($databaseCache['*'] as $database) {
$databaseId = "database_{$database->getInternalId()}";
if (!isset($databaseCache[$databaseId])) {
try {
$databaseCache[$databaseId] = $dbForProject->find($databaseId);
} catch (\Exception $e) {
if (!$e instanceof NotFound) {
throw $e;
}
$databaseCache[$databaseId] = [];
}
}
foreach ($databaseCache[$databaseId] as $collection) {
$collectionId = "{$databaseId}_collection_{$collection->getInternalId()}";
if (!isset($collectionSizeCache[$collectionId])) {
foreach ($collections as $collection) {
try {
$collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
$value += $dbForProject->getSizeOfCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
if (!$e instanceof NotFound) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
$collectionSizeCache[$collectionId] = 0;
}
}
$value += $collectionSizeCache[$collectionId];
}
}
$diff = $value - $previousValue;
if ($diff === 0) {
$diff = $value - $previousValue;
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
break;
}
$keys = [
METRIC_DATABASES_STORAGE
];
foreach ($keys as $metric) {
$projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
}
break;
}
}
$end = microtime(true);
console::log('[' . DateTime::now() . '] DB Storage Calculation [' . $key . '] took ' . (($end - $start) * 1000) . ' milliseconds');
}
private function createStatsDocument(
string $id,
string $period,
?string $time,
string $key,
int $diff,
): Document {
return new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $diff,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
/**
 * Mirror one usage metric document into the logs database (dual write).
 *
 * The write only happens when the project's `database` attribute is listed in the
 * comma-separated `_APP_STATS_USAGE_DUAL_WRITING_DBS` environment variable, and the
 * document's metric is neither a key of `$this->skipBaseMetrics` nor ends with one
 * of the suffixes in `$this->skipParentIdMetrics`.
 *
 * @param Document $project  Project the metric belongs to; its `database` attribute selects whether to dual write
 * @param Document $document Stats document (`$id`, `period`, `time`, `metric`, `value`, `region`)
 * @return void
 */
protected function writeToLogsDB(Document $project, Document $document)
{
    // explode() on an empty string yields [''], so blank entries are dropped; otherwise
    // an unset variable would still "allow" projects whose database is '' or null.
    $databasesToDualWrite = \array_filter(
        \array_map(
            static fn (string $name): string => \trim($name),
            \explode(',', (string) System::getEnv('_APP_STATS_USAGE_DUAL_WRITING_DBS', ''))
        ),
        static fn (string $name): bool => $name !== ''
    );

    $db = $project->getAttribute('database');

    // Strict comparison: a loose in_array() treats null and '' as equal.
    if (!\is_string($db) || !\in_array($db, $databasesToDualWrite, true)) {
        return;
    }

    // Run the cheap metric filters before resolving the logs database handle.
    $metric = $document->getAttribute('metric');
    $metricName = (string) $metric;

    if (\array_key_exists($metricName, $this->skipBaseMetrics)) {
        return;
    }

    foreach ($this->skipParentIdMetrics as $skipMetric) {
        if (\str_ends_with($metricName, $skipMetric)) {
            return;
        }
    }

    /** @var \Utopia\Database\Database $dbForLogs */
    $dbForLogs = ($this->getLogsDB)($project);

    // Write a fresh document instead of the instance already persisted to the project
    // database: that first write may attach database-specific details (such as the
    // internal ID) which would conflict in the logs database.
    // NOTE(review): assumes stats documents only carry the six fields below - confirm.
    $logsDocument = new Document([
        '$id' => $document->getId(),
        'period' => $document->getAttribute('period'),
        'time' => $document->getAttribute('time'),
        'metric' => $metric,
        'value' => $document->getAttribute('value'),
        'region' => $document->getAttribute('region'),
    ]);

    // NOTE(review): the target collection is 'stats'; confirm this is the intended
    // collection name in the logs database.
    $dbForLogs->createOrUpdateDocumentsWithIncrease(
        'stats',
        'value',
        [$logsDocument]
    );
}
}