diff --git a/src/Appwrite/Platform/Workers/StatsUsageDump.php b/src/Appwrite/Platform/Workers/StatsUsageDump.php
index fa31691a18..d373c65a88 100644
--- a/src/Appwrite/Platform/Workers/StatsUsageDump.php
+++ b/src/Appwrite/Platform/Workers/StatsUsageDump.php
@@ -64,6 +64,11 @@ class StatsUsageDump extends Action
         '.builds.storage',
     ];
 
+    /**
+     * @var callable Resolver for the logs database; captured in action() and used by writeToLogsDB()
+     */
+    protected mixed $getLogsDB;
+
     protected array $periods = [
         '1h' => 'Y-m-d H:00',
         '1d' => 'Y-m-d 00:00',
@@ -90,59 +95,52 @@ class StatsUsageDump extends Action
     /**
      * @param Message $message
      * @param callable $getProjectDB
+     * @param callable $getLogsDB
      * @return void
      * @throws Exception
      * @throws \Utopia\Database\Exception
      */
     public function action(Message $message, callable $getProjectDB, callable $getLogsDB): void
     {
+        $this->getLogsDB = $getLogsDB;
         $payload = $message->getPayload() ?? [];
 
         if (empty($payload)) {
             throw new Exception('Missing payload');
         }
 
-        try {
-            foreach ($payload['stats'] ?? [] as $stats) {
-                $project = new Document($stats['project'] ?? []);
-                $numberOfKeys = !empty($stats['keys']) ? \count($stats['keys']) : 0;
-                $receivedAt = $stats['receivedAt'] ?? 'NONE';
-                if ($numberOfKeys === 0) {
-                    continue;
-                }
+        foreach ($payload['stats'] ?? [] as $stats) {
+            $project = new Document($stats['project'] ?? []);
+            $numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
+            $receivedAt = $stats['receivedAt'] ?? 'NONE';
+            if ($numberOfKeys === 0) {
+                continue;
+            }
+
+            Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
+
+            try {
+                /** @var \Utopia\Database\Database $dbForProject */
                 $dbForProject = $getProjectDB($project);
-                $dbForLogs = $getLogsDB($project);
-                $projectDocuments = [];
-                $databaseCache = [];
-                $collectionSizeCache = [];
-
-                Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys . ' Started');
-                $start = \microtime(true);
-
                 foreach ($stats['keys'] ?? [] as $key => $value) {
                     if ($value == 0) {
                         continue;
                     }
 
+                    if (str_contains($key, METRIC_DATABASES_STORAGE)) {
+                        try {
+                            $this->handleDatabaseStorage($key, $dbForProject, $project);
+                        } catch (\Exception $e) {
+                            Console::error('[' . DateTime::now() . '] failed to calculate database storage for key [' . $key . '] ' . $e->getMessage());
+                        }
+                        continue;
+                    }
+
                     foreach ($this->periods as $period => $format) {
-                        $time = 'inf' === $period ? null : \date($format, \time());
+                        $time = 'inf' === $period ? null : date($format, time());
                         $id = \md5("{$time}_{$period}_{$key}");
 
-                        if (\str_contains($key, METRIC_DATABASES_STORAGE)) {
-                            $this->handleDatabaseStorage(
-                                $id,
-                                $key,
-                                $time,
-                                $period,
-                                $dbForProject,
-                                $projectDocuments,
-                                $databaseCache,
-                                $collectionSizeCache
-                            );
-                            continue;
-                        }
-
-                        $projectDocuments[] = new Document([
+                        $document = new Document([
                             '$id' => $id,
                             'period' => $period,
                             'time' => $time,
@@ -150,223 +148,223 @@ class StatsUsageDump extends Action
                             'value' => $value,
                             'region' => System::getEnv('_APP_REGION', 'default'),
                         ]);
+                        // Snapshot for the logs DB: the project write mutates $document (e.g. internal ID), which conflicts there.
+                        $logsDocument = new Document($document->getArrayCopy());
+                        $dbForProject->createOrUpdateDocumentsWithIncrease(
+                            'stats',
+                            'value',
+                            [$document]
+                        );
+
+                        $this->writeToLogsDB($project, $logsDocument);
                     }
                 }
-
-                /**
-                 * Create clone to dual write
-                 * This is required as first request to db
-                 * modifies the document's details like internal ID
-                 * which will conflict in new DB
-                 */
-                $clonedProjectDoucments = [];
-                foreach ($projectDocuments as $document) {
-                    if (array_key_exists($document->getAttribute('metric'), $this->skipBaseMetrics)) {
-                        continue;
-                    }
-                    foreach ($this->skipParentIdMetrics as $skipMetric) {
-                        if (str_ends_with($document->getAttribute('metric'), $skipMetric)) {
-                            continue;
-                        }
-                    }
-                    $clonedProjectDoucments[] = new Document($document->getArrayCopy());
-                }
-
-                $dbForProject->createOrUpdateDocumentsWithIncrease(
-                    collection: 'stats',
-                    attribute: 'value',
-                    documents: $projectDocuments
-                );
-
-                $dbForLogs->createOrUpdateDocumentsWithIncrease(
-                    collection: 'usage',
-                    attribute: 'value',
-                    documents: $clonedProjectDoucments
-                );
-                $end = \microtime(true);
-                Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys. ' Time: '.($end - $start).'s');
+            } catch (\Exception $e) {
+                Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
             }
-        } catch (\Exception $e) {
-            Console::error('[' . DateTime::now() . '] Error processing stats: ' . $e->getMessage());
         }
     }
 
-    private function handleDatabaseStorage(
-        string $id,
-        string $key,
-        ?string $time,
-        string $period,
-        Database $dbForProject,
-        array &$projectDocuments,
-        array &$databaseCache,
-        array &$collectionSizeCache,
-    ): void {
-        $data = \explode('.', $key);
-        $value = 0;
-        $previousValue = 0;
+    /**
+     * Recalculate database storage for a storage metric key and, for every period,
+     * record the change since the value currently stored in the stats collection.
+     *
+     * The number of dot-separated segments in $key selects collection-, database- or
+     * project-level calculation; a change is also applied to the parent-level metrics.
+     *
+     * @param string $key
+     * @param Database $dbForProject
+     * @param Document $project
+     * @return void
+     * @throws \Exception
+     */
+    private function handleDatabaseStorage(string $key, Database $dbForProject, Document $project): void
+    {
+        $data = explode('.', $key);
+        $start = microtime(true);
 
-        try {
-            $previousValue = $dbForProject
-                ->getDocument('stats', $id)
-                ->getAttribute('value', 0);
-        } catch (\Exception) {
-            // No previous value
-        }
+        $updateMetric = function (Database $dbForProject, Document $project, int $value, string $key, string $period, string|null $time) {
+            $id = \md5("{$time}_{$period}_{$key}");
 
-        switch (\count($data)) {
-            case METRIC_COLLECTION_LEVEL_STORAGE:
-                $databaseInternalId = $data[0];
-                $collectionInternalId = $data[1];
-                $collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}";
+            $document = new Document([
+                '$id' => $id,
+                'period' => $period,
+                'time' => $time,
+                'metric' => $key,
+                'value' => $value,
+                'region' => System::getEnv('_APP_REGION', 'default'),
+            ]);
+            // Snapshot for the logs DB: the project write mutates $document (e.g. internal ID), which conflicts there.
+            $logsDocument = new Document($document->getArrayCopy());
+            $dbForProject->createOrUpdateDocumentsWithIncrease(
+                'stats',
+                'value',
+                [$document]
+            );
+            $this->writeToLogsDB($project, $logsDocument);
+        };
+
+        foreach ($this->periods as $period => $format) {
+            $time = 'inf' === $period ? null : date($format, time());
+            $id = \md5("{$time}_{$period}_{$key}");
+
+            $value = 0;
+            $previousValue = 0;
+            try {
+                $previousValue = ($dbForProject->getDocument('stats', $id))->getAttribute('value', 0);
+            } catch (\Exception $e) {
+                // No previous value
+            }
+
+            switch (count($data)) {
+                // Collection Level
+                case METRIC_COLLECTION_LEVEL_STORAGE:
+                    Console::log('[' . DateTime::now() . '] Collection Level Storage Calculation [' . $key . ']');
+                    $databaseInternalId = $data[0];
+                    $collectionInternalId = $data[1];
 
-                if (!isset($collectionSizeCache[$collectionId])) {
                     try {
-                        $collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
+                        $value = $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collectionInternalId);
                     } catch (\Exception $e) {
-                        if (!$e instanceof NotFound) {
+                        // Collection not found
+                        if (!$e instanceof NotFound && $e->getMessage() !== 'Collection not found') {
                             throw $e;
                         }
-                        $collectionSizeCache[$collectionId] = 0;
                     }
-                }
 
-                $value = $collectionSizeCache[$collectionId];
+                    // Compare with previous value
+                    $diff = $value - $previousValue;
 
-                $diff = $value - $previousValue;
-                if ($diff === 0) {
+                    if ($diff === 0) {
+                        break;
+                    }
+
+                    // Update Collection
+                    $updateMetric($dbForProject, $project, $diff, $key, $period, $time);
+
+                    // Update Database
+                    $databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
+                    $updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time);
+
+                    // Update Project
+                    $projectKey = METRIC_DATABASES_STORAGE;
+                    $updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
                     break;
-                }
+                // Database Level
+                case METRIC_DATABASE_LEVEL_STORAGE:
+                    Console::log('[' . DateTime::now() . '] Database Level Storage Calculation [' . $key . ']');
+                    $databaseInternalId = $data[0];
 
-                $keys = [
-                    $key,
-                    \str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE),
-                    METRIC_DATABASES_STORAGE
-                ];
-
-                foreach ($keys as $metric) {
-                    $projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
-                }
-                break;
-
-            case METRIC_DATABASE_LEVEL_STORAGE:
-                $databaseInternalId = $data[0];
-                $databaseId = "database_{$databaseInternalId}";
-
-                if (!isset($databaseCache[$databaseId])) {
+                    $collections = [];
                     try {
-                        $databaseCache[$databaseId] = $dbForProject->find($databaseId);
+                        $collections = $dbForProject->find('database_' . $databaseInternalId);
                     } catch (\Exception $e) {
-                        if (!$e instanceof NotFound) {
+                        // Database not found
+                        if (!$e instanceof NotFound && $e->getMessage() !== 'Collection not found') {
                             throw $e;
                         }
-                        $databaseCache[$databaseId] = [];
                     }
-                }
 
-                foreach ($databaseCache[$databaseId] as $collection) {
-                    $collectionId = "{$databaseId}_collection_{$collection->getInternalId()}";
-
-                    if (!isset($collectionSizeCache[$collectionId])) {
+                    foreach ($collections as $collection) {
                         try {
-                            $collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
+                            $value += $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId());
                         } catch (\Exception $e) {
-                            if (!$e instanceof NotFound) {
+                            // Collection not found
+                            if (!$e instanceof NotFound && $e->getMessage() !== 'Collection not found') {
                                 throw $e;
                             }
-                            $collectionSizeCache[$collectionId] = 0;
                         }
                     }
-                    $value += $collectionSizeCache[$collectionId];
-                }
 
-                $diff = $value - $previousValue;
-                if ($diff === 0) {
+                    $diff = $value - $previousValue;
+
+                    if ($diff === 0) {
+                        break;
+                    }
+
+                    // Update Database
+                    $databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
+                    $updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time);
+
+                    // Update Project
+                    $projectKey = METRIC_DATABASES_STORAGE;
+                    $updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
                     break;
-                }
+                // Project Level
+                case METRIC_PROJECT_LEVEL_STORAGE:
+                    Console::log('[' . DateTime::now() . '] Project Level Storage Calculation [' . $key . ']');
+                    // Get all project databases
+                    $databases = $dbForProject->find('databases');
 
-                $keys = [
-                    \str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE),
-                    METRIC_DATABASES_STORAGE
-                ];
+                    // Recalculate all databases
+                    foreach ($databases as $database) {
+                        $collections = $dbForProject->find('database_' . $database->getInternalId());
 
-                foreach ($keys as $metric) {
-                    $projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
-                }
-                break;
-
-            case METRIC_PROJECT_LEVEL_STORAGE:
-                if (!isset($databaseCache['*'])) {
-                    try {
-                        $databaseCache['*'] = $dbForProject->find('databases');
-                    } catch (\Exception $e) {
-                        if (!$e instanceof NotFound) {
-                            throw $e;
-                        }
-                        $databaseCache['*'] = [];
-                    }
-                }
-
-                foreach ($databaseCache['*'] as $database) {
-                    $databaseId = "database_{$database->getInternalId()}";
-                    if (!isset($databaseCache[$databaseId])) {
-                        try {
-                            $databaseCache[$databaseId] = $dbForProject->find($databaseId);
-                        } catch (\Exception $e) {
-                            if (!$e instanceof NotFound) {
-                                throw $e;
-                            }
-                            $databaseCache[$databaseId] = [];
-                        }
-                    }
-
-                    foreach ($databaseCache[$databaseId] as $collection) {
-                        $collectionId = "{$databaseId}_collection_{$collection->getInternalId()}";
-
-                        if (!isset($collectionSizeCache[$collectionId])) {
+                        foreach ($collections as $collection) {
                             try {
-                                $collectionSizeCache[$collectionId] = $dbForProject->getSizeOfCollection($collectionId);
+                                $value += $dbForProject->getSizeOfCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
                             } catch (\Exception $e) {
-                                if (!$e instanceof NotFound) {
+                                // Collection not found
+                                if (!$e instanceof NotFound && $e->getMessage() !== 'Collection not found') {
                                     throw $e;
                                 }
-                                $collectionSizeCache[$collectionId] = 0;
                             }
                         }
-                        $value += $collectionSizeCache[$collectionId];
                     }
-                }
 
-                $diff = $value - $previousValue;
-                if ($diff === 0) {
+                    $diff = $value - $previousValue;
+
+                    if ($diff === 0) {
+                        break;
+                    }
+
+                    // Update Project
+                    $projectKey = METRIC_DATABASES_STORAGE;
+                    $updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
                     break;
-                }
-
-                $keys = [
-                    METRIC_DATABASES_STORAGE
-                ];
-
-                foreach ($keys as $metric) {
-                    $projectDocuments[] = $this->createStatsDocument($id, $period, $time, $metric, $diff);
-                }
-
-                break;
+            }
         }
+
+        $end = microtime(true);
+
+        Console::log('[' . DateTime::now() . '] DB Storage Calculation [' . $key . '] took ' . (($end - $start) * 1000) . ' milliseconds');
     }
 
-    private function createStatsDocument(
-        string $id,
-        string $period,
-        ?string $time,
-        string $key,
-        int $diff,
-    ): Document {
-        return new Document([
-            '$id' => $id,
-            'period' => $period,
-            'time' => $time,
-            'metric' => $key,
-            'value' => $diff,
-            'region' => System::getEnv('_APP_REGION', 'default'),
-        ]);
+    /**
+     * Mirror a stats document into the logs database when the project's database is listed in
+     * _APP_STATS_USAGE_DUAL_WRITING_DBS. Skipped metrics return before the logs database is resolved.
+     *
+     * @param Document $project
+     * @param Document $document
+     * @return void
+     */
+    protected function writeToLogsDB(Document $project, Document $document)
+    {
+        $databasesToDualWrite = explode(',', System::getEnv('_APP_STATS_USAGE_DUAL_WRITING_DBS', ''));
+
+        $db = $project->getAttribute('database');
+        // Strict comparison: explode() on an empty setting yields [''], which loosely equals a missing (null) database.
+        if (!in_array($db, $databasesToDualWrite, true)) {
+            return;
+        }
+
+        $metric = $document->getAttribute('metric');
+        if (array_key_exists($metric, $this->skipBaseMetrics)) {
+            return;
+        }
+        foreach ($this->skipParentIdMetrics as $skipMetric) {
+            if (str_ends_with($metric, $skipMetric)) {
+                return;
+            }
+        }
+
+        // NOTE(review): the previous implementation wrote to the 'usage' collection here - confirm 'stats' exists in the logs DB.
+        /** @var \Utopia\Database\Database $dbForLogs */
+        $dbForLogs = call_user_func($this->getLogsDB, $project);
+        $dbForLogs->createOrUpdateDocumentsWithIncrease(
+            'stats',
+            'value',
+            [$document]
+        );
     }
 }