diff --git a/.env b/.env index 8f7d7996e7..06a6fa9b3e 100644 --- a/.env +++ b/.env @@ -86,6 +86,7 @@ _APP_MAINTENANCE_RETENTION_EXECUTION=1209600 _APP_MAINTENANCE_RETENTION_ABUSE=86400 _APP_MAINTENANCE_RETENTION_AUDIT=1209600 _APP_USAGE_AGGREGATION_INTERVAL=30 +_APP_STATS_RESOURCES_INTERVAL=3600 _APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000 _APP_MAINTENANCE_RETENTION_SCHEDULES=86400 _APP_USAGE_STATS=enabled @@ -110,3 +111,4 @@ _APP_MESSAGE_PUSH_TEST_DSN= _APP_WEBHOOK_MAX_FAILED_ATTEMPTS=10 _APP_PROJECT_REGIONS=default _APP_FUNCTIONS_CREATION_ABUSE_LIMIT=5000 +_APP_STATS_USAGE_DUAL_WRITING_DBS=database_db_main \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e89aa369cf..d53e3786da 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -367,7 +367,7 @@ In file `app/controllers/shared/api.php` On the database listener, add to an exi ```php case $document->getCollection() === 'teams': - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_TEAMS, $value); // per project break; ``` @@ -379,10 +379,10 @@ In that case you need also to handle children removal using addReduce() method c ```php case $document->getCollection() === 'buckets': //buckets - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_BUCKETS, $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForUsage + $queueForStatsUsage ->addReduce($document); } break; @@ -428,16 +428,16 @@ public function __construct() ->inject('dbForProject') ->inject('queueForFunctions') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('log') - ->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log)); + ->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log)); } ``` and then trigger the queue with the new metric like so: ```php -$queueForUsage +$queueForStatsUsage ->addMetric(METRIC_BUILDS, 1) ->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0)) ->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000) diff --git a/Dockerfile b/Dockerfile index 41810f5dc4..2bb9f80d9e 100755 --- a/Dockerfile +++ b/Dockerfile @@ -85,6 +85,10 @@ RUN chmod +x /usr/local/bin/doctor && \ chmod +x /usr/local/bin/worker-messaging && \ chmod +x /usr/local/bin/worker-migrations && \ chmod +x /usr/local/bin/worker-webhooks && \ + chmod +x /usr/local/bin/worker-stats-usage && \ + chmod +x /usr/local/bin/worker-stats-usage-dump && \ + chmod +x /usr/local/bin/stats-resources && \ + chmod +x /usr/local/bin/worker-stats-resources && \ chmod +x /usr/local/bin/worker-usage && \ chmod +x /usr/local/bin/worker-usage-dump diff --git a/app/cli.php b/app/cli.php index abf75c9608..0b2cb884e6 100644 --- a/app/cli.php +++ b/app/cli.php @@ -5,6 +5,8 @@ require_once __DIR__ . '/init.php'; use Appwrite\Event\Certificate; use Appwrite\Event\Delete; use Appwrite\Event\Func; +use Appwrite\Event\StatsResources; +use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Utopia\Cache\Adapter\Sharding; @@ -160,6 +162,45 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform }; }, ['pools', 'dbForPlatform', 'cache']); +CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) { + $database = null; + return function (?Document $project = null) use ($pools, $cache, $database) { + if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { + $database->setTenant($project->getInternalId()); + return $database; + } + + $dbAdapter = $pools + ->get('logs') + ->pop() + ->getResource(); + + $database = new Database( + $dbAdapter, + $cache + ); + + $database + ->setSharedTables(true) + ->setNamespace('logsV1') + ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS) + ->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES); + + // set tenant + if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') { + $database->setTenant($project->getInternalId()); + } + + return $database; + }; +}, ['pools', 'cache']); + +CLI::setResource('queueForStatsUsage', function (Connection $publisher) { + return new StatsUsage($publisher); +}, ['publisher']); +CLI::setResource('queueForStatsResources', function (Publisher $publisher) { + return new StatsResources($publisher); +}, ['publisher']); CLI::setResource('publisher', function (Group $pools) { return $pools->get('publisher')->pop()->getResource(); }, ['pools']); diff --git a/app/config/collections.php b/app/config/collections.php index 8c8356aafd..533dee57a8 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -5,6 +5,7 @@ $common = include __DIR__ . '/collections/common.php'; $projects = include __DIR__ . '/collections/projects.php'; $databases = include __DIR__ . '/collections/databases.php'; $platform = include __DIR__ . '/collections/platform.php'; +$logs = include __DIR__ . '/collections/logs.php'; // see - http.php#245 // $collections['buckets']['files']; @@ -27,6 +28,7 @@ $collections = [ 'databases' => $databases, 'projects' => array_merge($projects, $common), 'console' => array_merge($platform, $common), + 'logs' => $logs, ]; return $collections; diff --git a/app/config/collections/logs.php b/app/config/collections/logs.php new file mode 100644 index 0000000000..069dcf5a4b --- /dev/null +++ b/app/config/collections/logs.php @@ -0,0 +1,94 @@ + ID::custom(Database::METADATA), + '$id' => ID::custom('stats'), + 'name' => 'stats', + 'attributes' => [ + [ + '$id' => ID::custom('metric'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 255, + 'signed' => true, + 'required' => true, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('region'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 255, + 'signed' => true, + 'required' => true, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('value'), + 'type' => Database::VAR_INTEGER, + 'format' => '', + 'size' => 8, + 'signed' => true, + 'required' => true, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('time'), + 'type' => Database::VAR_DATETIME, + 'format' => '', + 'size' => 0, + 'signed' => false, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => ['datetime'], + ], + [ + '$id' => ID::custom('period'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 4, + 'signed' => true, + 'required' => true, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + ], + 'indexes' => [ + [ + '$id' => ID::custom('_key_time'), + 'type' => Database::INDEX_KEY, + 'attributes' => ['time'], + 'lengths' => [], + 'orders' => [Database::ORDER_DESC], + ], + [ + '$id' => ID::custom('_key_period_time'), + 'type' => Database::INDEX_KEY, + 'attributes' => ['period', 'time'], + 'lengths' => [], + 'orders' => [Database::ORDER_ASC], + ], + [ + '$id' => ID::custom('_key_metric_period_time'), + 'type' => Database::INDEX_UNIQUE, + 'attributes' => ['metric', 'period', 'time'], + 'lengths' => [], + 'orders' => [Database::ORDER_DESC], + ], + ], +]; + +return $logsCollection; diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index bd9562110f..21a04800b5 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -17,7 +17,7 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; use Appwrite\Hooks\Hooks; use Appwrite\Network\Validator\Email; @@ -2432,9 +2432,9 @@ App::post('/v1/account/tokens/phone') ->inject('queueForMessaging') ->inject('locale') ->inject('timelimit') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('plan') - ->action(function (string $userId, string $phone, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, Usage $queueForUsage, array $plan) { + ->action(function (string $userId, string $phone, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) { if (empty(System::getEnv('_APP_SMS_PROVIDER'))) { throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured'); } @@ -2583,11 +2583,11 @@ App::post('/v1/account/tokens/phone') $countryCode = $helper->parse($phone)->getCountryCode(); if (!empty($countryCode)) { - $queueForUsage + $queueForStatsUsage ->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1); } } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_AUTH_METHOD_PHONE, 1) ->setProject($project) ->trigger(); @@ -3678,9 +3678,9 @@ App::post('/v1/account/verification/phone') ->inject('project') ->inject('locale') ->inject('timelimit') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('plan') - ->action(function (Request $request, Response $response, Document $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, Usage $queueForUsage, array $plan) { + ->action(function (Request $request, Response $response, Document $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) { if (empty(System::getEnv('_APP_SMS_PROVIDER'))) { throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured'); } @@ -3775,11 +3775,11 @@ App::post('/v1/account/verification/phone') $countryCode = $helper->parse($phone)->getCountryCode(); if (!empty($countryCode)) { - $queueForUsage + $queueForStatsUsage ->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1); } } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_AUTH_METHOD_PHONE, 1) ->setProject($project) ->trigger(); @@ -4310,9 +4310,9 @@ App::post('/v1/account/mfa/challenge') ->inject('queueForMessaging') ->inject('queueForMails') ->inject('timelimit') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('plan') - ->action(function (string $factor, Response $response, Database $dbForProject, Document $user, Locale $locale, Document $project, Request $request, Event $queueForEvents, Messaging $queueForMessaging, Mail $queueForMails, callable $timelimit, Usage $queueForUsage, array $plan) { + ->action(function (string $factor, Response $response, Database $dbForProject, Document $user, Locale $locale, Document $project, Request $request, Event $queueForEvents, Messaging $queueForMessaging, Mail $queueForMails, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) { $expire = DateTime::addSeconds(new \DateTime(), Auth::TOKEN_EXPIRATION_CONFIRM); $code = Auth::codeGenerator(); @@ -4383,11 +4383,11 @@ App::post('/v1/account/mfa/challenge') $countryCode = $helper->parse($phone)->getCountryCode(); if (!empty($countryCode)) { - $queueForUsage + $queueForStatsUsage ->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1); } } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_AUTH_METHOD_PHONE, 1) ->setProject($project) ->trigger(); diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index df46c1890b..98ba59106d 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -4,7 +4,7 @@ use Appwrite\Auth\Auth; use Appwrite\Detector\Detector; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Event; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; use Appwrite\Network\Validator\Email; use Appwrite\SDK\AuthType; @@ -476,8 +476,8 @@ App::post('/v1/databases') ->inject('response') ->inject('dbForProject') ->inject('queueForEvents') - ->inject('queueForUsage') - ->action(function (string $databaseId, string $name, bool $enabled, Response $response, Database $dbForProject, Event $queueForEvents, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (string $databaseId, string $name, bool $enabled, Response $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage) { $databaseId = $databaseId == 'unique()' ? ID::unique() : $databaseId; @@ -527,7 +527,7 @@ App::post('/v1/databases') } $queueForEvents->setParam('databaseId', $database->getId()); - $queueForUsage->addMetric(str_replace(['{databaseInternalId}'], [$database->getInternalId()], METRIC_DATABASE_ID_STORAGE), 1); // per database + $queueForStatsUsage->addMetric(str_replace(['{databaseInternalId}'], [$database->getInternalId()], METRIC_DATABASE_ID_STORAGE), 1); // per database $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -797,8 +797,8 @@ App::delete('/v1/databases/:databaseId') ->inject('dbForProject') ->inject('queueForDatabase') ->inject('queueForEvents') - ->inject('queueForUsage') - ->action(function (string $databaseId, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (string $databaseId, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, StatsUsage $queueForStatsUsage) { $database = $dbForProject->getDocument('databases', $databaseId); @@ -821,7 +821,7 @@ App::delete('/v1/databases/:databaseId') ->setParam('databaseId', $database->getId()) ->setPayload($response->output($database, Response::MODEL_DATABASE)); - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DATABASES_STORAGE, 1); // Global, deletion forces full recalculation $response->noContent(); @@ -2618,8 +2618,8 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/attributes/:key ->inject('dbForProject') ->inject('queueForDatabase') ->inject('queueForEvents') - ->inject('queueForUsage') - ->action(function (string $databaseId, string $collectionId, string $key, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (string $databaseId, string $collectionId, string $key, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, StatsUsage $queueForStatsUsage) { $db = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId)); @@ -2716,7 +2716,7 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/attributes/:key ->setContext('database', $db) ->setPayload($response->output($attribute, $model)); - $queueForUsage + $queueForStatsUsage ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$db->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_STORAGE), 1); // per collection $response->noContent(); @@ -3134,9 +3134,9 @@ App::post('/v1/databases/:databaseId/collections/:collectionId/documents') ->inject('dbForProject') ->inject('user') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('mode') - ->action(function (string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, Response $response, Database $dbForProject, Document $user, Event $queueForEvents, Usage $queueForUsage, string $mode) { + ->action(function (string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, Response $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, string $mode) { $data = (\is_string($data)) ? \json_decode($data, true) : $data; // Cast to JSON array @@ -3339,7 +3339,7 @@ App::post('/v1/databases/:databaseId/collections/:collectionId/documents') $processDocument($collection, $document); - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, $operations) ->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_WRITES), $operations) ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$database->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_STORAGE), 1); // per collection @@ -3392,8 +3392,8 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents') ->inject('response') ->inject('dbForProject') ->inject('mode') - ->inject('queueForUsage') - ->action(function (string $databaseId, string $collectionId, array $queries, Response $response, Database $dbForProject, string $mode, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (string $databaseId, string $collectionId, array $queries, Response $response, Database $dbForProject, string $mode, StatsUsage $queueForStatsUsage) { $database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId)); $isAPIKey = Auth::isAppUser(Authorization::getRoles()); $isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles()); @@ -3505,7 +3505,7 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents') $processDocument($collection, $document); } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DATABASES_OPERATIONS_READS, $operations) ->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_READS), $operations) ; @@ -3571,8 +3571,8 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen ->inject('response') ->inject('dbForProject') ->inject('mode') - ->inject('queueForUsage') - ->action(function (string $databaseId, string $collectionId, string $documentId, array $queries, Response $response, Database $dbForProject, string $mode, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (string $databaseId, string $collectionId, string $documentId, array $queries, Response $response, Database $dbForProject, string $mode, StatsUsage $queueForStatsUsage) { $database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId)); $isAPIKey = Auth::isAppUser(Authorization::getRoles()); $isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles()); @@ -3648,7 +3648,7 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen $processDocument($collection, $document); - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DATABASES_OPERATIONS_READS, $operations) ->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_READS), $operations) ; @@ -3805,8 +3805,8 @@ App::patch('/v1/databases/:databaseId/collections/:collectionId/documents/:docum ->inject('dbForProject') ->inject('queueForEvents') ->inject('mode') - ->inject('queueForUsage') - ->action(function (string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, string $mode, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, string $mode, StatsUsage $queueForStatsUsage) { $data = (\is_string($data)) ? \json_decode($data, true) : $data; // Cast to JSON array @@ -3946,7 +3946,7 @@ App::patch('/v1/databases/:databaseId/collections/:collectionId/documents/:docum $setCollection($collection, $newDocument); - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, $operations) ->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_WRITES), $operations) ; @@ -4058,9 +4058,9 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/documents/:docu ->inject('response') ->inject('dbForProject') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('mode') - ->action(function (string $databaseId, string $collectionId, string $documentId, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, Usage $queueForUsage, string $mode) { + ->action(function (string $databaseId, string $collectionId, string $documentId, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage, string $mode) { $database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId)); $isAPIKey = Auth::isAppUser(Authorization::getRoles()); @@ -4129,7 +4129,7 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/documents/:docu $processDocument($collection, $document); - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, 1) ->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_WRITES), 1) ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$database->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_STORAGE), 1); // per collection diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 3d9240d494..14255ef7a4 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -6,7 +6,7 @@ use Appwrite\Event\Build; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Event\Validator\FunctionEvent; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; @@ -1900,10 +1900,10 @@ App::post('/v1/functions/:functionId/executions') ->inject('dbForPlatform') ->inject('user') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('queueForFunctions') ->inject('geodb') - ->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb) { + ->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb) { $async = \strval($async) === 'true' || \strval($async) === '1'; if (!$async && !is_null($scheduledAt)) { @@ -2230,7 +2230,7 @@ App::post('/v1/functions/:functionId/executions') throw $th; } } finally { - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_EXECUTIONS, 1) ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1) ->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000)) // per project diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 305444abc5..e5336067c8 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -692,15 +692,15 @@ App::get('/v1/health/queue/functions') $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); }, ['response']); -App::get('/v1/health/queue/usage') - ->desc('Get usage queue') +App::get('/v1/health/queue/stats-resources') + ->desc('Get stats resources queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( auth: [AuthType::KEY], namespace: 'health', - name: 'getQueueUsage', - description: '/docs/references/health/get-queue-usage.md', + name: 'getQueueStatsResources', + description: '/docs/references/health/get-queue-stats-resources.md', responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -715,7 +715,7 @@ App::get('/v1/health/queue/usage') ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::USAGE_QUEUE_NAME)); + $size = $publisher->getQueueSize(new Queue(Event::STATS_RESOURCES_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -724,15 +724,15 @@ App::get('/v1/health/queue/usage') $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); }); -App::get('/v1/health/queue/usage-dump') - ->desc('Get usage dump queue') +App::get('/v1/health/queue/stats-usage') + ->desc('Get stats usage queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( auth: [AuthType::KEY], namespace: 'health', - name: 'getQueueUsageDump', - description: '/docs/references/health/get-queue-usage-dump.md', + name: 'getQueueUsage', + description: '/docs/references/health/get-queue-stats-usage.md', responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -747,7 +747,39 @@ App::get('/v1/health/queue/usage-dump') ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::USAGE_DUMP_QUEUE_NAME)); + $size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_QUEUE_NAME)); + + if ($size >= $threshold) { + throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); + } + + $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); + }); + +App::get('/v1/health/queue/stats-usage-dump') + ->desc('Get usage dump queue') + ->groups(['api', 'health']) + ->label('scope', 'health.read') + ->label('sdk', new Method( + auth: [AuthType::KEY], + namespace: 'health', + name: 'getQueueStatsUsageDump', + description: '/docs/references/health/get-queue-stats-usage-dump.md', + responses: [ + new SDKResponse( + code: Response::STATUS_CODE_OK, + model: Response::MODEL_HEALTH_QUEUE, + ) + ], + contentType: ContentType::JSON + )) + ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) + ->inject('publisher') + ->inject('response') + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + $threshold = \intval($threshold); + + $size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_DUMP_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -920,8 +952,9 @@ App::get('/v1/health/queue/failed/:name') Event::AUDITS_QUEUE_NAME, Event::MAILS_QUEUE_NAME, Event::FUNCTIONS_QUEUE_NAME, - Event::USAGE_QUEUE_NAME, - Event::USAGE_DUMP_QUEUE_NAME, + Event::STATS_RESOURCES_QUEUE_NAME, + Event::STATS_USAGE_QUEUE_NAME, + Event::STATS_USAGE_DUMP_QUEUE_NAME, Event::WEBHOOK_QUEUE_NAME, Event::CERTIFICATES_QUEUE_NAME, Event::BUILDS_QUEUE_NAME, diff --git a/app/controllers/api/storage.php b/app/controllers/api/storage.php index b5ddc94c9d..f180c22acf 100644 --- a/app/controllers/api/storage.php +++ b/app/controllers/api/storage.php @@ -6,7 +6,7 @@ use Appwrite\Auth\Auth; use Appwrite\ClamAV\Network; use Appwrite\Event\Delete; use Appwrite\Event\Event; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; use Appwrite\OpenSSL\OpenSSL; use Appwrite\SDK\AuthType; @@ -942,8 +942,8 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/preview') ->inject('mode') ->inject('deviceForFiles') ->inject('deviceForLocal') - ->inject('queueForUsage') - ->action(function (string $bucketId, string $fileId, int $width, int $height, string $gravity, int $quality, int $borderWidth, string $borderColor, int $borderRadius, float $opacity, int $rotation, string $background, string $output, Request $request, Response $response, Document $project, Database $dbForProject, string $mode, Device $deviceForFiles, Device $deviceForLocal, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (string $bucketId, string $fileId, int $width, int $height, string $gravity, int $quality, int $borderWidth, string $borderColor, int $borderRadius, float $opacity, int $rotation, string $background, string $output, Request $request, Response $response, Document $project, Database $dbForProject, string $mode, Device $deviceForFiles, Device $deviceForLocal, StatsUsage $queueForStatsUsage) { if (!\extension_loaded('imagick')) { throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Imagick extension is missing'); @@ -1071,7 +1071,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/preview') $contentType = (\array_key_exists($output, $outputs)) ? $outputs[$output] : $outputs['jpg']; - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_FILES_TRANSFORMATIONS, 1) ->addMetric(str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_TRANSFORMATIONS), 1) ; diff --git a/app/controllers/api/teams.php b/app/controllers/api/teams.php index 7f988d726f..18faaeceeb 100644 --- a/app/controllers/api/teams.php +++ b/app/controllers/api/teams.php @@ -8,7 +8,7 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; use Appwrite\Network\Validator\Email; use Appwrite\Platform\Workers\Deletes; @@ -466,9 +466,9 @@ App::post('/v1/teams/:teamId/memberships') ->inject('queueForMessaging') ->inject('queueForEvents') ->inject('timelimit') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('plan') - ->action(function (string $teamId, string $email, string $userId, string $phone, array $roles, string $url, string $name, Response $response, Document $project, Document $user, Database $dbForProject, Locale $locale, Mail $queueForMails, Messaging $queueForMessaging, Event $queueForEvents, callable $timelimit, Usage $queueForUsage, array $plan) { + ->action(function (string $teamId, string $email, string $userId, string $phone, array $roles, string $url, string $name, Response $response, Document $project, Document $user, Database $dbForProject, Locale $locale, Mail $queueForMails, Messaging $queueForMessaging, Event $queueForEvents, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) { $isAppUser = Auth::isAppUser(Authorization::getRoles()); $isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles()); @@ -757,11 +757,11 @@ App::post('/v1/teams/:teamId/memberships') $countryCode = $helper->parse($phone)->getCountryCode(); if (!empty($countryCode)) { - $queueForUsage + $queueForStatsUsage ->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1); } } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_AUTH_METHOD_PHONE, 1) ->setProject($project) ->trigger(); diff --git a/app/controllers/general.php b/app/controllers/general.php index 7e691d033f..1d56e79b74 100644 --- a/app/controllers/general.php +++ b/app/controllers/general.php @@ -7,7 +7,7 @@ use Appwrite\Auth\Auth; use Appwrite\Event\Certificate; use Appwrite\Event\Event; use Appwrite\Event\Func; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Network\Validator\Origin; use Appwrite\SDK\AuthType; @@ -50,7 +50,7 @@ Config::setParam('domainVerification', false); Config::setParam('cookieDomain', 'localhost'); Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE); -function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) +function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { $utopia->getRoute()?->label('error', __DIR__ . '/../views/general/error.phtml'); @@ -434,7 +434,7 @@ function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, Sw $fileSize = (\is_array($file['size']) && isset($file['size'][0])) ? $file['size'][0] : $file['size']; } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_NETWORK_REQUESTS, 1) ->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize) ->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize()) @@ -499,13 +499,13 @@ App::init() ->inject('localeCodes') ->inject('clients') ->inject('geodb') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('queueForEvents') ->inject('queueForCertificates') ->inject('queueForFunctions') ->inject('isResourceBlocked') ->inject('previewHostname') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, Usage $queueForUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, callable $isResourceBlocked, string $previewHostname) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, callable $isResourceBlocked, string $previewHostname) { /* * Appwrite Router */ @@ -513,7 +513,7 @@ App::init() $mainDomain = System::getEnv('_APP_DOMAIN', ''); // Only run Router when external domain if ($host !== $mainDomain || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) { return; } } @@ -732,12 +732,12 @@ App::options() ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('queueForFunctions') ->inject('geodb') ->inject('isResourceBlocked') ->inject('previewHostname') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { /* * Appwrite Router */ @@ -745,7 +745,7 @@ App::options() $mainDomain = System::getEnv('_APP_DOMAIN', ''); // Only run Router when external domain if ($host !== $mainDomain || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) { return; } } @@ -770,8 +770,8 @@ App::error() ->inject('project') ->inject('logger') ->inject('log') - ->inject('queueForUsage') - ->action(function (Throwable $error, App $utopia, Request $request, Response $response, Document $project, ?Logger $logger, Log $log, Usage $queueForUsage) { + ->inject('queueForStatsUsage') + ->action(function (Throwable $error, App $utopia, Request $request, Response $response, Document $project, ?Logger $logger, Log $log, StatsUsage $queueForStatsUsage) { $version = System::getEnv('_APP_VERSION', 'UNKNOWN'); $route = $utopia->getRoute(); $class = \get_class($error); @@ -882,13 +882,13 @@ App::error() $fileSize = (\is_array($file['size']) && isset($file['size'][0])) ? $file['size'][0] : $file['size']; } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_NETWORK_REQUESTS, 1) ->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize) ->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize()); } - $queueForUsage + $queueForStatsUsage ->setProject($project) ->trigger(); } @@ -1040,12 +1040,12 @@ App::get('/robots.txt') ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('queueForFunctions') ->inject('geodb') ->inject('isResourceBlocked') ->inject('previewHostname') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { $host = $request->getHostname() ?? ''; $mainDomain = System::getEnv('_APP_DOMAIN', ''); @@ -1053,7 +1053,7 @@ App::get('/robots.txt') $template = new View(__DIR__ . '/../views/general/robots.phtml'); $response->text($template->render(false)); } else { - router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname); + router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname); } }); @@ -1068,12 +1068,12 @@ App::get('/humans.txt') ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('queueForFunctions') ->inject('geodb') ->inject('isResourceBlocked') ->inject('previewHostname') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { $host = $request->getHostname() ?? ''; $mainDomain = System::getEnv('_APP_DOMAIN', ''); @@ -1081,7 +1081,7 @@ App::get('/humans.txt') $template = new View(__DIR__ . '/../views/general/humans.phtml'); $response->text($template->render(false)); } else { - router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname); + router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname); } }); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index bce36e9303..bcee2550c2 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -12,7 +12,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Messaging; use Appwrite\Event\Realtime; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; @@ -90,7 +90,7 @@ $eventDatabaseListener = function (Document $project, Document $document, Respon } }; -$usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) { +$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { $value = 1; if ($event === Database::EVENT_DOCUMENT_DELETE) { $value = -1; @@ -98,40 +98,40 @@ $usageDatabaseListener = function (string $event, Document $document, Usage $que switch (true) { case $document->getCollection() === 'teams': - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_TEAMS, $value); // per project break; case $document->getCollection() === 'users': - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_USERS, $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForUsage + $queueForStatsUsage ->addReduce($document); } break; case $document->getCollection() === 'sessions': // sessions - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_SESSIONS, $value); //per project break; case $document->getCollection() === 'databases': // databases - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DATABASES, $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForUsage + $queueForStatsUsage ->addReduce($document); } break; case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections $parts = explode('_', $document->getCollection()); $databaseInternalId = $parts[1] ?? 0; - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_COLLECTIONS, $value) // per project ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value) ; if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForUsage + $queueForStatsUsage ->addReduce($document); } break; @@ -139,39 +139,39 @@ $usageDatabaseListener = function (string $event, Document $document, Usage $que $parts = explode('_', $document->getCollection()); $databaseInternalId = $parts[1] ?? 0; $collectionInternalId = $parts[3] ?? 0; - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DOCUMENTS, $value) // per project ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection break; case $document->getCollection() === 'buckets': //buckets - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_BUCKETS, $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForUsage + $queueForStatsUsage ->addReduce($document); } break; case str_starts_with($document->getCollection(), 'bucket_'): // files $parts = explode('_', $document->getCollection()); $bucketInternalId = $parts[1]; - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_FILES, $value) // per project ->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket break; case $document->getCollection() === 'functions': - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_FUNCTIONS, $value); // per project if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForUsage + $queueForStatsUsage ->addReduce($document); } break; case $document->getCollection() === 'deployments': - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_DEPLOYMENTS, $value) // per project ->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_FUNCTION_ID_DEPLOYMENTS), $value) // per function @@ -436,11 +436,11 @@ App::init() ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('queueForBuilds') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('dbForProject') ->inject('timelimit') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -550,8 +550,8 @@ App::init() $queueForRealtime = new Realtime(); $dbForProject - ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) + ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( $project, $document, @@ -694,7 +694,7 @@ App::shutdown() ->inject('user') ->inject('queueForEvents') ->inject('queueForAudits') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('queueForBuilds') @@ -703,7 +703,7 @@ App::shutdown() ->inject('queueForWebhooks') ->inject('queueForRealtime') ->inject('dbForProject') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -860,13 +860,13 @@ App::shutdown() $fileSize = (\is_array($file['size']) && isset($file['size'][0])) ? $file['size'][0] : $file['size']; } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_NETWORK_REQUESTS, 1) ->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize) ->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize()); } - $queueForUsage + $queueForStatsUsage ->setProject($project) ->trigger(); } diff --git a/app/http.php b/app/http.php index 608ac2ec12..b9aa69a7cc 100644 --- a/app/http.php +++ b/app/http.php @@ -17,7 +17,6 @@ use Utopia\Config\Config; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; -use Utopia\Database\Exception\Duplicate; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; @@ -156,6 +155,79 @@ function dispatch(Server $server, int $fd, int $type, $data = null): int include __DIR__ . '/controllers/general.php'; +function createDatabase(App $app, string $resourceKey, string $dbName, array $collections, mixed $pools, callable $extraSetup = null): void +{ + $max = 10; + $sleep = 1; + $attempts = 0; + + do { + try { + $attempts++; + $resource = $app->getResource($resourceKey); + /* @var $database Database */ + $database = is_callable($resource) ? $resource() : $resource; + break; // exit loop on success + } catch (\Exception $e) { + Console::warning(" └── Database not ready. Retrying connection ({$attempts})..."); + $pools->reclaim(); + if ($attempts >= $max) { + throw new \Exception(' └── Failed to connect to database: ' . $e->getMessage()); + } + sleep($sleep); + } + } while ($attempts < $max); + + Console::success("[Setup] - $dbName database init started..."); + + // Attempt to create the database + try { + Console::info(" └── Creating database: $dbName..."); + $database->create(); + } catch (\Exception $e) { + Console::info(" └── Skip: metadata table already exists"); + } + + // Process collections + foreach ($collections as $key => $collection) { + if (($collection['$collection'] ?? '') !== Database::METADATA) { + continue; + } + + if (!$database->getCollection($key)->isEmpty()) { + continue; + } + + Console::info(" └── Creating collection: {$collection['$id']}..."); + + $attributes = array_map(fn ($attr) => new Document([ + '$id' => ID::custom($attr['$id']), + 'type' => $attr['type'], + 'size' => $attr['size'], + 'required' => $attr['required'], + 'signed' => $attr['signed'], + 'array' => $attr['array'], + 'filters' => $attr['filters'], + 'default' => $attr['default'] ?? null, + 'format' => $attr['format'] ?? '' + ]), $collection['attributes']); + + $indexes = array_map(fn ($index) => new Document([ + '$id' => ID::custom($index['$id']), + 'type' => $index['type'], + 'attributes' => $index['attributes'], + 'lengths' => $index['lengths'], + 'orders' => $index['orders'], + ]), $collection['indexes']); + + $database->createCollection($key, $attributes, $indexes); + } + + if ($extraSetup) { + $extraSetup($database); + } +} + $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $register) { $app = new App('UTC'); @@ -164,140 +236,75 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg /** @var Group $pools */ App::setResource('pools', fn () => $pools); - // wait for database to be ready - $attempts = 0; - $max = 10; - $sleep = 1; - - do { - try { - $attempts++; - $dbForPlatform = $app->getResource('dbForPlatform'); - /** @var Utopia\Database\Database $dbForPlatform */ - break; // leave the do-while if successful - } catch (\Throwable $e) { - Console::warning("Database not ready. Retrying connection ({$attempts})..."); - if ($attempts >= $max) { - throw new \Exception('Failed to connect to database: ' . $e->getMessage()); - } - sleep($sleep); - } - } while ($attempts < $max); - - Console::success('[Setup] - Server database init started...'); - - try { - Console::success('[Setup] - Creating console database...'); - $dbForPlatform->create(); - } catch (Duplicate) { - Console::success('[Setup] - Skip: metadata table already exists'); - } - - if ($dbForPlatform->getCollection(Audit::COLLECTION)->isEmpty()) { - $audit = new Audit($dbForPlatform); - $audit->setup(); - } - /** @var array $collections */ $collections = Config::getParam('collections', []); - $consoleCollections = $collections['console']; - foreach ($consoleCollections as $key => $collection) { - if (($collection['$collection'] ?? '') !== Database::METADATA) { - continue; - } - if (!$dbForPlatform->getCollection($key)->isEmpty()) { - continue; + + // create logs database first, `getLogsDB` is a callable. + createDatabase($app, 'getLogsDB', 'logs', $collections['logs'], $pools); + + // create appwrite database, `dbForPlatform` is a direct access call. + createDatabase($app, 'dbForPlatform', 'appwrite', $collections['console'], $pools, function (Database $dbForPlatform) use ($collections) { + if ($dbForPlatform->getCollection(Audit::COLLECTION)->isEmpty()) { + $audit = new Audit($dbForPlatform); + $audit->setup(); } - Console::success('[Setup] - Creating console collection: ' . $collection['$id'] . '...'); + if ($dbForPlatform->getDocument('buckets', 'default')->isEmpty() && + !$dbForPlatform->exists($dbForPlatform->getDatabase(), 'bucket_1')) { + Console::info(" └── Creating default bucket..."); + $dbForPlatform->createDocument('buckets', new Document([ + '$id' => ID::custom('default'), + '$collection' => ID::custom('buckets'), + 'name' => 'Default', + 'maximumFileSize' => (int) System::getEnv('_APP_STORAGE_LIMIT', 0), + 'allowedFileExtensions' => [], + 'enabled' => true, + 'compression' => 'gzip', + 'encryption' => true, + 'antivirus' => true, + 'fileSecurity' => true, + '$permissions' => [ + Permission::create(Role::any()), + Permission::read(Role::any()), + Permission::update(Role::any()), + Permission::delete(Role::any()), + ], + 'search' => 'buckets Default', + ])); - $attributes = \array_map(fn ($attribute) => new Document($attribute), $collection['attributes']); - $indexes = \array_map(fn (array $index) => new Document($index), $collection['indexes']); + $bucket = $dbForPlatform->getDocument('buckets', 'default'); - $dbForPlatform->createCollection($key, $attributes, $indexes); - } - - if ($dbForPlatform->getDocument('buckets', 'default')->isEmpty() && !$dbForPlatform->exists($dbForPlatform->getDatabase(), 'bucket_1')) { - Console::success('[Setup] - Creating default bucket...'); - $dbForPlatform->createDocument('buckets', new Document([ - '$id' => ID::custom('default'), - '$collection' => ID::custom('buckets'), - 'name' => 'Default', - 'maximumFileSize' => (int) System::getEnv('_APP_STORAGE_LIMIT', 0), // 10MB - 'allowedFileExtensions' => [], - 'enabled' => true, - 'compression' => 'gzip', - 'encryption' => true, - 'antivirus' => true, - 'fileSecurity' => true, - '$permissions' => [ - Permission::create(Role::any()), - Permission::read(Role::any()), - Permission::update(Role::any()), - Permission::delete(Role::any()), - ], - 'search' => 'buckets Default', - ])); - - $bucket = $dbForPlatform->getDocument('buckets', 'default'); - - Console::success('[Setup] - Creating files collection for default bucket...'); - $files = $collections['buckets']['files'] ?? []; - if (empty($files)) { - throw new Exception('Files collection is not configured.'); - } - - $attributes = \array_map(fn ($attribute) => new Document($attribute), $files['attributes']); - $indexes = \array_map(fn (array $index) => new Document($index), $files['indexes']); - - $dbForPlatform->createCollection('bucket_' . $bucket->getInternalId(), $attributes, $indexes); - } - - $projectCollections = $collections['projects']; - $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); - $sharedTablesV1 = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES_V1', '')); - $sharedTablesV2 = \array_diff($sharedTables, $sharedTablesV1); - - $cache = $app->getResource('cache'); - - foreach ($sharedTablesV2 as $hostname) { - $adapter = $pools - ->get($hostname) - ->pop() - ->getResource(); - - $dbForProject = (new Database($adapter, $cache)) - ->setDatabase('appwrite') - ->setSharedTables(true) - ->setTenant(null) - ->setNamespace(System::getEnv('_APP_DATABASE_SHARED_NAMESPACE', '')); - - try { - Console::success('[Setup] - Creating project database: ' . $hostname . '...'); - $dbForProject->create(); - } catch (Duplicate) { - Console::success('[Setup] - Skip: metadata table already exists'); - } - - foreach ($projectCollections as $key => $collection) { - if (($collection['$collection'] ?? '') !== Database::METADATA) { - continue; - } - if (!$dbForProject->getCollection($key)->isEmpty()) { - continue; + Console::info(" └── Creating files collection for default bucket..."); + $files = $collections['buckets']['files'] ?? []; + if (empty($files)) { + throw new Exception('Files collection is not configured.'); } - $attributes = \array_map(fn ($attribute) => new Document($attribute), $collection['attributes']); - $indexes = \array_map(fn (array $index) => new Document($index), $collection['indexes']); + $attributes = array_map(fn ($attr) => new Document([ + '$id' => ID::custom($attr['$id']), + 'type' => $attr['type'], + 'size' => $attr['size'], + 'required' => $attr['required'], + 'signed' => $attr['signed'], + 'array' => $attr['array'], + 'filters' => $attr['filters'], + 'default' => $attr['default'] ?? null, + 'format' => $attr['format'] ?? '' + ]), $files['attributes']); - Console::success('[Setup] - Creating project collection: ' . $collection['$id'] . '...'); + $indexes = array_map(fn ($index) => new Document([ + '$id' => ID::custom($index['$id']), + 'type' => $index['type'], + 'attributes' => $index['attributes'], + 'lengths' => $index['lengths'], + 'orders' => $index['orders'], + ]), $files['indexes']); - $dbForProject->createCollection($key, $attributes, $indexes); + $dbForPlatform->createCollection('bucket_' . $bucket->getInternalId(), $attributes, $indexes); } - } + }); $pools->reclaim(); - Console::success('[Setup] - Server database init completed...'); }); diff --git a/app/init.php b/app/init.php index 941e253260..0c6746de3c 100644 --- a/app/init.php +++ b/app/init.php @@ -32,7 +32,7 @@ use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; use Appwrite\Event\Realtime; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Functions\Specification; @@ -237,8 +237,6 @@ const METRIC_WEBHOOKS_SENT = 'webhooks.events.sent'; const METRIC_WEBHOOKS_FAILED = 'webhooks.events.failed'; const METRIC_WEBHOOK_ID_SENT = '{webhookInternalId}.webhooks.events.sent'; const METRIC_WEBHOOK_ID_FAILED = '{webhookInternalId}.webhooks.events.failed'; - - const METRIC_AUTH_METHOD_PHONE = 'auth.method.phone'; const METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE = METRIC_AUTH_METHOD_PHONE . '.{countryCode}'; const METRIC_MESSAGES = 'messages'; @@ -269,6 +267,8 @@ const METRIC_FILES = 'files'; const METRIC_FILES_STORAGE = 'files.storage'; const METRIC_FILES_TRANSFORMATIONS = 'files.transformations'; const METRIC_BUCKET_ID_FILES_TRANSFORMATIONS = '{bucketInternalId}.files.transformations'; +const METRIC_FILES_IMAGES_TRANSFORMED = 'files.imagesTransformed'; +const METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED = '{bucketInternalId}.files.imagesTransformed'; const METRIC_BUCKET_ID_FILES = '{bucketInternalId}.files'; const METRIC_BUCKET_ID_FILES_STORAGE = '{bucketInternalId}.files.storage'; const METRIC_FUNCTIONS = 'functions'; @@ -301,6 +301,18 @@ const METRIC_FUNCTION_ID_EXECUTIONS_MB_SECONDS = '{functionInternalId}.execution const METRIC_NETWORK_REQUESTS = 'network.requests'; const METRIC_NETWORK_INBOUND = 'network.inbound'; const METRIC_NETWORK_OUTBOUND = 'network.outbound'; +const METRIC_MAU = 'users.mau'; +const METRIC_DAU = 'users.dau'; +const METRIC_WAU = 'users.wau'; +const METRIC_WEBHOOKS = 'webhooks'; +const METRIC_PLATFORMS = 'platforms'; +const METRIC_PROVIDERS = 'providers'; +const METRIC_TOPICS = 'topics'; +const METRIC_KEYS = 'keys'; +const METRIC_RESOURCE_TYPE_ID_BUILDS = '{resourceType}.{resourceInternalId}.builds'; +const METRIC_RESOURCE_TYPE_ID_BUILDS_STORAGE = '{resourceType}.{resourceInternalId}.builds.storage'; +const METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS = '{resourceType}.{resourceInternalId}.deployments'; +const METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE = '{resourceType}.{resourceInternalId}.deployments.storage'; // Resource types @@ -1176,6 +1188,9 @@ App::setResource('queueForWebhooks', function (Queue\Publisher $publisher) { App::setResource('queueForRealtime', function () { return new Realtime(); }, []); +App::setResource('queueForStatsUsage', function (Queue\Publisher $publisher) { + return new StatsUsage($publisher); +}, ['publisher']); App::setResource('queueForAudits', function (Queue\Publisher $publisher) { return new Audit($publisher); }, ['publisher']); @@ -1546,6 +1561,39 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform }; }, ['pools', 'dbForPlatform', 'cache']); +App::setResource('getLogsDB', function (Group $pools, Cache $cache) { + $database = null; + return function (?Document $project = null) use ($pools, $cache, $database) { + if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { + $database->setTenant($project->getInternalId()); + return $database; + } + + $dbAdapter = $pools + ->get('logs') + ->pop() + ->getResource(); + + $database = new Database( + $dbAdapter, + $cache + ); + + $database + ->setSharedTables(true) + ->setNamespace('logsV1') + ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS) + ->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES); + + // set tenant + if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') { + $database->setTenant($project->getInternalId()); + } + + return $database; + }; +}, ['pools', 'cache']); + App::setResource('cache', function (Group $pools) { $list = Config::getParam('pools-cache', []); $adapters = []; diff --git a/app/views/install/compose.phtml b/app/views/install/compose.phtml index ad6d883c4c..d8f3f649e0 100644 --- a/app/views/install/compose.phtml +++ b/app/views/install/compose.phtml @@ -649,10 +649,69 @@ $image = $this->getParam('image', ''); - _APP_MAINTENANCE_RETENTION_USAGE_HOURLY - _APP_MAINTENANCE_RETENTION_SCHEDULES - appwrite-worker-usage: + appwrite-task-stats-resources: image: /: - entrypoint: worker-usage - container_name: appwrite-worker-usage + container_name: appwrite-task-stats-resources + entrypoint: stats-resources + <<: *x-logging + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_CONFIG + - _APP_DATABASE_SHARED_TABLES + - _APP_STATS_RESOURCES_INTERVAL + + appwrite-worker-stats-resources: + image: /: + entrypoint: worker-stats-resources + container_name: appwrite-worker-stats-resources + <<: *x-logging + restart: unless-stopped + networks: + - appwrite + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_CONFIG + - _APP_STATS_RESOURCES_INTERVAL + + appwrite-worker-stats-usage: + image: /: + entrypoint: worker-stats-usage + container_name: appwrite-worker-stats-usage <<: *x-logging restart: unless-stopped networks: @@ -677,11 +736,11 @@ $image = $this->getParam('image', ''); - _APP_LOGGING_CONFIG - _APP_USAGE_AGGREGATION_INTERVAL - appwrite-worker-usage-dump: + appwrite-worker-stats-usage-dump: image: /: - entrypoint: worker-usage-dump + entrypoint: worker-stats-usage-dump <<: *x-logging - container_name: appwrite-worker-usage-dump + container_name: appwrite-worker-stats-usage-dump restart: unless-stopped networks: - appwrite diff --git a/app/worker.php b/app/worker.php index 854d9cb8ae..605474e9f1 100644 --- a/app/worker.php +++ b/app/worker.php @@ -13,8 +13,12 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +use Appwrite\Event\StatsUsage; +use Appwrite\Event\StatsUsageDump; +/** remove */ use Appwrite\Event\Usage; use Appwrite\Event\UsageDump; +/** /remove */ use Appwrite\Platform\Appwrite; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; @@ -173,6 +177,39 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf }; }, ['pools', 'dbForPlatform', 'cache']); +Server::setResource('getLogsDB', function (Group $pools, Cache $cache) { + $database = null; + return function (?Document $project = null) use ($pools, $cache, $database) { + if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { + $database->setTenant($project->getInternalId()); + return $database; + } + + $dbAdapter = $pools + ->get('logs') + ->pop() + ->getResource(); + + $database = new Database( + $dbAdapter, + $cache + ); + + $database + ->setSharedTables(true) + ->setNamespace('logsV1') + ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS) + ->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES); + + // set tenant + if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') { + $database->setTenant($project->getInternalId()); + } + + return $database; + }; +}, ['pools', 'cache']); + Server::setResource('abuseRetention', function () { return time() - (int) System::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400); }); @@ -240,6 +277,14 @@ Server::setResource('queueForUsageDump', function (Publisher $publisher) { return new UsageDump($publisher); }, ['publisher']); +Server::setResource('queueForStatsUsage', function (Publisher $publisher) { + return new StatsUsage($publisher); +}, ['publisher']); + +Server::setResource('queueForStatsUsageDump', function (Publisher $publisher) { + return new StatsUsageDump($publisher); +}, ['publisher']); + Server::setResource('queueForDatabase', function (Publisher $publisher) { return new EventDatabase($publisher); }, ['publisher']); diff --git a/bin/stats-resources b/bin/stats-resources new file mode 100644 index 0000000000..3104bab896 --- /dev/null +++ b/bin/stats-resources @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/cli.php stats-resources $@ \ No newline at end of file diff --git a/bin/worker-stats-resources b/bin/worker-stats-resources new file mode 100644 index 0000000000..9c5d2bebff --- /dev/null +++ b/bin/worker-stats-resources @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php stats-resources $@ \ No newline at end of file diff --git a/bin/worker-stats-usage b/bin/worker-stats-usage new file mode 100644 index 0000000000..2c267d805e --- /dev/null +++ b/bin/worker-stats-usage @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php stats-usage $@ \ No newline at end of file diff --git a/bin/worker-stats-usage-dump b/bin/worker-stats-usage-dump new file mode 100644 index 0000000000..98e3c2cac7 --- /dev/null +++ b/bin/worker-stats-usage-dump @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php stats-usage-dump $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 2fb19f7126..5391bbe397 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -722,10 +722,72 @@ services: - _APP_MAINTENANCE_DELAY - _APP_DATABASE_SHARED_TABLES - appwrite-worker-usage: - entrypoint: worker-usage + appwrite-task-stats-resources: + container_name: appwrite-task-stats-resources + entrypoint: stats-resources <<: *x-logging - container_name: appwrite-worker-usage + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_CONFIG + - _APP_DATABASE_SHARED_TABLES + - _APP_STATS_RESOURCES_INTERVAL + + appwrite-worker-stats-resources: + entrypoint: worker-stats-resources + <<: *x-logging + container_name: appwrite-worker-stats-resources + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_CONFIG + - _APP_USAGE_AGGREGATION_INTERVAL + - _APP_DATABASE_SHARED_TABLES + + appwrite-worker-stats-usage: + entrypoint: worker-stats-usage + <<: *x-logging + container_name: appwrite-worker-stats-usage image: appwrite-dev networks: - appwrite @@ -753,10 +815,10 @@ services: - _APP_USAGE_AGGREGATION_INTERVAL - _APP_DATABASE_SHARED_TABLES - appwrite-worker-usage-dump: - entrypoint: worker-usage-dump + appwrite-worker-stats-usage-dump: + entrypoint: worker-stats-usage-dump <<: *x-logging - container_name: appwrite-worker-usage-dump + container_name: appwrite-worker-stats-usage-dump image: appwrite-dev networks: - appwrite @@ -783,7 +845,8 @@ services: - _APP_LOGGING_CONFIG - _APP_USAGE_AGGREGATION_INTERVAL - _APP_DATABASE_SHARED_TABLES - + - _APP_STATS_USAGE_DUAL_WRITING_DBS + appwrite-task-scheduler-functions: entrypoint: schedule-functions <<: *x-logging diff --git a/docs/references/health/get-queue-stats-resources.md b/docs/references/health/get-queue-stats-resources.md new file mode 100644 index 0000000000..5221327467 --- /dev/null +++ b/docs/references/health/get-queue-stats-resources.md @@ -0,0 +1 @@ +Get the number of metrics that are waiting to be processed in the Appwrite stats resources queue. \ No newline at end of file diff --git a/docs/references/health/get-queue-usage-dump.md b/docs/references/health/get-queue-stats-usage-dump.md similarity index 100% rename from docs/references/health/get-queue-usage-dump.md rename to docs/references/health/get-queue-stats-usage-dump.md diff --git a/docs/references/health/get-queue-usage.md b/docs/references/health/get-queue-stats-usage.md similarity index 100% rename from docs/references/health/get-queue-usage.md rename to docs/references/health/get-queue-stats-usage.md diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index f56aeeb757..0edffdf4dc 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -24,11 +24,22 @@ class Event public const FUNCTIONS_QUEUE_NAME = 'v1-functions'; public const FUNCTIONS_CLASS_NAME = 'FunctionsV1'; + /** remove */ public const USAGE_QUEUE_NAME = 'v1-usage'; public const USAGE_CLASS_NAME = 'UsageV1'; public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump'; public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1'; + /** /remove */ + + public const STATS_RESOURCES_QUEUE_NAME = 'v1-stats-resources'; + public const STATS_RESOURCES_CLASS_NAME = 'StatsResourcesV1'; + + public const STATS_USAGE_QUEUE_NAME = 'v1-stats-usage'; + public const STATS_USAGE_CLASS_NAME = 'StatsUsageV1'; + + public const STATS_USAGE_DUMP_QUEUE_NAME = 'v1-stats-usage-dump'; + public const STATS_USAGE_DUMP_CLASS_NAME = 'StatsUsageDumpV1'; public const WEBHOOK_QUEUE_NAME = 'v1-webhooks'; public const WEBHOOK_CLASS_NAME = 'WebhooksV1'; diff --git a/src/Appwrite/Event/StatsResources.php b/src/Appwrite/Event/StatsResources.php new file mode 100644 index 0000000000..e7a3df97e0 --- /dev/null +++ b/src/Appwrite/Event/StatsResources.php @@ -0,0 +1,29 @@ +setQueue(Event::STATS_RESOURCES_QUEUE_NAME) + ->setClass(Event::STATS_RESOURCES_CLASS_NAME); + } + + /** + * Prepare the payload for the usage event. + * + * @return array + */ + protected function preparePayload(): array + { + return [ + 'project' => $this->project + ]; + } +} diff --git a/src/Appwrite/Event/StatsUsage.php b/src/Appwrite/Event/StatsUsage.php new file mode 100644 index 0000000000..bed25419f6 --- /dev/null +++ b/src/Appwrite/Event/StatsUsage.php @@ -0,0 +1,65 @@ +setQueue(Event::STATS_USAGE_QUEUE_NAME) + ->setClass(Event::STATS_USAGE_CLASS_NAME); + } + + /** + * Add reduce. + * + * @param Document $document + * @return self + */ + public function addReduce(Document $document): self + { + $this->reduce[] = $document; + + return $this; + } + + /** + * Add metric. + * + * @param string $key + * @param int $value + * @return self + */ + public function addMetric(string $key, int $value): self + { + $this->metrics[] = [ + 'key' => $key, + 'value' => $value, + ]; + + return $this; + } + + /** + * Prepare the payload for the event + * + * @return array + */ + protected function preparePayload(): array + { + return [ + 'project' => $this->getProject(), + 'reduce' => $this->reduce, + 'metrics' => $this->metrics, + ]; + } +} diff --git a/src/Appwrite/Event/StatsUsageDump.php b/src/Appwrite/Event/StatsUsageDump.php new file mode 100644 index 0000000000..0573a88040 --- /dev/null +++ b/src/Appwrite/Event/StatsUsageDump.php @@ -0,0 +1,44 @@ +setQueue(Event::STATS_USAGE_DUMP_QUEUE_NAME) + ->setClass(Event::STATS_USAGE_DUMP_CLASS_NAME); + } + + /** + * Add Stats. + * + * @param array $stats + * @return self + */ + public function setStats(array $stats): self + { + $this->stats = $stats; + + return $this; + } + + /** + * Prepare the payload for the usage dump event. + * + * @return array + */ + protected function preparePayload(): array + { + return [ + 'stats' => $this->stats, + ]; + } +} diff --git a/src/Appwrite/Platform/Action.php b/src/Appwrite/Platform/Action.php new file mode 100644 index 0000000000..72c41582ea --- /dev/null +++ b/src/Appwrite/Platform/Action.php @@ -0,0 +1,90 @@ +disableValidation(); + $results = $database->find($collection, $newQueries); + $database->enableValidation(); + } catch (\Exception $e) { + if (!empty($this->logError)) { + call_user_func_array($this->logError, [$e, "CLI", "fetch_documents_namespace_{$database->getNamespace()}_collection{$collection}"]); + } + } + + if (empty($results)) { + return; + } + + $sum = count($results); + + if ($concurrent) { + $callables = []; + $errors = []; + + foreach ($results as $document) { + if (is_callable($callback)) { + $callables[] = Co\go(function () use ($document, $callback, &$errors) { + try { + $callback($document); + } catch (\Throwable $error) { + $errors[] = $error; + } + }); + } + } + + Co::join($callables); + + if (!empty($errors)) { + throw new \Error("Errors found in concurrent foreachDocument: " . \json_encode($errors)); + } + } else { + foreach ($results as $document) { + if (is_callable($callback)) { + $callback($document); + } + } + } + + $latestDocument = $results[array_key_last($results)]; + } + } +} diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index c09f961fc0..6a6cb3237a 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -13,6 +13,7 @@ use Appwrite\Platform\Tasks\ScheduleMessages; use Appwrite\Platform\Tasks\SDKs; use Appwrite\Platform\Tasks\Specs; use Appwrite\Platform\Tasks\SSL; +use Appwrite\Platform\Tasks\StatsResources; use Appwrite\Platform\Tasks\Upgrade; use Appwrite\Platform\Tasks\Vars; use Appwrite\Platform\Tasks\Version; @@ -38,6 +39,7 @@ class Tasks extends Service ->addAction(Upgrade::getName(), new Upgrade()) ->addAction(Vars::getName(), new Vars()) ->addAction(Version::getName(), new Version()) + ->addAction(StatsResources::getName(), new StatsResources()) ; } } diff --git a/src/Appwrite/Platform/Services/Workers.php b/src/Appwrite/Platform/Services/Workers.php index 0e79f4257c..e121ee35f7 100644 --- a/src/Appwrite/Platform/Services/Workers.php +++ b/src/Appwrite/Platform/Services/Workers.php @@ -11,8 +11,13 @@ use Appwrite\Platform\Workers\Functions; use Appwrite\Platform\Workers\Mails; use Appwrite\Platform\Workers\Messaging; use Appwrite\Platform\Workers\Migrations; +use Appwrite\Platform\Workers\StatsResources; +use Appwrite\Platform\Workers\StatsUsage; +use Appwrite\Platform\Workers\StatsUsageDump; +/** remove */ use Appwrite\Platform\Workers\Usage; use Appwrite\Platform\Workers\UsageDump; +/** /remove */ use Appwrite\Platform\Workers\Webhooks; use Utopia\Platform\Service; @@ -31,10 +36,14 @@ class Workers extends Service ->addAction(Mails::getName(), new Mails()) ->addAction(Messaging::getName(), new Messaging()) ->addAction(Webhooks::getName(), new Webhooks()) + ->addAction(StatsUsageDump::getName(), new StatsUsageDump()) + ->addAction(StatsUsage::getName(), new StatsUsage()) + ->addAction(Migrations::getName(), new Migrations()) + ->addAction(StatsResources::getName(), new StatsResources()) + /** Remove */ ->addAction(UsageDump::getName(), new UsageDump()) ->addAction(Usage::getName(), new Usage()) - ->addAction(Migrations::getName(), new Migrations()) - + /** /remove */ ; } } diff --git a/src/Appwrite/Platform/Tasks/StatsResources.php b/src/Appwrite/Platform/Tasks/StatsResources.php new file mode 100644 index 0000000000..ac3b9ead73 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/StatsResources.php @@ -0,0 +1,81 @@ +desc('Schedules projects for usage count') + ->inject('dbForPlatform') + ->inject('logError') + ->inject('queueForStatsResources') + ->callback([$this, 'action']); + } + + public function action(Database $dbForPlatform, callable $logError, EventStatsResources $queue): void + { + $this->logError = $logError; + $this->dbForPlatform = $dbForPlatform; + + Console::title("Stats resources V1"); + + Console::success('Stats resources: started'); + + $interval = (int) System::getEnv('_APP_STATS_RESOURCES_INTERVAL', '3600'); + Console::loop(function () use ($queue) { + Authorization::disable(); + Authorization::setDefaultStatus(false); + + $last24Hours = (new \DateTime())->sub(\DateInterval::createFromDateString('24 hours')); + /** + * For each project that were accessed in last 24 hours + */ + $this->foreachDocument($this->dbForPlatform, 'projects', [ + Query::greaterThanEqual('accessedAt', DateTime::format($last24Hours)) + ], function ($project) use ($queue) { + $queue + ->setProject($project) + ->trigger(); + Console::success('project: ' . $project->getId() . '(' . $project->getInternalId() . ')' . ' queued'); + }); + }, $interval); + + Console::log("Stats resources: exited"); + } +} diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index 4f5d6eb694..e7cbbd5088 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -5,7 +5,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Deployment; use Appwrite\Vcs\Comment; @@ -50,12 +50,12 @@ class Builds extends Action ->inject('dbForPlatform') ->inject('queueForEvents') ->inject('queueForFunctions') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('cache') ->inject('dbForProject') ->inject('deviceForFunctions') ->inject('log') - ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); + ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); } /** @@ -64,7 +64,7 @@ class Builds extends Action * @param Database $dbForPlatform * @param Event $queueForEvents * @param Func $queueForFunctions - * @param Usage $queueForUsage + * @param StatsUsage $queueForStatsUsage * @param Cache $cache * @param Database $dbForProject * @param Device $deviceForFunctions @@ -72,7 +72,7 @@ class Builds extends Action * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void { $payload = $message->getPayload() ?? []; @@ -93,7 +93,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log); + $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log); break; default: @@ -105,7 +105,7 @@ class Builds extends Action * @param Device $deviceForFunctions * @param Func $queueForFunctions * @param Event $queueForEvents - * @param Usage $queueForUsage + * @param StatsUsage $queueForStatsUsage * @param Database $dbForPlatform * @param Database $dbForProject * @param GitHub $github @@ -118,7 +118,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); @@ -706,20 +706,20 @@ class Builds extends Action /** Trigger usage queue */ if ($build->getAttribute('status') === 'ready') { - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_BUILDS_SUCCESS, 1) // per project ->addMetric(METRIC_BUILDS_COMPUTE_SUCCESS, (int)$build->getAttribute('duration', 0) * 1000) ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_SUCCESS), 1) // per function ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_SUCCESS), (int)$build->getAttribute('duration', 0) * 1000); } elseif ($build->getAttribute('status') === 'failed') { - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_BUILDS_FAILED, 1) // per project ->addMetric(METRIC_BUILDS_COMPUTE_FAILED, (int)$build->getAttribute('duration', 0) * 1000) ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_FAILED), 1) // per function ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_FAILED), (int)$build->getAttribute('duration', 0) * 1000); } - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_BUILDS, 1) // per project ->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0)) ->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000) diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 72a3334f2f..0a7c39c02f 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -5,7 +5,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Execution; use Exception; @@ -46,13 +46,13 @@ class Functions extends Action ->inject('dbForProject') ->inject('queueForFunctions') ->inject('queueForEvents') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('log') ->inject('isResourceBlocked') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked)); + ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked): void + public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -137,7 +137,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, - queueForUsage: $queueForUsage, + queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, function: $function, @@ -177,7 +177,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, - queueForUsage: $queueForUsage, + queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, function: $function, @@ -199,7 +199,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, - queueForUsage: $queueForUsage, + queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, function: $function, @@ -284,7 +284,7 @@ class Functions extends Action * @param Log $log * @param Database $dbForProject * @param Func $queueForFunctions - * @param Usage $queueForUsage + * @param StatsUsage $queueForStatsUsage * @param Event $queueForEvents * @param Document $project * @param Document $function @@ -308,7 +308,7 @@ class Functions extends Action Log $log, Database $dbForProject, Func $queueForFunctions, - Usage $queueForUsage, + StatsUsage $queueForStatsUsage, Event $queueForEvents, Document $project, Document $function, @@ -552,7 +552,7 @@ class Functions extends Action $errorCode = $th->getCode(); } finally { /** Trigger usage queue */ - $queueForUsage + $queueForStatsUsage ->setProject($project) ->addMetric(METRIC_EXECUTIONS, 1) ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1) diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index aee60a2bb5..cf2b8bfc84 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Workers; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Messaging\Status as MessageStatus; use Swoole\Runtime; use Utopia\CLI\Console; @@ -70,8 +70,8 @@ class Messaging extends Action ->inject('log') ->inject('dbForProject') ->inject('deviceForFiles') - ->inject('queueForUsage') - ->callback(fn (Message $message, Document $project, Log $log, Database $dbForProject, Device $deviceForFiles, Usage $queueForUsage) => $this->action($message, $project, $log, $dbForProject, $deviceForFiles, $queueForUsage)); + ->inject('queueForStatsUsage') + ->callback(fn (Message $message, Document $project, Log $log, Database $dbForProject, Device $deviceForFiles, StatsUsage $queueForStatsUsage) => $this->action($message, $project, $log, $dbForProject, $deviceForFiles, $queueForStatsUsage)); } /** @@ -80,7 +80,7 @@ class Messaging extends Action * @param Log $log * @param Database $dbForProject * @param Device $deviceForFiles - * @param Usage $queueForUsage + * @param StatsUsage $queueForStatsUsage * @return void * @throws \Exception */ @@ -90,7 +90,7 @@ class Messaging extends Action Log $log, Database $dbForProject, Device $deviceForFiles, - Usage $queueForUsage + StatsUsage $queueForStatsUsage ): void { Runtime::setHookFlags(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP); $payload = $message->getPayload() ?? []; @@ -111,7 +111,7 @@ class Messaging extends Action case MESSAGE_SEND_TYPE_EXTERNAL: $message = $dbForProject->getDocument('messages', $payload['messageId']); - $this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForUsage); + $this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage); break; default: throw new \Exception('Unknown message type: ' . $type); @@ -123,7 +123,7 @@ class Messaging extends Action Document $message, Device $deviceForFiles, Document $project, - Usage $queueForUsage + StatsUsage $queueForStatsUsage ): void { $topicIds = $message->getAttribute('topics', []); $targetIds = $message->getAttribute('targets', []); @@ -229,8 +229,8 @@ class Messaging extends Action /** * @var array $results */ - $results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForUsage) { - return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForUsage) { + $results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) { + return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) { if (\array_key_exists($providerId, $providers)) { $provider = $providers[$providerId]; } else { @@ -257,8 +257,8 @@ class Messaging extends Action $adapter->getMaxMessagesPerRequest() ); - return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForUsage) { - return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForUsage) { + return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) { + return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) { $deliveredTotal = 0; $deliveryErrors = []; $messageData = clone $message; @@ -298,7 +298,7 @@ class Messaging extends Action $deliveryErrors[] = 'Failed sending to targets with error: ' . $e->getMessage(); } finally { $errorTotal = \count($deliveryErrors); - $queueForUsage + $queueForStatsUsage ->setProject($project) ->addMetric(METRIC_MESSAGES, ($deliveredTotal + $errorTotal)) ->addMetric(METRIC_MESSAGES_SENT, $deliveredTotal) diff --git a/src/Appwrite/Platform/Workers/StatsResources.php b/src/Appwrite/Platform/Workers/StatsResources.php new file mode 100644 index 0000000000..e3c76ecb9a --- /dev/null +++ b/src/Appwrite/Platform/Workers/StatsResources.php @@ -0,0 +1,373 @@ + 'Y-m-d H:00', + '1d' => 'Y-m-d 00:00', + 'inf' => '0000-00-00 00:00' + ]; + + /** + * @var array $documents + * + * Array of documents to batch write + * + */ + protected array $documents = []; + + public static function getName(): string + { + return 'stats-resources'; + } + + + /** + * @throws Exception + */ + public function __construct() + { + $this + ->desc('Stats resources worker') + ->inject('message') + ->inject('project') + ->inject('getProjectDB') + ->inject('getLogsDB') + ->inject('dbForPlatform') + ->inject('logError') + ->callback([$this, 'action']); + } + + /** + * @param Message $message + * @param Document $project + * @param callable $getProjectDB + * @return void + * @throws \Utopia\Database\Exception + * @throws Exception + */ + public function action(Message $message, Document $project, callable $getProjectDB, callable $getLogsDB, Database $dbForPlatform, callable $logError): void + { + $this->logError = $logError; + + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + if (empty($project->getAttribute('database'))) { + var_dump($payload); + return; + } + + // Reset documents for each job + $this->documents = []; + + $this->countForProject($dbForPlatform, $getLogsDB, $getProjectDB, $project); + } + + + protected function countForProject(Database $dbForPlatform, callable $getLogsDB, callable $getProjectDB, Document $project): void + { + Console::info('Begining count for: ' . $project->getId()); + + $dbForLogs = null; + $dbForProject = null; + try { + /** @var \Utopia\Database\Database $dbForLogs */ + $dbForLogs = call_user_func($getLogsDB, $project); + /** @var \Utopia\Database\Database $dbForProject */ + $dbForProject = call_user_func($getProjectDB, $project); + } catch (Throwable $th) { + Console::error('Unable to get database'); + Console::error($th->getMessage()); + return; + } + + try { + + $region = $project->getAttribute('region'); + + $platforms = $dbForPlatform->count('platforms', [ + Query::equal('projectInternalId', [$project->getInternalId()]) + ]); + $webhooks = $dbForPlatform->count('webhooks', [ + Query::equal('projectInternalId', [$project->getInternalId()]) + ]); + $keys = $dbForPlatform->count('keys', [ + Query::equal('projectInternalId', [$project->getInternalId()]) + ]); + $databases = $dbForProject->count('databases'); + $buckets = $dbForProject->count('buckets'); + $users = $dbForProject->count('users'); + + $last30Days = (new \DateTime())->sub(\DateInterval::createFromDateString('30 days'))->format('Y-m-d 00:00:00'); + $usersMAU = $dbForProject->count('users', [ + Query::greaterThanEqual('accessedAt', $last30Days) + ]); + $last24Hours = (new \DateTime())->sub(\DateInterval::createFromDateString('24 hours'))->format('Y-m-d h:m:00'); + $usersDAU = $dbForProject->count('users', [ + Query::greaterThanEqual('accessedAt', $last24Hours) + ]); + $last7Days = (new \DateTime())->sub(\DateInterval::createFromDateString('7 days'))->format('Y-m-d 00:00:00'); + $usersWAU = $dbForProject->count('users', [ + Query::greaterThanEqual('accessedAt', $last7Days) + ]); + $teams = $dbForProject->count('teams'); + $functions = $dbForProject->count('functions'); + $messages = $dbForProject->count('messages'); + $providers = $dbForProject->count('providers'); + $topics = $dbForProject->count('topics'); + + $metrics = [ + METRIC_DATABASES => $databases, + METRIC_BUCKETS => $buckets, + METRIC_USERS => $users, + METRIC_FUNCTIONS => $functions, + METRIC_TEAMS => $teams, + METRIC_MESSAGES => $messages, + METRIC_MAU => $usersMAU, + METRIC_DAU => $usersDAU, + METRIC_WAU => $usersWAU, + METRIC_WEBHOOKS => $webhooks, + METRIC_PLATFORMS => $platforms, + METRIC_PROVIDERS => $providers, + METRIC_TOPICS => $topics, + METRIC_KEYS => $keys, + ]; + + foreach ($metrics as $metric => $value) { + $this->createStatsDocuments($region, $metric, $value); + } + + try { + $this->countForBuckets($dbForProject, $dbForLogs, $region); + } catch (Throwable $th) { + call_user_func_array($this->logError, [$th, "StatsResources", "count_for_buckets_{$project->getId()}"]); + } + + try { + $this->countForDatabase($dbForProject, $dbForLogs, $region); + } catch (Throwable $th) { + call_user_func_array($this->logError, [$th, "StatsResources", "count_for_database_{$project->getId()}"]); + } + + try { + $this->countForFunctions($dbForProject, $dbForLogs, $region); + } catch (Throwable $th) { + call_user_func_array($this->logError, [$th, "StatsResources", "count_for_functions_{$project->getId()}"]); + } + + $this->writeDocuments($dbForLogs, $project); + } catch (Throwable $th) { + call_user_func_array($this->logError, [$th, "StatsResources", "count_for_project_{$project->getId()}"]); + } + + Console::info('End of count for: ' . $project->getId()); + } + + protected function countForBuckets(Database $dbForProject, Database $dbForLogs, string $region) + { + $totalFiles = 0; + $totalStorage = 0; + $this->foreachDocument($dbForProject, 'buckets', [], function ($bucket) use ($dbForProject, $dbForLogs, $region, &$totalFiles, &$totalStorage) { + $files = $dbForProject->count('bucket_' . $bucket->getInternalId()); + + $metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES); + $this->createStatsDocuments($region, $metric, $files); + + $storage = $dbForProject->sum('bucket_' . $bucket->getInternalId(), 'sizeActual'); + $metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE); + $this->createStatsDocuments($region, $metric, $storage); + + $totalStorage += $storage; + $totalFiles += $files; + }); + + $this->createStatsDocuments($region, METRIC_FILES, $totalFiles); + $this->createStatsDocuments($region, METRIC_FILES_STORAGE, $totalStorage); + } + + /** + * Need separate function to count per period data + */ + protected function countImageTransformations(Database $dbForProject, Database $dbForLogs, string $region) + { + $totalImageTransformations = 0; + $totalDailyImageTransformations = 0; + $totalHourlyImageTransformations = 0; + $this->foreachDocument($dbForProject, 'buckets', [], function ($bucket) use ($dbForProject, $dbForLogs, $region, &$totalDailyImageTransformations, &$totalHourlyImageTransformations, &$totalImageTransformations) { + $imageTransformations = $dbForProject->count('bucket_' . $bucket->getInternalId(), [ + Query::isNotNull('transformedAt') + ]); + $metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED); + $this->createStatsDocuments($region, $metric, $imageTransformations, 'inf'); + $totalImageTransformations += $imageTransformations; + + // hourly + $time = \date($this->periods['1h'], \time()); + $start = $time; + $end = (new \DateTime($start))->format('Y-m-d H:59:59'); + $hourlyImageTransformations = $dbForProject->count('bucket_' . $bucket->getInternalId(), [ + Query::greaterThanEqual('transformedAt', $start), + Query::lessThanEqual('transformedAt', $end), + ]); + $metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED); + $this->createStatsDocuments($region, $metric, $hourlyImageTransformations, '1h'); + $totalHourlyImageTransformations += $hourlyImageTransformations; + + // daily + $time = \date($this->periods['1d'], \time()); + $start = $time; + $end = (new \DateTime($start))->format('Y-m-d 11:59:59'); + + $dailyImageTransformations = $dbForProject->count('bucket_' . $bucket->getInternalId(), [ + Query::greaterThanEqual('transformedAt', $start), + Query::lessThanEqual('transformedAt', $end), + ]); + $metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED); + $this->createStatsDocuments($region, $metric, $dailyImageTransformations, '1d'); + $totalDailyImageTransformations += $dailyImageTransformations; + + }); + + $this->createStatsDocuments($region, METRIC_FILES_IMAGES_TRANSFORMED, $totalImageTransformations, 'inf'); + + } + + protected function countForDatabase(Database $dbForProject, Database $dbForLogs, string $region) + { + $totalCollections = 0; + $totalDocuments = 0; + + $this->foreachDocument($dbForProject, 'databases', [], function ($database) use ($dbForProject, $dbForLogs, $region, &$totalCollections, &$totalDocuments) { + $collections = $dbForProject->count('database_' . $database->getInternalId()); + + $metric = str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS); + $this->createStatsDocuments($region, $metric, $collections); + + $documents = $this->countForCollections($dbForProject, $dbForLogs, $database, $region); + + $totalDocuments += $documents; + $totalCollections += $collections; + }); + + $this->createStatsDocuments($region, METRIC_COLLECTIONS, $totalCollections); + $this->createStatsDocuments($region, METRIC_DOCUMENTS, $totalDocuments); + } + protected function countForCollections(Database $dbForProject, Database $dbForLogs, Document $database, string $region): int + { + $databaseDocuments = 0; + $this->foreachDocument($dbForProject, 'database_' . $database->getInternalId(), [], function ($collection) use ($dbForProject, $dbForLogs, $database, $region, &$totalCollections, &$databaseDocuments) { + $documents = $dbForProject->count('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId()); + + $metric = str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$database->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS); + $this->createStatsDocuments($region, $metric, $documents); + + $databaseDocuments += $documents; + }); + + $metric = str_replace(['{databaseInternalId}'], [$database->getInternalId()], METRIC_DATABASE_ID_DOCUMENTS); + $this->createStatsDocuments($region, $metric, $databaseDocuments); + + return $databaseDocuments; + } + + protected function countForFunctions(Database $dbForProject, Database $dbForLogs, string $region) + { + $deploymentsStorage = $dbForProject->sum('deployments', 'size'); + $buildsStorage = $dbForProject->sum('builds', 'size'); + $this->createStatsDocuments($region, METRIC_DEPLOYMENTS_STORAGE, $deploymentsStorage); + $this->createStatsDocuments($region, METRIC_BUILDS_STORAGE, $buildsStorage); + + $deployments = $dbForProject->count('deployments'); + $builds = $dbForProject->count('builds'); + $this->createStatsDocuments($region, METRIC_DEPLOYMENTS, $deployments); + $this->createStatsDocuments($region, METRIC_BUILDS, $builds); + + + $this->foreachDocument($dbForProject, 'functions', [], function (Document $function) use ($dbForProject, $dbForLogs, $region) { + $functionDeploymentsStorage = $dbForProject->sum('deployments', 'size', [ + Query::equal('resourceInternalId', [$function->getInternalId()]), + Query::equal('resourceType', [RESOURCE_TYPE_FUNCTIONS]), + ]); + $this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $functionDeploymentsStorage); + + $functionDeployments = $dbForProject->count('deployments', [ + Query::equal('resourceInternalId', [$function->getInternalId()]), + Query::equal('resourceType', [RESOURCE_TYPE_FUNCTIONS]), + ]); + $this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $functionDeployments); + + /** + * As deployments and builds have 1-1 relationship, + * the count for one should match the other + */ + $this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_BUILDS), $functionDeployments); + + $functionBuildsStorage = 0; + + $this->foreachDocument($dbForProject, 'deployments', [ + Query::equal('resourceInternalId', [$function->getInternalId()]), + Query::equal('resourceType', [RESOURCE_TYPE_FUNCTIONS]), + ], function (Document $deployment) use ($dbForProject, &$functionBuildsStorage): void { + $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); + $functionBuildsStorage += $build->getAttribute('size', 0); + }); + + $this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_BUILDS_STORAGE), $functionBuildsStorage); + }); + } + + protected function createStatsDocuments(string $region, string $metric, int $value, ?string $period = null) + { + if ($period === null) { + foreach ($this->periods as $period => $format) { + $time = 'inf' === $period ? null : \date($format, \time()); + $id = \md5("{$time}_{$period}_{$metric}"); + + $this->documents[] = new Document([ + '$id' => $id, + 'metric' => $metric, + 'period' => $period, + 'region' => $region, + 'value' => $value, + ]); + } + } else { + $time = 'inf' === $period ? null : \date($this->periods[$period], \time()); + $id = \md5("{$time}_{$period}_{$metric}"); + $this->documents[] = new Document([ + '$id' => $id, + 'metric' => $metric, + 'period' => $period, + 'region' => $region, + 'value' => $value, + ]); + } + } + + protected function writeDocuments(Database $dbForLogs, Document $project): void + { + $dbForLogs->createOrUpdateDocuments( + 'stats', + $this->documents + ); + $this->documents = []; + Console::success('Stats written to logs db for project: ' . $project->getId() . '(' . $project->getInternalId() . ')'); + } +} diff --git a/src/Appwrite/Platform/Workers/StatsUsage.php b/src/Appwrite/Platform/Workers/StatsUsage.php new file mode 100644 index 0000000000..c4d8b0e8d2 --- /dev/null +++ b/src/Appwrite/Platform/Workers/StatsUsage.php @@ -0,0 +1,246 @@ +desc('Stats usage worker') + ->inject('message') + ->inject('getProjectDB') + ->inject('queueForStatsUsageDump') + ->callback([$this, 'action']); + + $this->lastTriggeredTime = time(); + } + + /** + * @param Message $message + * @param callable $getProjectDB + * @param StatsUsageDump $queueForStatsUsageDump + * @return void + * @throws \Utopia\Database\Exception + * @throws Exception + */ + public function action(Message $message, callable $getProjectDB, StatsUsageDump $queueForStatsUsageDump): void + { + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + //Todo Figure out way to preserve keys when the container is being recreated @shimonewman + + $aggregationInterval = (int) System::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20'); + $project = new Document($payload['project'] ?? []); + $projectId = $project->getInternalId(); + foreach ($payload['reduce'] ?? [] as $document) { + if (empty($document)) { + continue; + } + + $this->reduce( + project: $project, + document: new Document($document), + metrics: $payload['metrics'], + getProjectDB: $getProjectDB + ); + } + + $this->stats[$projectId]['project'] = $project; + $this->stats[$projectId]['receivedAt'] = DateTime::now(); + foreach ($payload['metrics'] ?? [] as $metric) { + $this->keys++; + if (!isset($this->stats[$projectId]['keys'][$metric['key']])) { + $this->stats[$projectId]['keys'][$metric['key']] = $metric['value']; + continue; + } + + $this->stats[$projectId]['keys'][$metric['key']] += $metric['value']; + } + + // If keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats) + if ( + $this->keys >= self::KEYS_THRESHOLD || + (time() - $this->lastTriggeredTime > $aggregationInterval && $this->keys > 0) + ) { + Console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys'); + + $queueForStatsUsageDump + ->setStats($this->stats) + ->trigger(); + + $this->stats = []; + $this->keys = 0; + $this->lastTriggeredTime = time(); + } + } + + /** + * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files. + * When we remove a parent document we need to deduct his children aggregation from the project scope. + * @param Document $project + * @param Document $document + * @param array $metrics + * @param callable $getProjectDB + * @return void + */ + private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void + { + $dbForProject = $getProjectDB($project); + + try { + switch (true) { + case $document->getCollection() === 'users': // users + $sessions = count($document->getAttribute(METRIC_SESSIONS, 0)); + if (!empty($sessions)) { + $metrics[] = [ + 'key' => METRIC_SESSIONS, + 'value' => ($sessions * -1), + ]; + } + break; + case $document->getCollection() === 'databases': // databases + $collections = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS))); + $documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_DOCUMENTS))); + if (!empty($collections['value'])) { + $metrics[] = [ + 'key' => METRIC_COLLECTIONS, + 'value' => ($collections['value'] * -1), + ]; + } + + if (!empty($documents['value'])) { + $metrics[] = [ + 'key' => METRIC_DOCUMENTS, + 'value' => ($documents['value'] * -1), + ]; + } + break; + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $document->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS))); + + if (!empty($documents['value'])) { + $metrics[] = [ + 'key' => METRIC_DOCUMENTS, + 'value' => ($documents['value'] * -1), + ]; + $metrics[] = [ + 'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), + 'value' => ($documents['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'buckets': + $files = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES))); + $storage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE))); + + if (!empty($files['value'])) { + $metrics[] = [ + 'key' => METRIC_FILES, + 'value' => ($files['value'] * -1), + ]; + } + + if (!empty($storage['value'])) { + $metrics[] = [ + 'key' => METRIC_FILES_STORAGE, + 'value' => ($storage['value'] * -1), + ]; + } + break; + + case $document->getCollection() === 'functions': + $deployments = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS))); + $deploymentsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE))); + $builds = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS))); + $buildsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE))); + $buildsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE))); + $executions = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS))); + $executionsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE))); + + if (!empty($deployments['value'])) { + $metrics[] = [ + 'key' => METRIC_DEPLOYMENTS, + 'value' => ($deployments['value'] * -1), + ]; + } + + if (!empty($deploymentsStorage['value'])) { + $metrics[] = [ + 'key' => METRIC_DEPLOYMENTS_STORAGE, + 'value' => ($deploymentsStorage['value'] * -1), + ]; + } + + if (!empty($builds['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS, + 'value' => ($builds['value'] * -1), + ]; + } + + if (!empty($buildsStorage['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_STORAGE, + 'value' => ($buildsStorage['value'] * -1), + ]; + } + + if (!empty($buildsCompute['value'])) { + $metrics[] = [ + 'key' => METRIC_BUILDS_COMPUTE, + 'value' => ($buildsCompute['value'] * -1), + ]; + } + + if (!empty($executions['value'])) { + $metrics[] = [ + 'key' => METRIC_EXECUTIONS, + 'value' => ($executions['value'] * -1), + ]; + } + + if (!empty($executionsCompute['value'])) { + $metrics[] = [ + 'key' => METRIC_EXECUTIONS_COMPUTE, + 'value' => ($executionsCompute['value'] * -1), + ]; + } + break; + default: + break; + } + } catch (\Throwable $e) { + console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); + } + } +} diff --git a/src/Appwrite/Platform/Workers/StatsUsageDump.php b/src/Appwrite/Platform/Workers/StatsUsageDump.php new file mode 100644 index 0000000000..38ebd578a5 --- /dev/null +++ b/src/Appwrite/Platform/Workers/StatsUsageDump.php @@ -0,0 +1,350 @@ + true, + METRIC_BUCKETS => true, + METRIC_USERS => true, + METRIC_FUNCTIONS => true, + METRIC_TEAMS => true, + METRIC_MESSAGES => true, + METRIC_MAU => true, + METRIC_WEBHOOKS => true, + METRIC_PLATFORMS => true, + METRIC_PROVIDERS => true, + METRIC_TOPICS => true, + METRIC_KEYS => true, + METRIC_FILES => true, + METRIC_FILES_STORAGE => true, + METRIC_DEPLOYMENTS_STORAGE => true, + METRIC_BUILDS_STORAGE => true, + METRIC_DEPLOYMENTS => true, + METRIC_BUILDS => true, + METRIC_COLLECTIONS => true, + METRIC_DOCUMENTS => true, + ]; + + /** + * Skip metrics associated with parent IDs + * these need to be checked individually with `str_ends_with` + */ + protected array $skipParentIdMetrics = [ + '.files', + '.files.storage', + '.collections', + '.documents', + '.deployments', + '.deployments.storage', + '.builds', + '.builds.storage', + ]; + + /** + * @var callable + */ + protected mixed $getLogsDB; + + protected array $periods = [ + '1h' => 'Y-m-d H:00', + '1d' => 'Y-m-d 00:00', + 'inf' => '0000-00-00 00:00' + ]; + + public static function getName(): string + { + return 'stats-usage-dump'; + } + + /** + * @throws \Exception + */ + public function __construct() + { + $this + ->inject('message') + ->inject('getProjectDB') + ->inject('getLogsDB') + ->inject('register') + ->callback([$this, 'action']); + } + + /** + * @param Message $message + * @param callable $getProjectDB + * @param callable $getLogsDB + * @return void + * @throws Exception + * @throws \Utopia\Database\Exception + */ + public function action(Message $message, callable $getProjectDB, callable $getLogsDB, Registry $register): void + { + $this->getLogsDB = $getLogsDB; + $this->register = $register; + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + + foreach ($payload['stats'] ?? [] as $stats) { + $project = new Document($stats['project'] ?? []); + + $numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0; + $receivedAt = $stats['receivedAt'] ?? 'NONE'; + if ($numberOfKeys === 0) { + continue; + } + + console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); + + try { + /** @var \Utopia\Database\Database $dbForProject */ + $dbForProject = $getProjectDB($project); + foreach ($stats['keys'] ?? [] as $key => $value) { + if ($value == 0) { + continue; + } + + if (str_contains($key, METRIC_DATABASES_STORAGE)) { + try { + $this->handleDatabaseStorage($key, $dbForProject, $project); + } catch (\Exception $e) { + console::error('[' . DateTime::now() . '] failed to calculate database storage for key [' . $key . '] ' . $e->getMessage()); + } + continue; + } + + foreach ($this->periods as $period => $format) { + $time = 'inf' === $period ? null : date($format, time()); + $id = \md5("{$time}_{$period}_{$key}"); + + $document = new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $key, + 'value' => $value, + 'region' => System::getEnv('_APP_REGION', 'default'), + ]); + $documentClone = new Document($document->getArrayCopy()); + $dbForProject->createOrUpdateDocumentsWithIncrease( + 'stats', + 'value', + [$document] + ); + + $this->writeToLogsDB($project, $documentClone); + } + } + } catch (\Exception $e) { + console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + } + } + } + + private function handleDatabaseStorage(string $key, Database $dbForProject, Document $project): void + { + $data = explode('.', $key); + $start = microtime(true); + + $updateMetric = function (Database $dbForProject, Document $project, int $value, string $key, string $period, string|null $time) { + $id = \md5("{$time}_{$period}_{$key}"); + + $document = new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $key, + 'value' => $value, + 'region' => System::getEnv('_APP_REGION', 'default'), + ]); + $documentClone = new Document($document->getArrayCopy()); + $dbForProject->createOrUpdateDocumentsWithIncrease( + 'stats', + 'value', + [$document] + ); + $this->writeToLogsDB($project, $documentClone); + }; + + foreach ($this->periods as $period => $format) { + $time = 'inf' === $period ? null : date($format, time()); + $id = \md5("{$time}_{$period}_{$key}"); + + $value = 0; + $previousValue = 0; + try { + $previousValue = ($dbForProject->getDocument('stats', $id))->getAttribute('value', 0); + } catch (\Exception $e) { + // No previous value + } + + switch (count($data)) { + // Collection Level + case self::METRIC_COLLECTION_LEVEL_STORAGE: + Console::log('[' . DateTime::now() . '] Collection Level Storage Calculation [' . $key . ']'); + $databaseInternalId = $data[0]; + $collectionInternalId = $data[1]; + + try { + $value = $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collectionInternalId); + } catch (\Exception $e) { + // Collection not found + if ($e->getMessage() !== 'Collection not found') { + throw $e; + } + } + + // Compare with previous value + $diff = $value - $previousValue; + + if ($diff === 0) { + break; + } + + // Update Collection + $updateMetric($dbForProject, $project, $diff, $key, $period, $time); + + // Update Database + $databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE); + $updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time); + + // Update Project + $projectKey = METRIC_DATABASES_STORAGE; + $updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time); + break; + // Database Level + case self::METRIC_DATABASE_LEVEL_STORAGE: + Console::log('[' . DateTime::now() . '] Database Level Storage Calculation [' . $key . ']'); + $databaseInternalId = $data[0]; + + $collections = []; + try { + $collections = $dbForProject->find('database_' . $databaseInternalId); + } catch (\Exception $e) { + // Database not found + if ($e->getMessage() !== 'Collection not found') { + throw $e; + } + } + + foreach ($collections as $collection) { + try { + $value += $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId()); + } catch (\Exception $e) { + // Collection not found + if ($e->getMessage() !== 'Collection not found') { + throw $e; + } + } + } + + $diff = $value - $previousValue; + + if ($diff === 0) { + break; + } + + // Update Database + $databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE); + $updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time); + + // Update Project + $projectKey = METRIC_DATABASES_STORAGE; + $updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time); + break; + // Project Level + case self::METRIC_PROJECT_LEVEL_STORAGE: + Console::log('[' . DateTime::now() . '] Project Level Storage Calculation [' . $key . ']'); + // Get all project databases + $databases = $dbForProject->find('database'); + + // Recalculate all databases + foreach ($databases as $database) { + $collections = $dbForProject->find('database_' . $database->getInternalId()); + + foreach ($collections as $collection) { + try { + $value += $dbForProject->getSizeOfCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId()); + } catch (\Exception $e) { + // Collection not found + if ($e->getMessage() !== 'Collection not found') { + throw $e; + } + } + } + } + + $diff = $value - $previousValue; + + // Update Project + $projectKey = METRIC_DATABASES_STORAGE; + $updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time); + break; + } + } + + $end = microtime(true); + + console::log('[' . DateTime::now() . '] DB Storage Calculation [' . $key . '] took ' . (($end - $start) * 1000) . ' milliseconds'); + } + + protected function writeToLogsDB(Document $project, Document $document) + { + if (!System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', false)) { + Console::log('Dual Writing is disabled. Skipping...'); + return; + } + + if (array_key_exists($document->getAttribute('metric'), $this->skipBaseMetrics)) { + return; + } + foreach ($this->skipParentIdMetrics as $skipMetric) { + if (str_ends_with($document->getAttribute('metric'), $skipMetric)) { + return; + } + } + + /** @var \Utopia\Database\Database $dbForLogs*/ + $dbForLogs = call_user_func($this->getLogsDB, $project); + + try { + $dbForLogs->createOrUpdateDocumentsWithIncrease( + 'stats', + 'value', + [$document] + ); + Console::success('Usage logs pushed to Logs DB'); + } catch (\Throwable $th) { + Console::error($th->getMessage()); + } + + $this->register->get('pools')->get('logs')->reclaim(); + } +} diff --git a/src/Appwrite/Platform/Workers/Webhooks.php b/src/Appwrite/Platform/Workers/Webhooks.php index a76e4f17b0..c903dafdae 100644 --- a/src/Appwrite/Platform/Workers/Webhooks.php +++ b/src/Appwrite/Platform/Workers/Webhooks.php @@ -3,7 +3,7 @@ namespace Appwrite\Platform\Workers; use Appwrite\Event\Mail; -use Appwrite\Event\Usage; +use Appwrite\Event\StatsUsage; use Appwrite\Template\Template; use Exception; use Utopia\Database\Database; @@ -35,9 +35,9 @@ class Webhooks extends Action ->inject('project') ->inject('dbForPlatform') ->inject('queueForMails') - ->inject('queueForUsage') + ->inject('queueForStatsUsage') ->inject('log') - ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForMails, $queueForUsage, $log)); + ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, StatsUsage $queueForStatsUsage, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForMails, $queueForStatsUsage, $log)); } /** @@ -49,7 +49,7 @@ class Webhooks extends Action * @return void * @throws Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, StatsUsage $queueForStatsUsage, Log $log): void { $this->errors = []; $payload = $message->getPayload() ?? []; @@ -66,7 +66,7 @@ class Webhooks extends Action foreach ($project->getAttribute('webhooks', []) as $webhook) { if (array_intersect($webhook->getAttribute('events', []), $events)) { - $this->execute($events, $webhookPayload, $webhook, $user, $project, $dbForPlatform, $queueForMails, $queueForUsage); + $this->execute($events, $webhookPayload, $webhook, $user, $project, $dbForPlatform, $queueForMails, $queueForStatsUsage); } } @@ -85,7 +85,7 @@ class Webhooks extends Action * @param Mail $queueForMails * @return void */ - private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage): void + private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project, Database $dbForPlatform, Mail $queueForMails, StatsUsage $queueForStatsUsage): void { if ($webhook->getAttribute('enabled') !== true) { return; @@ -168,7 +168,7 @@ class Webhooks extends Action $dbForPlatform->purgeCachedDocument('projects', $project->getId()); $this->errors[] = $logs; - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_WEBHOOKS_FAILED, 1) ->addMetric(str_replace('{webhookInternalId}', $webhook->getInternalId(), METRIC_WEBHOOK_ID_FAILED), 1) ; @@ -178,13 +178,13 @@ class Webhooks extends Action $webhook->setAttribute('attempts', 0); // Reset attempts on success $dbForPlatform->updateDocument('webhooks', $webhook->getId(), $webhook); $dbForPlatform->purgeCachedDocument('projects', $project->getId()); - $queueForUsage + $queueForStatsUsage ->addMetric(METRIC_WEBHOOKS_SENT, 1) ->addMetric(str_replace('{webhookInternalId}', $webhook->getInternalId(), METRIC_WEBHOOK_ID_SENT), 1) ; } - $queueForUsage + $queueForStatsUsage ->setProject($project) ->trigger(); } diff --git a/tests/e2e/Services/Health/HealthCustomServerTest.php b/tests/e2e/Services/Health/HealthCustomServerTest.php index f2c6a2e5c2..04b1408cd0 100644 --- a/tests/e2e/Services/Health/HealthCustomServerTest.php +++ b/tests/e2e/Services/Health/HealthCustomServerTest.php @@ -494,12 +494,12 @@ class HealthCustomServerTest extends Scope return []; } - public function testUsageSuccess() + public function testStatsResources() { /** * Test for SUCCESS */ - $response = $this->client->call(Client::METHOD_GET, '/health/queue/usage', array_merge([ + $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-resources', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders()), []); @@ -511,19 +511,19 @@ class HealthCustomServerTest extends Scope /** * Test for FAILURE */ - $response = $this->client->call(Client::METHOD_GET, '/health/queue/usage?threshold=0', array_merge([ + $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-resources?threshold=0', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders()), []); $this->assertEquals(503, $response['headers']['status-code']); } - public function testUsageDumpSuccess() + public function testUsageSuccess() { /** * Test for SUCCESS */ - $response = $this->client->call(Client::METHOD_GET, '/health/queue/usage-dump', array_merge([ + $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders()), []); @@ -535,7 +535,31 @@ class HealthCustomServerTest extends Scope /** * Test for FAILURE */ - $response = $this->client->call(Client::METHOD_GET, '/health/queue/usage-dump?threshold=0', array_merge([ + $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage?threshold=0', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), []); + $this->assertEquals(503, $response['headers']['status-code']); + } + + public function testStatsUsageDumpSuccess() + { + /** + * Test for SUCCESS + */ + $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), []); + + $this->assertEquals(200, $response['headers']['status-code']); + $this->assertIsInt($response['body']['size']); + $this->assertLessThan(100, $response['body']['size']); + + /** + * Test for FAILURE + */ + $response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump?threshold=0', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders()), []); diff --git a/tests/resources/docker/docker-compose.yml b/tests/resources/docker/docker-compose.yml index 94d506056c..e549ac27a5 100644 --- a/tests/resources/docker/docker-compose.yml +++ b/tests/resources/docker/docker-compose.yml @@ -89,9 +89,9 @@ services: - _APP_FUNCTIONS_MEMORY_SWAP - _APP_EXECUTOR_HOST - appwrite-worker-usage: - entrypoint: worker-usage - container_name: appwrite-worker-usage + appwrite-worker-stats-usage: + entrypoint: worker-stats-usage + container_name: appwrite-worker-stats-usage build: context: . restart: unless-stopped