Merge pull request #9272 from appwrite/feat-logs-db

Feat: logs DB
This commit is contained in:
Christy Jacob 2025-02-10 23:49:48 +05:30 committed by GitHub
commit 2f95b0e2e0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
43 changed files with 2039 additions and 304 deletions

2
.env
View file

@ -86,6 +86,7 @@ _APP_MAINTENANCE_RETENTION_EXECUTION=1209600
_APP_MAINTENANCE_RETENTION_ABUSE=86400
_APP_MAINTENANCE_RETENTION_AUDIT=1209600
_APP_USAGE_AGGREGATION_INTERVAL=30
_APP_STATS_RESOURCES_INTERVAL=3600
_APP_MAINTENANCE_RETENTION_USAGE_HOURLY=8640000
_APP_MAINTENANCE_RETENTION_SCHEDULES=86400
_APP_USAGE_STATS=enabled
@ -110,3 +111,4 @@ _APP_MESSAGE_PUSH_TEST_DSN=
_APP_WEBHOOK_MAX_FAILED_ATTEMPTS=10
_APP_PROJECT_REGIONS=default
_APP_FUNCTIONS_CREATION_ABUSE_LIMIT=5000
_APP_STATS_USAGE_DUAL_WRITING_DBS=database_db_main

View file

@ -367,7 +367,7 @@ In file `app/controllers/shared/api.php` On the database listener, add to an exi
```php
case $document->getCollection() === 'teams':
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_TEAMS, $value); // per project
break;
```
@ -379,10 +379,10 @@ In that case you need also to handle children removal using addReduce() method c
```php
case $document->getCollection() === 'buckets': //buckets
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_BUCKETS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForUsage
$queueForStatsUsage
->addReduce($document);
}
break;
@ -428,16 +428,16 @@ public function __construct()
->inject('dbForProject')
->inject('queueForFunctions')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('log')
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log));
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log));
}
```
and then trigger the queue with the new metric like so:
```php
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_BUILDS, 1)
->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0))
->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000)

View file

@ -85,6 +85,10 @@ RUN chmod +x /usr/local/bin/doctor && \
chmod +x /usr/local/bin/worker-messaging && \
chmod +x /usr/local/bin/worker-migrations && \
chmod +x /usr/local/bin/worker-webhooks && \
chmod +x /usr/local/bin/worker-stats-usage && \
chmod +x /usr/local/bin/worker-stats-usage-dump && \
chmod +x /usr/local/bin/stats-resources && \
chmod +x /usr/local/bin/worker-stats-resources && \
chmod +x /usr/local/bin/worker-usage && \
chmod +x /usr/local/bin/worker-usage-dump

View file

@ -5,6 +5,8 @@ require_once __DIR__ . '/init.php';
use Appwrite\Event\Certificate;
use Appwrite\Event\Delete;
use Appwrite\Event\Func;
use Appwrite\Event\StatsResources;
use Appwrite\Event\StatsUsage;
use Appwrite\Platform\Appwrite;
use Appwrite\Runtimes\Runtimes;
use Utopia\Cache\Adapter\Sharding;
@ -160,6 +162,45 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
};
}, ['pools', 'dbForPlatform', 'cache']);
CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) {
$database = null;
return function (?Document $project = null) use ($pools, $cache, $database) {
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
return $database;
}
$dbAdapter = $pools
->get('logs')
->pop()
->getResource();
$database = new Database(
$dbAdapter,
$cache
);
$database
->setSharedTables(true)
->setNamespace('logsV1')
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS)
->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES);
// set tenant
if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
}
return $database;
};
}, ['pools', 'cache']);
CLI::setResource('queueForStatsUsage', function (Connection $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);
CLI::setResource('queueForStatsResources', function (Publisher $publisher) {
return new StatsResources($publisher);
}, ['publisher']);
CLI::setResource('publisher', function (Group $pools) {
return $pools->get('publisher')->pop()->getResource();
}, ['pools']);

View file

@ -5,6 +5,7 @@ $common = include __DIR__ . '/collections/common.php';
$projects = include __DIR__ . '/collections/projects.php';
$databases = include __DIR__ . '/collections/databases.php';
$platform = include __DIR__ . '/collections/platform.php';
$logs = include __DIR__ . '/collections/logs.php';
// see - http.php#245
// $collections['buckets']['files'];
@ -27,6 +28,7 @@ $collections = [
'databases' => $databases,
'projects' => array_merge($projects, $common),
'console' => array_merge($platform, $common),
'logs' => $logs,
];
return $collections;

View file

@ -0,0 +1,94 @@
<?php
use Utopia\Database\Database;
use Utopia\Database\Helpers\ID;
$logsCollection = [];
$logsCollection['stats'] = [
'$collection' => ID::custom(Database::METADATA),
'$id' => ID::custom('stats'),
'name' => 'stats',
'attributes' => [
[
'$id' => ID::custom('metric'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 255,
'signed' => true,
'required' => true,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('region'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 255,
'signed' => true,
'required' => true,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('value'),
'type' => Database::VAR_INTEGER,
'format' => '',
'size' => 8,
'signed' => true,
'required' => true,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('time'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('period'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 4,
'signed' => true,
'required' => true,
'default' => null,
'array' => false,
'filters' => [],
],
],
'indexes' => [
[
'$id' => ID::custom('_key_time'),
'type' => Database::INDEX_KEY,
'attributes' => ['time'],
'lengths' => [],
'orders' => [Database::ORDER_DESC],
],
[
'$id' => ID::custom('_key_period_time'),
'type' => Database::INDEX_KEY,
'attributes' => ['period', 'time'],
'lengths' => [],
'orders' => [Database::ORDER_ASC],
],
[
'$id' => ID::custom('_key_metric_period_time'),
'type' => Database::INDEX_UNIQUE,
'attributes' => ['metric', 'period', 'time'],
'lengths' => [],
'orders' => [Database::ORDER_DESC],
],
],
];
return $logsCollection;

View file

@ -17,7 +17,7 @@ use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Hooks\Hooks;
use Appwrite\Network\Validator\Email;
@ -2432,9 +2432,9 @@ App::post('/v1/account/tokens/phone')
->inject('queueForMessaging')
->inject('locale')
->inject('timelimit')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('plan')
->action(function (string $userId, string $phone, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, Usage $queueForUsage, array $plan) {
->action(function (string $userId, string $phone, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) {
if (empty(System::getEnv('_APP_SMS_PROVIDER'))) {
throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured');
}
@ -2583,11 +2583,11 @@ App::post('/v1/account/tokens/phone')
$countryCode = $helper->parse($phone)->getCountryCode();
if (!empty($countryCode)) {
$queueForUsage
$queueForStatsUsage
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
}
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
->setProject($project)
->trigger();
@ -3678,9 +3678,9 @@ App::post('/v1/account/verification/phone')
->inject('project')
->inject('locale')
->inject('timelimit')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('plan')
->action(function (Request $request, Response $response, Document $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, Usage $queueForUsage, array $plan) {
->action(function (Request $request, Response $response, Document $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) {
if (empty(System::getEnv('_APP_SMS_PROVIDER'))) {
throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured');
}
@ -3775,11 +3775,11 @@ App::post('/v1/account/verification/phone')
$countryCode = $helper->parse($phone)->getCountryCode();
if (!empty($countryCode)) {
$queueForUsage
$queueForStatsUsage
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
}
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
->setProject($project)
->trigger();
@ -4310,9 +4310,9 @@ App::post('/v1/account/mfa/challenge')
->inject('queueForMessaging')
->inject('queueForMails')
->inject('timelimit')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('plan')
->action(function (string $factor, Response $response, Database $dbForProject, Document $user, Locale $locale, Document $project, Request $request, Event $queueForEvents, Messaging $queueForMessaging, Mail $queueForMails, callable $timelimit, Usage $queueForUsage, array $plan) {
->action(function (string $factor, Response $response, Database $dbForProject, Document $user, Locale $locale, Document $project, Request $request, Event $queueForEvents, Messaging $queueForMessaging, Mail $queueForMails, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) {
$expire = DateTime::addSeconds(new \DateTime(), Auth::TOKEN_EXPIRATION_CONFIRM);
$code = Auth::codeGenerator();
@ -4383,11 +4383,11 @@ App::post('/v1/account/mfa/challenge')
$countryCode = $helper->parse($phone)->getCountryCode();
if (!empty($countryCode)) {
$queueForUsage
$queueForStatsUsage
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
}
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
->setProject($project)
->trigger();

View file

@ -4,7 +4,7 @@ use Appwrite\Auth\Auth;
use Appwrite\Detector\Detector;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Event;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Network\Validator\Email;
use Appwrite\SDK\AuthType;
@ -476,8 +476,8 @@ App::post('/v1/databases')
->inject('response')
->inject('dbForProject')
->inject('queueForEvents')
->inject('queueForUsage')
->action(function (string $databaseId, string $name, bool $enabled, Response $response, Database $dbForProject, Event $queueForEvents, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (string $databaseId, string $name, bool $enabled, Response $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage) {
$databaseId = $databaseId == 'unique()' ? ID::unique() : $databaseId;
@ -527,7 +527,7 @@ App::post('/v1/databases')
}
$queueForEvents->setParam('databaseId', $database->getId());
$queueForUsage->addMetric(str_replace(['{databaseInternalId}'], [$database->getInternalId()], METRIC_DATABASE_ID_STORAGE), 1); // per database
$queueForStatsUsage->addMetric(str_replace(['{databaseInternalId}'], [$database->getInternalId()], METRIC_DATABASE_ID_STORAGE), 1); // per database
$response
->setStatusCode(Response::STATUS_CODE_CREATED)
@ -797,8 +797,8 @@ App::delete('/v1/databases/:databaseId')
->inject('dbForProject')
->inject('queueForDatabase')
->inject('queueForEvents')
->inject('queueForUsage')
->action(function (string $databaseId, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (string $databaseId, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, StatsUsage $queueForStatsUsage) {
$database = $dbForProject->getDocument('databases', $databaseId);
@ -821,7 +821,7 @@ App::delete('/v1/databases/:databaseId')
->setParam('databaseId', $database->getId())
->setPayload($response->output($database, Response::MODEL_DATABASE));
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DATABASES_STORAGE, 1); // Global, deletion forces full recalculation
$response->noContent();
@ -2618,8 +2618,8 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/attributes/:key
->inject('dbForProject')
->inject('queueForDatabase')
->inject('queueForEvents')
->inject('queueForUsage')
->action(function (string $databaseId, string $collectionId, string $key, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (string $databaseId, string $collectionId, string $key, Response $response, Database $dbForProject, EventDatabase $queueForDatabase, Event $queueForEvents, StatsUsage $queueForStatsUsage) {
$db = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId));
@ -2716,7 +2716,7 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/attributes/:key
->setContext('database', $db)
->setPayload($response->output($attribute, $model));
$queueForUsage
$queueForStatsUsage
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$db->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_STORAGE), 1); // per collection
$response->noContent();
@ -3134,9 +3134,9 @@ App::post('/v1/databases/:databaseId/collections/:collectionId/documents')
->inject('dbForProject')
->inject('user')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('mode')
->action(function (string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, Response $response, Database $dbForProject, Document $user, Event $queueForEvents, Usage $queueForUsage, string $mode) {
->action(function (string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, Response $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, string $mode) {
$data = (\is_string($data)) ? \json_decode($data, true) : $data; // Cast to JSON array
@ -3339,7 +3339,7 @@ App::post('/v1/databases/:databaseId/collections/:collectionId/documents')
$processDocument($collection, $document);
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, $operations)
->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_WRITES), $operations)
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$database->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_STORAGE), 1); // per collection
@ -3392,8 +3392,8 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents')
->inject('response')
->inject('dbForProject')
->inject('mode')
->inject('queueForUsage')
->action(function (string $databaseId, string $collectionId, array $queries, Response $response, Database $dbForProject, string $mode, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (string $databaseId, string $collectionId, array $queries, Response $response, Database $dbForProject, string $mode, StatsUsage $queueForStatsUsage) {
$database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId));
$isAPIKey = Auth::isAppUser(Authorization::getRoles());
$isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles());
@ -3505,7 +3505,7 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents')
$processDocument($collection, $document);
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_READS, $operations)
->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_READS), $operations)
;
@ -3571,8 +3571,8 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen
->inject('response')
->inject('dbForProject')
->inject('mode')
->inject('queueForUsage')
->action(function (string $databaseId, string $collectionId, string $documentId, array $queries, Response $response, Database $dbForProject, string $mode, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (string $databaseId, string $collectionId, string $documentId, array $queries, Response $response, Database $dbForProject, string $mode, StatsUsage $queueForStatsUsage) {
$database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId));
$isAPIKey = Auth::isAppUser(Authorization::getRoles());
$isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles());
@ -3648,7 +3648,7 @@ App::get('/v1/databases/:databaseId/collections/:collectionId/documents/:documen
$processDocument($collection, $document);
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_READS, $operations)
->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_READS), $operations)
;
@ -3805,8 +3805,8 @@ App::patch('/v1/databases/:databaseId/collections/:collectionId/documents/:docum
->inject('dbForProject')
->inject('queueForEvents')
->inject('mode')
->inject('queueForUsage')
->action(function (string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, string $mode, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, string $mode, StatsUsage $queueForStatsUsage) {
$data = (\is_string($data)) ? \json_decode($data, true) : $data; // Cast to JSON array
@ -3946,7 +3946,7 @@ App::patch('/v1/databases/:databaseId/collections/:collectionId/documents/:docum
$setCollection($collection, $newDocument);
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, $operations)
->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_WRITES), $operations)
;
@ -4058,9 +4058,9 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/documents/:docu
->inject('response')
->inject('dbForProject')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('mode')
->action(function (string $databaseId, string $collectionId, string $documentId, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, Usage $queueForUsage, string $mode) {
->action(function (string $databaseId, string $collectionId, string $documentId, ?\DateTime $requestTimestamp, Response $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage, string $mode) {
$database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId));
$isAPIKey = Auth::isAppUser(Authorization::getRoles());
@ -4129,7 +4129,7 @@ App::delete('/v1/databases/:databaseId/collections/:collectionId/documents/:docu
$processDocument($collection, $document);
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, 1)
->addMetric(str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_OPERATIONS_WRITES), 1)
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$database->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_STORAGE), 1); // per collection

View file

@ -6,7 +6,7 @@ use Appwrite\Event\Build;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Validator\FunctionEvent;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
@ -1900,10 +1900,10 @@ App::post('/v1/functions/:functionId/executions')
->inject('dbForPlatform')
->inject('user')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('geodb')
->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb) {
->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb) {
$async = \strval($async) === 'true' || \strval($async) === '1';
if (!$async && !is_null($scheduledAt)) {
@ -2230,7 +2230,7 @@ App::post('/v1/functions/:functionId/executions')
throw $th;
}
} finally {
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_EXECUTIONS, 1)
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1)
->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000)) // per project

View file

@ -692,15 +692,15 @@ App::get('/v1/health/queue/functions')
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
App::get('/v1/health/queue/usage')
->desc('Get usage queue')
App::get('/v1/health/queue/stats-resources')
->desc('Get stats resources queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
name: 'getQueueUsage',
description: '/docs/references/health/get-queue-usage.md',
name: 'getQueueStatsResources',
description: '/docs/references/health/get-queue-stats-resources.md',
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -715,7 +715,7 @@ App::get('/v1/health/queue/usage')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::USAGE_QUEUE_NAME));
$size = $publisher->getQueueSize(new Queue(Event::STATS_RESOURCES_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -724,15 +724,15 @@ App::get('/v1/health/queue/usage')
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
});
App::get('/v1/health/queue/usage-dump')
->desc('Get usage dump queue')
App::get('/v1/health/queue/stats-usage')
->desc('Get stats usage queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
name: 'getQueueUsageDump',
description: '/docs/references/health/get-queue-usage-dump.md',
name: 'getQueueUsage',
description: '/docs/references/health/get-queue-stats-usage.md',
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -747,7 +747,39 @@ App::get('/v1/health/queue/usage-dump')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::USAGE_DUMP_QUEUE_NAME));
$size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
});
App::get('/v1/health/queue/stats-usage-dump')
->desc('Get usage dump queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
name: 'getQueueStatsUsageDump',
description: '/docs/references/health/get-queue-stats-usage-dump.md',
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
model: Response::MODEL_HEALTH_QUEUE,
)
],
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_DUMP_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -920,8 +952,9 @@ App::get('/v1/health/queue/failed/:name')
Event::AUDITS_QUEUE_NAME,
Event::MAILS_QUEUE_NAME,
Event::FUNCTIONS_QUEUE_NAME,
Event::USAGE_QUEUE_NAME,
Event::USAGE_DUMP_QUEUE_NAME,
Event::STATS_RESOURCES_QUEUE_NAME,
Event::STATS_USAGE_QUEUE_NAME,
Event::STATS_USAGE_DUMP_QUEUE_NAME,
Event::WEBHOOK_QUEUE_NAME,
Event::CERTIFICATES_QUEUE_NAME,
Event::BUILDS_QUEUE_NAME,

View file

@ -6,7 +6,7 @@ use Appwrite\Auth\Auth;
use Appwrite\ClamAV\Network;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\OpenSSL\OpenSSL;
use Appwrite\SDK\AuthType;
@ -942,8 +942,8 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/preview')
->inject('mode')
->inject('deviceForFiles')
->inject('deviceForLocal')
->inject('queueForUsage')
->action(function (string $bucketId, string $fileId, int $width, int $height, string $gravity, int $quality, int $borderWidth, string $borderColor, int $borderRadius, float $opacity, int $rotation, string $background, string $output, Request $request, Response $response, Document $project, Database $dbForProject, string $mode, Device $deviceForFiles, Device $deviceForLocal, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (string $bucketId, string $fileId, int $width, int $height, string $gravity, int $quality, int $borderWidth, string $borderColor, int $borderRadius, float $opacity, int $rotation, string $background, string $output, Request $request, Response $response, Document $project, Database $dbForProject, string $mode, Device $deviceForFiles, Device $deviceForLocal, StatsUsage $queueForStatsUsage) {
if (!\extension_loaded('imagick')) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Imagick extension is missing');
@ -1071,7 +1071,7 @@ App::get('/v1/storage/buckets/:bucketId/files/:fileId/preview')
$contentType = (\array_key_exists($output, $outputs)) ? $outputs[$output] : $outputs['jpg'];
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_FILES_TRANSFORMATIONS, 1)
->addMetric(str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_TRANSFORMATIONS), 1)
;

View file

@ -8,7 +8,7 @@ use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Network\Validator\Email;
use Appwrite\Platform\Workers\Deletes;
@ -466,9 +466,9 @@ App::post('/v1/teams/:teamId/memberships')
->inject('queueForMessaging')
->inject('queueForEvents')
->inject('timelimit')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('plan')
->action(function (string $teamId, string $email, string $userId, string $phone, array $roles, string $url, string $name, Response $response, Document $project, Document $user, Database $dbForProject, Locale $locale, Mail $queueForMails, Messaging $queueForMessaging, Event $queueForEvents, callable $timelimit, Usage $queueForUsage, array $plan) {
->action(function (string $teamId, string $email, string $userId, string $phone, array $roles, string $url, string $name, Response $response, Document $project, Document $user, Database $dbForProject, Locale $locale, Mail $queueForMails, Messaging $queueForMessaging, Event $queueForEvents, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan) {
$isAppUser = Auth::isAppUser(Authorization::getRoles());
$isPrivilegedUser = Auth::isPrivilegedUser(Authorization::getRoles());
@ -757,11 +757,11 @@ App::post('/v1/teams/:teamId/memberships')
$countryCode = $helper->parse($phone)->getCountryCode();
if (!empty($countryCode)) {
$queueForUsage
$queueForStatsUsage
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
}
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
->setProject($project)
->trigger();

View file

@ -7,7 +7,7 @@ use Appwrite\Auth\Auth;
use Appwrite\Event\Certificate;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Network\Validator\Origin;
use Appwrite\SDK\AuthType;
@ -50,7 +50,7 @@ Config::setParam('domainVerification', false);
Config::setParam('cookieDomain', 'localhost');
Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE);
function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname)
function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname)
{
$utopia->getRoute()?->label('error', __DIR__ . '/../views/general/error.phtml');
@ -434,7 +434,7 @@ function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, Sw
$fileSize = (\is_array($file['size']) && isset($file['size'][0])) ? $file['size'][0] : $file['size'];
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_NETWORK_REQUESTS, 1)
->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize)
->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize())
@ -499,13 +499,13 @@ App::init()
->inject('localeCodes')
->inject('clients')
->inject('geodb')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('queueForEvents')
->inject('queueForCertificates')
->inject('queueForFunctions')
->inject('isResourceBlocked')
->inject('previewHostname')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, Usage $queueForUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, callable $isResourceBlocked, string $previewHostname) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, callable $isResourceBlocked, string $previewHostname) {
/*
* Appwrite Router
*/
@ -513,7 +513,7 @@ App::init()
$mainDomain = System::getEnv('_APP_DOMAIN', '');
// Only run Router when external domain
if ($host !== $mainDomain || !empty($previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) {
return;
}
}
@ -732,12 +732,12 @@ App::options()
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('geodb')
->inject('isResourceBlocked')
->inject('previewHostname')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
/*
* Appwrite Router
*/
@ -745,7 +745,7 @@ App::options()
$mainDomain = System::getEnv('_APP_DOMAIN', '');
// Only run Router when external domain
if ($host !== $mainDomain || !empty($previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) {
return;
}
}
@ -770,8 +770,8 @@ App::error()
->inject('project')
->inject('logger')
->inject('log')
->inject('queueForUsage')
->action(function (Throwable $error, App $utopia, Request $request, Response $response, Document $project, ?Logger $logger, Log $log, Usage $queueForUsage) {
->inject('queueForStatsUsage')
->action(function (Throwable $error, App $utopia, Request $request, Response $response, Document $project, ?Logger $logger, Log $log, StatsUsage $queueForStatsUsage) {
$version = System::getEnv('_APP_VERSION', 'UNKNOWN');
$route = $utopia->getRoute();
$class = \get_class($error);
@ -882,13 +882,13 @@ App::error()
$fileSize = (\is_array($file['size']) && isset($file['size'][0])) ? $file['size'][0] : $file['size'];
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_NETWORK_REQUESTS, 1)
->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize)
->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize());
}
$queueForUsage
$queueForStatsUsage
->setProject($project)
->trigger();
}
@ -1040,12 +1040,12 @@ App::get('/robots.txt')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('geodb')
->inject('isResourceBlocked')
->inject('previewHostname')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
$host = $request->getHostname() ?? '';
$mainDomain = System::getEnv('_APP_DOMAIN', '');
@ -1053,7 +1053,7 @@ App::get('/robots.txt')
$template = new View(__DIR__ . '/../views/general/robots.phtml');
$response->text($template->render(false));
} else {
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname);
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname);
}
});
@ -1068,12 +1068,12 @@ App::get('/humans.txt')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('geodb')
->inject('isResourceBlocked')
->inject('previewHostname')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
$host = $request->getHostname() ?? '';
$mainDomain = System::getEnv('_APP_DOMAIN', '');
@ -1081,7 +1081,7 @@ App::get('/humans.txt')
$template = new View(__DIR__ . '/../views/general/humans.phtml');
$response->text($template->render(false));
} else {
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname);
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname);
}
});

View file

@ -12,7 +12,7 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Messaging;
use Appwrite\Event\Realtime;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
@ -90,7 +90,7 @@ $eventDatabaseListener = function (Document $project, Document $document, Respon
}
};
$usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) {
$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) {
$value = 1;
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$value = -1;
@ -98,40 +98,40 @@ $usageDatabaseListener = function (string $event, Document $document, Usage $que
switch (true) {
case $document->getCollection() === 'teams':
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_TEAMS, $value); // per project
break;
case $document->getCollection() === 'users':
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_USERS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForUsage
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'sessions': // sessions
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_SESSIONS, $value); //per project
break;
case $document->getCollection() === 'databases': // databases
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DATABASES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForUsage
$queueForStatsUsage
->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_COLLECTIONS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value)
;
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForUsage
$queueForStatsUsage
->addReduce($document);
}
break;
@ -139,39 +139,39 @@ $usageDatabaseListener = function (string $event, Document $document, Usage $que
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$collectionInternalId = $parts[3] ?? 0;
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DOCUMENTS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection
break;
case $document->getCollection() === 'buckets': //buckets
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_BUCKETS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForUsage
$queueForStatsUsage
->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'bucket_'): // files
$parts = explode('_', $document->getCollection());
$bucketInternalId = $parts[1];
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_FILES, $value) // per project
->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket
break;
case $document->getCollection() === 'functions':
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_FUNCTIONS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForUsage
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'deployments':
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_DEPLOYMENTS, $value) // per project
->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_FUNCTION_ID_DEPLOYMENTS), $value) // per function
@ -436,11 +436,11 @@ App::init()
->inject('queueForDeletes')
->inject('queueForDatabase')
->inject('queueForBuilds')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('dbForProject')
->inject('timelimit')
->inject('mode')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
$route = $utopia->getRoute();
@ -550,8 +550,8 @@ App::init()
$queueForRealtime = new Realtime();
$dbForProject
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage))
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage))
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener(
$project,
$document,
@ -694,7 +694,7 @@ App::shutdown()
->inject('user')
->inject('queueForEvents')
->inject('queueForAudits')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('queueForDeletes')
->inject('queueForDatabase')
->inject('queueForBuilds')
@ -703,7 +703,7 @@ App::shutdown()
->inject('queueForWebhooks')
->inject('queueForRealtime')
->inject('dbForProject')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) {
$responsePayload = $response->getPayload();
@ -860,13 +860,13 @@ App::shutdown()
$fileSize = (\is_array($file['size']) && isset($file['size'][0])) ? $file['size'][0] : $file['size'];
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_NETWORK_REQUESTS, 1)
->addMetric(METRIC_NETWORK_INBOUND, $request->getSize() + $fileSize)
->addMetric(METRIC_NETWORK_OUTBOUND, $response->getSize());
}
$queueForUsage
$queueForStatsUsage
->setProject($project)
->trigger();
}

View file

@ -17,7 +17,6 @@ use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
@ -156,6 +155,79 @@ function dispatch(Server $server, int $fd, int $type, $data = null): int
include __DIR__ . '/controllers/general.php';
function createDatabase(App $app, string $resourceKey, string $dbName, array $collections, mixed $pools, callable $extraSetup = null): void
{
$max = 10;
$sleep = 1;
$attempts = 0;
do {
try {
$attempts++;
$resource = $app->getResource($resourceKey);
/* @var $database Database */
$database = is_callable($resource) ? $resource() : $resource;
break; // exit loop on success
} catch (\Exception $e) {
Console::warning(" └── Database not ready. Retrying connection ({$attempts})...");
$pools->reclaim();
if ($attempts >= $max) {
throw new \Exception(' └── Failed to connect to database: ' . $e->getMessage());
}
sleep($sleep);
}
} while ($attempts < $max);
Console::success("[Setup] - $dbName database init started...");
// Attempt to create the database
try {
Console::info(" └── Creating database: $dbName...");
$database->create();
} catch (\Exception $e) {
Console::info(" └── Skip: metadata table already exists");
}
// Process collections
foreach ($collections as $key => $collection) {
if (($collection['$collection'] ?? '') !== Database::METADATA) {
continue;
}
if (!$database->getCollection($key)->isEmpty()) {
continue;
}
Console::info(" └── Creating collection: {$collection['$id']}...");
$attributes = array_map(fn ($attr) => new Document([
'$id' => ID::custom($attr['$id']),
'type' => $attr['type'],
'size' => $attr['size'],
'required' => $attr['required'],
'signed' => $attr['signed'],
'array' => $attr['array'],
'filters' => $attr['filters'],
'default' => $attr['default'] ?? null,
'format' => $attr['format'] ?? ''
]), $collection['attributes']);
$indexes = array_map(fn ($index) => new Document([
'$id' => ID::custom($index['$id']),
'type' => $index['type'],
'attributes' => $index['attributes'],
'lengths' => $index['lengths'],
'orders' => $index['orders'],
]), $collection['indexes']);
$database->createCollection($key, $attributes, $indexes);
}
if ($extraSetup) {
$extraSetup($database);
}
}
$http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $register) {
$app = new App('UTC');
@ -164,140 +236,75 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
/** @var Group $pools */
App::setResource('pools', fn () => $pools);
// wait for database to be ready
$attempts = 0;
$max = 10;
$sleep = 1;
do {
try {
$attempts++;
$dbForPlatform = $app->getResource('dbForPlatform');
/** @var Utopia\Database\Database $dbForPlatform */
break; // leave the do-while if successful
} catch (\Throwable $e) {
Console::warning("Database not ready. Retrying connection ({$attempts})...");
if ($attempts >= $max) {
throw new \Exception('Failed to connect to database: ' . $e->getMessage());
}
sleep($sleep);
}
} while ($attempts < $max);
Console::success('[Setup] - Server database init started...');
try {
Console::success('[Setup] - Creating console database...');
$dbForPlatform->create();
} catch (Duplicate) {
Console::success('[Setup] - Skip: metadata table already exists');
}
if ($dbForPlatform->getCollection(Audit::COLLECTION)->isEmpty()) {
$audit = new Audit($dbForPlatform);
$audit->setup();
}
/** @var array $collections */
$collections = Config::getParam('collections', []);
$consoleCollections = $collections['console'];
foreach ($consoleCollections as $key => $collection) {
if (($collection['$collection'] ?? '') !== Database::METADATA) {
continue;
}
if (!$dbForPlatform->getCollection($key)->isEmpty()) {
continue;
// create logs database first, `getLogsDB` is a callable.
createDatabase($app, 'getLogsDB', 'logs', $collections['logs'], $pools);
// create appwrite database, `dbForPlatform` is a direct access call.
createDatabase($app, 'dbForPlatform', 'appwrite', $collections['console'], $pools, function (Database $dbForPlatform) use ($collections) {
if ($dbForPlatform->getCollection(Audit::COLLECTION)->isEmpty()) {
$audit = new Audit($dbForPlatform);
$audit->setup();
}
Console::success('[Setup] - Creating console collection: ' . $collection['$id'] . '...');
if ($dbForPlatform->getDocument('buckets', 'default')->isEmpty() &&
!$dbForPlatform->exists($dbForPlatform->getDatabase(), 'bucket_1')) {
Console::info(" └── Creating default bucket...");
$dbForPlatform->createDocument('buckets', new Document([
'$id' => ID::custom('default'),
'$collection' => ID::custom('buckets'),
'name' => 'Default',
'maximumFileSize' => (int) System::getEnv('_APP_STORAGE_LIMIT', 0),
'allowedFileExtensions' => [],
'enabled' => true,
'compression' => 'gzip',
'encryption' => true,
'antivirus' => true,
'fileSecurity' => true,
'$permissions' => [
Permission::create(Role::any()),
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'search' => 'buckets Default',
]));
$attributes = \array_map(fn ($attribute) => new Document($attribute), $collection['attributes']);
$indexes = \array_map(fn (array $index) => new Document($index), $collection['indexes']);
$bucket = $dbForPlatform->getDocument('buckets', 'default');
$dbForPlatform->createCollection($key, $attributes, $indexes);
}
if ($dbForPlatform->getDocument('buckets', 'default')->isEmpty() && !$dbForPlatform->exists($dbForPlatform->getDatabase(), 'bucket_1')) {
Console::success('[Setup] - Creating default bucket...');
$dbForPlatform->createDocument('buckets', new Document([
'$id' => ID::custom('default'),
'$collection' => ID::custom('buckets'),
'name' => 'Default',
'maximumFileSize' => (int) System::getEnv('_APP_STORAGE_LIMIT', 0), // 10MB
'allowedFileExtensions' => [],
'enabled' => true,
'compression' => 'gzip',
'encryption' => true,
'antivirus' => true,
'fileSecurity' => true,
'$permissions' => [
Permission::create(Role::any()),
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
'search' => 'buckets Default',
]));
$bucket = $dbForPlatform->getDocument('buckets', 'default');
Console::success('[Setup] - Creating files collection for default bucket...');
$files = $collections['buckets']['files'] ?? [];
if (empty($files)) {
throw new Exception('Files collection is not configured.');
}
$attributes = \array_map(fn ($attribute) => new Document($attribute), $files['attributes']);
$indexes = \array_map(fn (array $index) => new Document($index), $files['indexes']);
$dbForPlatform->createCollection('bucket_' . $bucket->getInternalId(), $attributes, $indexes);
}
$projectCollections = $collections['projects'];
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$sharedTablesV1 = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES_V1', ''));
$sharedTablesV2 = \array_diff($sharedTables, $sharedTablesV1);
$cache = $app->getResource('cache');
foreach ($sharedTablesV2 as $hostname) {
$adapter = $pools
->get($hostname)
->pop()
->getResource();
$dbForProject = (new Database($adapter, $cache))
->setDatabase('appwrite')
->setSharedTables(true)
->setTenant(null)
->setNamespace(System::getEnv('_APP_DATABASE_SHARED_NAMESPACE', ''));
try {
Console::success('[Setup] - Creating project database: ' . $hostname . '...');
$dbForProject->create();
} catch (Duplicate) {
Console::success('[Setup] - Skip: metadata table already exists');
}
foreach ($projectCollections as $key => $collection) {
if (($collection['$collection'] ?? '') !== Database::METADATA) {
continue;
}
if (!$dbForProject->getCollection($key)->isEmpty()) {
continue;
Console::info(" └── Creating files collection for default bucket...");
$files = $collections['buckets']['files'] ?? [];
if (empty($files)) {
throw new Exception('Files collection is not configured.');
}
$attributes = \array_map(fn ($attribute) => new Document($attribute), $collection['attributes']);
$indexes = \array_map(fn (array $index) => new Document($index), $collection['indexes']);
$attributes = array_map(fn ($attr) => new Document([
'$id' => ID::custom($attr['$id']),
'type' => $attr['type'],
'size' => $attr['size'],
'required' => $attr['required'],
'signed' => $attr['signed'],
'array' => $attr['array'],
'filters' => $attr['filters'],
'default' => $attr['default'] ?? null,
'format' => $attr['format'] ?? ''
]), $files['attributes']);
Console::success('[Setup] - Creating project collection: ' . $collection['$id'] . '...');
$indexes = array_map(fn ($index) => new Document([
'$id' => ID::custom($index['$id']),
'type' => $index['type'],
'attributes' => $index['attributes'],
'lengths' => $index['lengths'],
'orders' => $index['orders'],
]), $files['indexes']);
$dbForProject->createCollection($key, $attributes, $indexes);
$dbForPlatform->createCollection('bucket_' . $bucket->getInternalId(), $attributes, $indexes);
}
}
});
$pools->reclaim();
Console::success('[Setup] - Server database init completed...');
});

View file

@ -32,7 +32,7 @@ use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Functions\Specification;
@ -237,8 +237,6 @@ const METRIC_WEBHOOKS_SENT = 'webhooks.events.sent';
const METRIC_WEBHOOKS_FAILED = 'webhooks.events.failed';
const METRIC_WEBHOOK_ID_SENT = '{webhookInternalId}.webhooks.events.sent';
const METRIC_WEBHOOK_ID_FAILED = '{webhookInternalId}.webhooks.events.failed';
const METRIC_AUTH_METHOD_PHONE = 'auth.method.phone';
const METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE = METRIC_AUTH_METHOD_PHONE . '.{countryCode}';
const METRIC_MESSAGES = 'messages';
@ -269,6 +267,8 @@ const METRIC_FILES = 'files';
const METRIC_FILES_STORAGE = 'files.storage';
const METRIC_FILES_TRANSFORMATIONS = 'files.transformations';
const METRIC_BUCKET_ID_FILES_TRANSFORMATIONS = '{bucketInternalId}.files.transformations';
const METRIC_FILES_IMAGES_TRANSFORMED = 'files.imagesTransformed';
const METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED = '{bucketInternalId}.files.imagesTransformed';
const METRIC_BUCKET_ID_FILES = '{bucketInternalId}.files';
const METRIC_BUCKET_ID_FILES_STORAGE = '{bucketInternalId}.files.storage';
const METRIC_FUNCTIONS = 'functions';
@ -301,6 +301,18 @@ const METRIC_FUNCTION_ID_EXECUTIONS_MB_SECONDS = '{functionInternalId}.execution
const METRIC_NETWORK_REQUESTS = 'network.requests';
const METRIC_NETWORK_INBOUND = 'network.inbound';
const METRIC_NETWORK_OUTBOUND = 'network.outbound';
const METRIC_MAU = 'users.mau';
const METRIC_DAU = 'users.dau';
const METRIC_WAU = 'users.wau';
const METRIC_WEBHOOKS = 'webhooks';
const METRIC_PLATFORMS = 'platforms';
const METRIC_PROVIDERS = 'providers';
const METRIC_TOPICS = 'topics';
const METRIC_KEYS = 'keys';
const METRIC_RESOURCE_TYPE_ID_BUILDS = '{resourceType}.{resourceInternalId}.builds';
const METRIC_RESOURCE_TYPE_ID_BUILDS_STORAGE = '{resourceType}.{resourceInternalId}.builds.storage';
const METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS = '{resourceType}.{resourceInternalId}.deployments';
const METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE = '{resourceType}.{resourceInternalId}.deployments.storage';
// Resource types
@ -1176,6 +1188,9 @@ App::setResource('queueForWebhooks', function (Queue\Publisher $publisher) {
App::setResource('queueForRealtime', function () {
return new Realtime();
}, []);
App::setResource('queueForStatsUsage', function (Queue\Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);
App::setResource('queueForAudits', function (Queue\Publisher $publisher) {
return new Audit($publisher);
}, ['publisher']);
@ -1546,6 +1561,39 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
};
}, ['pools', 'dbForPlatform', 'cache']);
App::setResource('getLogsDB', function (Group $pools, Cache $cache) {
$database = null;
return function (?Document $project = null) use ($pools, $cache, $database) {
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
return $database;
}
$dbAdapter = $pools
->get('logs')
->pop()
->getResource();
$database = new Database(
$dbAdapter,
$cache
);
$database
->setSharedTables(true)
->setNamespace('logsV1')
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS)
->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES);
// set tenant
if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
}
return $database;
};
}, ['pools', 'cache']);
App::setResource('cache', function (Group $pools) {
$list = Config::getParam('pools-cache', []);
$adapters = [];

View file

@ -649,10 +649,69 @@ $image = $this->getParam('image', '');
- _APP_MAINTENANCE_RETENTION_USAGE_HOURLY
- _APP_MAINTENANCE_RETENTION_SCHEDULES
appwrite-worker-usage:
appwrite-task-stats-resources:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: worker-usage
container_name: appwrite-worker-usage
container_name: appwrite-task-stats-resources
entrypoint: stats-resources
<<: *x-logging
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_USAGE_STATS
- _APP_LOGGING_CONFIG
- _APP_DATABASE_SHARED_TABLES
- _APP_STATS_RESOURCES_INTERVAL
appwrite-worker-stats-resources:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: worker-stats-resources
container_name: appwrite-worker-stats-resources
<<: *x-logging
restart: unless-stopped
networks:
- appwrite
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_USAGE_STATS
- _APP_LOGGING_CONFIG
- _APP_STATS_RESOURCES_INTERVAL
appwrite-worker-stats-usage:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: worker-stats-usage
container_name: appwrite-worker-stats-usage
<<: *x-logging
restart: unless-stopped
networks:
@ -677,11 +736,11 @@ $image = $this->getParam('image', '');
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
appwrite-worker-usage-dump:
appwrite-worker-stats-usage-dump:
image: <?php echo $organization; ?>/<?php echo $image; ?>:<?php echo $version."\n"; ?>
entrypoint: worker-usage-dump
entrypoint: worker-stats-usage-dump
<<: *x-logging
container_name: appwrite-worker-usage-dump
container_name: appwrite-worker-stats-usage-dump
restart: unless-stopped
networks:
- appwrite

View file

@ -13,8 +13,12 @@ use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\StatsUsageDump;
/** remove */
use Appwrite\Event\Usage;
use Appwrite\Event\UsageDump;
/** /remove */
use Appwrite\Platform\Appwrite;
use Swoole\Runtime;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
@ -173,6 +177,39 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf
};
}, ['pools', 'dbForPlatform', 'cache']);
Server::setResource('getLogsDB', function (Group $pools, Cache $cache) {
$database = null;
return function (?Document $project = null) use ($pools, $cache, $database) {
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
return $database;
}
$dbAdapter = $pools
->get('logs')
->pop()
->getResource();
$database = new Database(
$dbAdapter,
$cache
);
$database
->setSharedTables(true)
->setNamespace('logsV1')
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS)
->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES);
// set tenant
if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
}
return $database;
};
}, ['pools', 'cache']);
Server::setResource('abuseRetention', function () {
return time() - (int) System::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400);
});
@ -240,6 +277,14 @@ Server::setResource('queueForUsageDump', function (Publisher $publisher) {
return new UsageDump($publisher);
}, ['publisher']);
Server::setResource('queueForStatsUsage', function (Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);
Server::setResource('queueForStatsUsageDump', function (Publisher $publisher) {
return new StatsUsageDump($publisher);
}, ['publisher']);
Server::setResource('queueForDatabase', function (Publisher $publisher) {
return new EventDatabase($publisher);
}, ['publisher']);

3
bin/stats-resources Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/cli.php stats-resources $@

View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/worker.php stats-resources $@

3
bin/worker-stats-usage Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/worker.php stats-usage $@

View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/worker.php stats-usage-dump $@

View file

@ -722,10 +722,72 @@ services:
- _APP_MAINTENANCE_DELAY
- _APP_DATABASE_SHARED_TABLES
appwrite-worker-usage:
entrypoint: worker-usage
appwrite-task-stats-resources:
container_name: appwrite-task-stats-resources
entrypoint: stats-resources
<<: *x-logging
container_name: appwrite-worker-usage
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_USAGE_STATS
- _APP_LOGGING_CONFIG
- _APP_DATABASE_SHARED_TABLES
- _APP_STATS_RESOURCES_INTERVAL
appwrite-worker-stats-resources:
entrypoint: worker-stats-resources
<<: *x-logging
container_name: appwrite-worker-stats-resources
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_USAGE_STATS
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
- _APP_DATABASE_SHARED_TABLES
appwrite-worker-stats-usage:
entrypoint: worker-stats-usage
<<: *x-logging
container_name: appwrite-worker-stats-usage
image: appwrite-dev
networks:
- appwrite
@ -753,10 +815,10 @@ services:
- _APP_USAGE_AGGREGATION_INTERVAL
- _APP_DATABASE_SHARED_TABLES
appwrite-worker-usage-dump:
entrypoint: worker-usage-dump
appwrite-worker-stats-usage-dump:
entrypoint: worker-stats-usage-dump
<<: *x-logging
container_name: appwrite-worker-usage-dump
container_name: appwrite-worker-stats-usage-dump
image: appwrite-dev
networks:
- appwrite
@ -783,7 +845,8 @@ services:
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
- _APP_DATABASE_SHARED_TABLES
- _APP_STATS_USAGE_DUAL_WRITING_DBS
appwrite-task-scheduler-functions:
entrypoint: schedule-functions
<<: *x-logging

View file

@ -0,0 +1 @@
Get the number of metrics that are waiting to be processed in the Appwrite stats resources queue.

View file

@ -24,11 +24,22 @@ class Event
public const FUNCTIONS_QUEUE_NAME = 'v1-functions';
public const FUNCTIONS_CLASS_NAME = 'FunctionsV1';
/** remove */
public const USAGE_QUEUE_NAME = 'v1-usage';
public const USAGE_CLASS_NAME = 'UsageV1';
public const USAGE_DUMP_QUEUE_NAME = 'v1-usage-dump';
public const USAGE_DUMP_CLASS_NAME = 'UsageDumpV1';
/** /remove */
public const STATS_RESOURCES_QUEUE_NAME = 'v1-stats-resources';
public const STATS_RESOURCES_CLASS_NAME = 'StatsResourcesV1';
public const STATS_USAGE_QUEUE_NAME = 'v1-stats-usage';
public const STATS_USAGE_CLASS_NAME = 'StatsUsageV1';
public const STATS_USAGE_DUMP_QUEUE_NAME = 'v1-stats-usage-dump';
public const STATS_USAGE_DUMP_CLASS_NAME = 'StatsUsageDumpV1';
public const WEBHOOK_QUEUE_NAME = 'v1-webhooks';
public const WEBHOOK_CLASS_NAME = 'WebhooksV1';

View file

@ -0,0 +1,29 @@
<?php
namespace Appwrite\Event;
use Utopia\Queue\Publisher;
class StatsResources extends Event
{
public function __construct(protected Publisher $publisher)
{
parent::__construct($publisher);
$this
->setQueue(Event::STATS_RESOURCES_QUEUE_NAME)
->setClass(Event::STATS_RESOURCES_CLASS_NAME);
}
/**
* Prepare the payload for the usage event.
*
* @return array
*/
protected function preparePayload(): array
{
return [
'project' => $this->project
];
}
}

View file

@ -0,0 +1,65 @@
<?php
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Publisher;
class StatsUsage extends Event
{
protected array $metrics = [];
protected array $reduce = [];
public function __construct(protected Publisher $publisher)
{
parent::__construct($publisher);
$this
->setQueue(Event::STATS_USAGE_QUEUE_NAME)
->setClass(Event::STATS_USAGE_CLASS_NAME);
}
/**
* Add reduce.
*
* @param Document $document
* @return self
*/
public function addReduce(Document $document): self
{
$this->reduce[] = $document;
return $this;
}
/**
* Add metric.
*
* @param string $key
* @param int $value
* @return self
*/
public function addMetric(string $key, int $value): self
{
$this->metrics[] = [
'key' => $key,
'value' => $value,
];
return $this;
}
/**
* Prepare the payload for the event
*
* @return array
*/
protected function preparePayload(): array
{
return [
'project' => $this->getProject(),
'reduce' => $this->reduce,
'metrics' => $this->metrics,
];
}
}

View file

@ -0,0 +1,44 @@
<?php
namespace Appwrite\Event;
use Utopia\Queue\Publisher;
class StatsUsageDump extends Event
{
protected array $stats;
public function __construct(protected Publisher $publisher)
{
parent::__construct($publisher);
$this
->setQueue(Event::STATS_USAGE_DUMP_QUEUE_NAME)
->setClass(Event::STATS_USAGE_DUMP_CLASS_NAME);
}
/**
* Add Stats.
*
* @param array $stats
* @return self
*/
public function setStats(array $stats): self
{
$this->stats = $stats;
return $this;
}
/**
* Prepare the payload for the usage dump event.
*
* @return array
*/
protected function preparePayload(): array
{
return [
'stats' => $this->stats,
];
}
}

View file

@ -0,0 +1,90 @@
<?php
namespace Appwrite\Platform;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Database\Query;
use Utopia\Platform\Action as UtopiaAction;
class Action extends UtopiaAction
{
/**
* Log Error Callback
*
* @var callable
*/
protected mixed $logError;
/**
* Foreach Document
* Call provided callback for each document in the collection
*
* @param string $projectId
* @param string $collection
* @param array $queries
* @param callable $callback
*
* @return void
*/
protected function foreachDocument(Database $database, string $collection, array $queries = [], callable $callback = null, int $limit = 1000, bool $concurrent = false): void
{
$results = [];
$sum = $limit;
$latestDocument = null;
while ($sum === $limit) {
$newQueries = $queries;
try {
if ($latestDocument !== null) {
array_unshift($newQueries, Query::cursorAfter($latestDocument));
}
$newQueries[] = Query::limit($limit);
$database->disableValidation();
$results = $database->find($collection, $newQueries);
$database->enableValidation();
} catch (\Exception $e) {
if (!empty($this->logError)) {
call_user_func_array($this->logError, [$e, "CLI", "fetch_documents_namespace_{$database->getNamespace()}_collection{$collection}"]);
}
}
if (empty($results)) {
return;
}
$sum = count($results);
if ($concurrent) {
$callables = [];
$errors = [];
foreach ($results as $document) {
if (is_callable($callback)) {
$callables[] = Co\go(function () use ($document, $callback, &$errors) {
try {
$callback($document);
} catch (\Throwable $error) {
$errors[] = $error;
}
});
}
}
Co::join($callables);
if (!empty($errors)) {
throw new \Error("Errors found in concurrent foreachDocument: " . \json_encode($errors));
}
} else {
foreach ($results as $document) {
if (is_callable($callback)) {
$callback($document);
}
}
}
$latestDocument = $results[array_key_last($results)];
}
}
}

View file

@ -13,6 +13,7 @@ use Appwrite\Platform\Tasks\ScheduleMessages;
use Appwrite\Platform\Tasks\SDKs;
use Appwrite\Platform\Tasks\Specs;
use Appwrite\Platform\Tasks\SSL;
use Appwrite\Platform\Tasks\StatsResources;
use Appwrite\Platform\Tasks\Upgrade;
use Appwrite\Platform\Tasks\Vars;
use Appwrite\Platform\Tasks\Version;
@ -38,6 +39,7 @@ class Tasks extends Service
->addAction(Upgrade::getName(), new Upgrade())
->addAction(Vars::getName(), new Vars())
->addAction(Version::getName(), new Version())
->addAction(StatsResources::getName(), new StatsResources())
;
}
}

View file

@ -11,8 +11,13 @@ use Appwrite\Platform\Workers\Functions;
use Appwrite\Platform\Workers\Mails;
use Appwrite\Platform\Workers\Messaging;
use Appwrite\Platform\Workers\Migrations;
use Appwrite\Platform\Workers\StatsResources;
use Appwrite\Platform\Workers\StatsUsage;
use Appwrite\Platform\Workers\StatsUsageDump;
/** remove */
use Appwrite\Platform\Workers\Usage;
use Appwrite\Platform\Workers\UsageDump;
/** /remove */
use Appwrite\Platform\Workers\Webhooks;
use Utopia\Platform\Service;
@ -31,10 +36,14 @@ class Workers extends Service
->addAction(Mails::getName(), new Mails())
->addAction(Messaging::getName(), new Messaging())
->addAction(Webhooks::getName(), new Webhooks())
->addAction(StatsUsageDump::getName(), new StatsUsageDump())
->addAction(StatsUsage::getName(), new StatsUsage())
->addAction(Migrations::getName(), new Migrations())
->addAction(StatsResources::getName(), new StatsResources())
/** Remove */
->addAction(UsageDump::getName(), new UsageDump())
->addAction(Usage::getName(), new Usage())
->addAction(Migrations::getName(), new Migrations())
/** /remove */
;
}
}

View file

@ -0,0 +1,81 @@
<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\StatsResources as EventStatsResources;
use Appwrite\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\System\System;
/**
* Usage count
*
* Runs every hour, schedules project
* for aggregating resource count
*/
class StatsResources extends Action
{
/**
* Log Error Callback
*
* @var callable
*/
protected mixed $logError;
/**
* Console DB
*
* @var Database
*/
protected Database $dbForPlatform;
public static function getName()
{
return 'stats-resources';
}
public function __construct()
{
$this
->desc('Schedules projects for usage count')
->inject('dbForPlatform')
->inject('logError')
->inject('queueForStatsResources')
->callback([$this, 'action']);
}
public function action(Database $dbForPlatform, callable $logError, EventStatsResources $queue): void
{
$this->logError = $logError;
$this->dbForPlatform = $dbForPlatform;
Console::title("Stats resources V1");
Console::success('Stats resources: started');
$interval = (int) System::getEnv('_APP_STATS_RESOURCES_INTERVAL', '3600');
Console::loop(function () use ($queue) {
Authorization::disable();
Authorization::setDefaultStatus(false);
$last24Hours = (new \DateTime())->sub(\DateInterval::createFromDateString('24 hours'));
/**
* For each project that were accessed in last 24 hours
*/
$this->foreachDocument($this->dbForPlatform, 'projects', [
Query::greaterThanEqual('accessedAt', DateTime::format($last24Hours))
], function ($project) use ($queue) {
$queue
->setProject($project)
->trigger();
Console::success('project: ' . $project->getId() . '(' . $project->getInternalId() . ')' . ' queued');
});
}, $interval);
Console::log("Stats resources: exited");
}
}

View file

@ -5,7 +5,7 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Utopia\Response\Model\Deployment;
use Appwrite\Vcs\Comment;
@ -50,12 +50,12 @@ class Builds extends Action
->inject('dbForPlatform')
->inject('queueForEvents')
->inject('queueForFunctions')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('cache')
->inject('dbForProject')
->inject('deviceForFunctions')
->inject('log')
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log));
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log));
}
/**
@ -64,7 +64,7 @@ class Builds extends Action
* @param Database $dbForPlatform
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Usage $queueForUsage
* @param StatsUsage $queueForStatsUsage
* @param Cache $cache
* @param Database $dbForProject
* @param Device $deviceForFunctions
@ -72,7 +72,7 @@ class Builds extends Action
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -93,7 +93,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log);
break;
default:
@ -105,7 +105,7 @@ class Builds extends Action
* @param Device $deviceForFunctions
* @param Func $queueForFunctions
* @param Event $queueForEvents
* @param Usage $queueForUsage
* @param StatsUsage $queueForStatsUsage
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param GitHub $github
@ -118,7 +118,7 @@ class Builds extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void
{
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
@ -706,20 +706,20 @@ class Builds extends Action
/** Trigger usage queue */
if ($build->getAttribute('status') === 'ready') {
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_BUILDS_SUCCESS, 1) // per project
->addMetric(METRIC_BUILDS_COMPUTE_SUCCESS, (int)$build->getAttribute('duration', 0) * 1000)
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_SUCCESS), 1) // per function
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_SUCCESS), (int)$build->getAttribute('duration', 0) * 1000);
} elseif ($build->getAttribute('status') === 'failed') {
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_BUILDS_FAILED, 1) // per project
->addMetric(METRIC_BUILDS_COMPUTE_FAILED, (int)$build->getAttribute('duration', 0) * 1000)
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_FAILED), 1) // per function
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE_FAILED), (int)$build->getAttribute('duration', 0) * 1000);
}
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_BUILDS, 1) // per project
->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0))
->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000)

View file

@ -5,7 +5,7 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Utopia\Response\Model\Execution;
use Exception;
@ -46,13 +46,13 @@ class Functions extends Action
->inject('dbForProject')
->inject('queueForFunctions')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('log')
->inject('isResourceBlocked')
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked));
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked));
}
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked): void
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void
{
$payload = $message->getPayload() ?? [];
@ -137,7 +137,7 @@ class Functions extends Action
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
queueForUsage: $queueForUsage,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
project: $project,
function: $function,
@ -177,7 +177,7 @@ class Functions extends Action
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
queueForUsage: $queueForUsage,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
project: $project,
function: $function,
@ -199,7 +199,7 @@ class Functions extends Action
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
queueForUsage: $queueForUsage,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
project: $project,
function: $function,
@ -284,7 +284,7 @@ class Functions extends Action
* @param Log $log
* @param Database $dbForProject
* @param Func $queueForFunctions
* @param Usage $queueForUsage
* @param StatsUsage $queueForStatsUsage
* @param Event $queueForEvents
* @param Document $project
* @param Document $function
@ -308,7 +308,7 @@ class Functions extends Action
Log $log,
Database $dbForProject,
Func $queueForFunctions,
Usage $queueForUsage,
StatsUsage $queueForStatsUsage,
Event $queueForEvents,
Document $project,
Document $function,
@ -552,7 +552,7 @@ class Functions extends Action
$errorCode = $th->getCode();
} finally {
/** Trigger usage queue */
$queueForUsage
$queueForStatsUsage
->setProject($project)
->addMetric(METRIC_EXECUTIONS, 1)
->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1)

View file

@ -2,7 +2,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Status as MessageStatus;
use Swoole\Runtime;
use Utopia\CLI\Console;
@ -70,8 +70,8 @@ class Messaging extends Action
->inject('log')
->inject('dbForProject')
->inject('deviceForFiles')
->inject('queueForUsage')
->callback(fn (Message $message, Document $project, Log $log, Database $dbForProject, Device $deviceForFiles, Usage $queueForUsage) => $this->action($message, $project, $log, $dbForProject, $deviceForFiles, $queueForUsage));
->inject('queueForStatsUsage')
->callback(fn (Message $message, Document $project, Log $log, Database $dbForProject, Device $deviceForFiles, StatsUsage $queueForStatsUsage) => $this->action($message, $project, $log, $dbForProject, $deviceForFiles, $queueForStatsUsage));
}
/**
@ -80,7 +80,7 @@ class Messaging extends Action
* @param Log $log
* @param Database $dbForProject
* @param Device $deviceForFiles
* @param Usage $queueForUsage
* @param StatsUsage $queueForStatsUsage
* @return void
* @throws \Exception
*/
@ -90,7 +90,7 @@ class Messaging extends Action
Log $log,
Database $dbForProject,
Device $deviceForFiles,
Usage $queueForUsage
StatsUsage $queueForStatsUsage
): void {
Runtime::setHookFlags(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_TCP);
$payload = $message->getPayload() ?? [];
@ -111,7 +111,7 @@ class Messaging extends Action
case MESSAGE_SEND_TYPE_EXTERNAL:
$message = $dbForProject->getDocument('messages', $payload['messageId']);
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForUsage);
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage);
break;
default:
throw new \Exception('Unknown message type: ' . $type);
@ -123,7 +123,7 @@ class Messaging extends Action
Document $message,
Device $deviceForFiles,
Document $project,
Usage $queueForUsage
StatsUsage $queueForStatsUsage
): void {
$topicIds = $message->getAttribute('topics', []);
$targetIds = $message->getAttribute('targets', []);
@ -229,8 +229,8 @@ class Messaging extends Action
/**
* @var array<array> $results
*/
$results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForUsage) {
return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForUsage) {
$results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) {
return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) {
if (\array_key_exists($providerId, $providers)) {
$provider = $providers[$providerId];
} else {
@ -257,8 +257,8 @@ class Messaging extends Action
$adapter->getMaxMessagesPerRequest()
);
return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForUsage) {
return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForUsage) {
return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) {
return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $project, $queueForStatsUsage) {
$deliveredTotal = 0;
$deliveryErrors = [];
$messageData = clone $message;
@ -298,7 +298,7 @@ class Messaging extends Action
$deliveryErrors[] = 'Failed sending to targets with error: ' . $e->getMessage();
} finally {
$errorTotal = \count($deliveryErrors);
$queueForUsage
$queueForStatsUsage
->setProject($project)
->addMetric(METRIC_MESSAGES, ($deliveredTotal + $errorTotal))
->addMetric(METRIC_MESSAGES_SENT, $deliveredTotal)

View file

@ -0,0 +1,373 @@
<?php
namespace Appwrite\Platform\Workers;
use Appwrite\Platform\Action;
use Exception;
use Throwable;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Queue\Message;
class StatsResources extends Action
{
/**
* Date format for different periods
*/
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
'inf' => '0000-00-00 00:00'
];
/**
* @var array $documents
*
* Array of documents to batch write
*
*/
protected array $documents = [];
public static function getName(): string
{
return 'stats-resources';
}
/**
* @throws Exception
*/
public function __construct()
{
$this
->desc('Stats resources worker')
->inject('message')
->inject('project')
->inject('getProjectDB')
->inject('getLogsDB')
->inject('dbForPlatform')
->inject('logError')
->callback([$this, 'action']);
}
/**
* @param Message $message
* @param Document $project
* @param callable $getProjectDB
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
public function action(Message $message, Document $project, callable $getProjectDB, callable $getLogsDB, Database $dbForPlatform, callable $logError): void
{
$this->logError = $logError;
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
if (empty($project->getAttribute('database'))) {
var_dump($payload);
return;
}
// Reset documents for each job
$this->documents = [];
$this->countForProject($dbForPlatform, $getLogsDB, $getProjectDB, $project);
}
protected function countForProject(Database $dbForPlatform, callable $getLogsDB, callable $getProjectDB, Document $project): void
{
Console::info('Begining count for: ' . $project->getId());
$dbForLogs = null;
$dbForProject = null;
try {
/** @var \Utopia\Database\Database $dbForLogs */
$dbForLogs = call_user_func($getLogsDB, $project);
/** @var \Utopia\Database\Database $dbForProject */
$dbForProject = call_user_func($getProjectDB, $project);
} catch (Throwable $th) {
Console::error('Unable to get database');
Console::error($th->getMessage());
return;
}
try {
$region = $project->getAttribute('region');
$platforms = $dbForPlatform->count('platforms', [
Query::equal('projectInternalId', [$project->getInternalId()])
]);
$webhooks = $dbForPlatform->count('webhooks', [
Query::equal('projectInternalId', [$project->getInternalId()])
]);
$keys = $dbForPlatform->count('keys', [
Query::equal('projectInternalId', [$project->getInternalId()])
]);
$databases = $dbForProject->count('databases');
$buckets = $dbForProject->count('buckets');
$users = $dbForProject->count('users');
$last30Days = (new \DateTime())->sub(\DateInterval::createFromDateString('30 days'))->format('Y-m-d 00:00:00');
$usersMAU = $dbForProject->count('users', [
Query::greaterThanEqual('accessedAt', $last30Days)
]);
$last24Hours = (new \DateTime())->sub(\DateInterval::createFromDateString('24 hours'))->format('Y-m-d h:m:00');
$usersDAU = $dbForProject->count('users', [
Query::greaterThanEqual('accessedAt', $last24Hours)
]);
$last7Days = (new \DateTime())->sub(\DateInterval::createFromDateString('7 days'))->format('Y-m-d 00:00:00');
$usersWAU = $dbForProject->count('users', [
Query::greaterThanEqual('accessedAt', $last7Days)
]);
$teams = $dbForProject->count('teams');
$functions = $dbForProject->count('functions');
$messages = $dbForProject->count('messages');
$providers = $dbForProject->count('providers');
$topics = $dbForProject->count('topics');
$metrics = [
METRIC_DATABASES => $databases,
METRIC_BUCKETS => $buckets,
METRIC_USERS => $users,
METRIC_FUNCTIONS => $functions,
METRIC_TEAMS => $teams,
METRIC_MESSAGES => $messages,
METRIC_MAU => $usersMAU,
METRIC_DAU => $usersDAU,
METRIC_WAU => $usersWAU,
METRIC_WEBHOOKS => $webhooks,
METRIC_PLATFORMS => $platforms,
METRIC_PROVIDERS => $providers,
METRIC_TOPICS => $topics,
METRIC_KEYS => $keys,
];
foreach ($metrics as $metric => $value) {
$this->createStatsDocuments($region, $metric, $value);
}
try {
$this->countForBuckets($dbForProject, $dbForLogs, $region);
} catch (Throwable $th) {
call_user_func_array($this->logError, [$th, "StatsResources", "count_for_buckets_{$project->getId()}"]);
}
try {
$this->countForDatabase($dbForProject, $dbForLogs, $region);
} catch (Throwable $th) {
call_user_func_array($this->logError, [$th, "StatsResources", "count_for_database_{$project->getId()}"]);
}
try {
$this->countForFunctions($dbForProject, $dbForLogs, $region);
} catch (Throwable $th) {
call_user_func_array($this->logError, [$th, "StatsResources", "count_for_functions_{$project->getId()}"]);
}
$this->writeDocuments($dbForLogs, $project);
} catch (Throwable $th) {
call_user_func_array($this->logError, [$th, "StatsResources", "count_for_project_{$project->getId()}"]);
}
Console::info('End of count for: ' . $project->getId());
}
protected function countForBuckets(Database $dbForProject, Database $dbForLogs, string $region)
{
$totalFiles = 0;
$totalStorage = 0;
$this->foreachDocument($dbForProject, 'buckets', [], function ($bucket) use ($dbForProject, $dbForLogs, $region, &$totalFiles, &$totalStorage) {
$files = $dbForProject->count('bucket_' . $bucket->getInternalId());
$metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES);
$this->createStatsDocuments($region, $metric, $files);
$storage = $dbForProject->sum('bucket_' . $bucket->getInternalId(), 'sizeActual');
$metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE);
$this->createStatsDocuments($region, $metric, $storage);
$totalStorage += $storage;
$totalFiles += $files;
});
$this->createStatsDocuments($region, METRIC_FILES, $totalFiles);
$this->createStatsDocuments($region, METRIC_FILES_STORAGE, $totalStorage);
}
/**
* Need separate function to count per period data
*/
protected function countImageTransformations(Database $dbForProject, Database $dbForLogs, string $region)
{
$totalImageTransformations = 0;
$totalDailyImageTransformations = 0;
$totalHourlyImageTransformations = 0;
$this->foreachDocument($dbForProject, 'buckets', [], function ($bucket) use ($dbForProject, $dbForLogs, $region, &$totalDailyImageTransformations, &$totalHourlyImageTransformations, &$totalImageTransformations) {
$imageTransformations = $dbForProject->count('bucket_' . $bucket->getInternalId(), [
Query::isNotNull('transformedAt')
]);
$metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED);
$this->createStatsDocuments($region, $metric, $imageTransformations, 'inf');
$totalImageTransformations += $imageTransformations;
// hourly
$time = \date($this->periods['1h'], \time());
$start = $time;
$end = (new \DateTime($start))->format('Y-m-d H:59:59');
$hourlyImageTransformations = $dbForProject->count('bucket_' . $bucket->getInternalId(), [
Query::greaterThanEqual('transformedAt', $start),
Query::lessThanEqual('transformedAt', $end),
]);
$metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED);
$this->createStatsDocuments($region, $metric, $hourlyImageTransformations, '1h');
$totalHourlyImageTransformations += $hourlyImageTransformations;
// daily
$time = \date($this->periods['1d'], \time());
$start = $time;
$end = (new \DateTime($start))->format('Y-m-d 11:59:59');
$dailyImageTransformations = $dbForProject->count('bucket_' . $bucket->getInternalId(), [
Query::greaterThanEqual('transformedAt', $start),
Query::lessThanEqual('transformedAt', $end),
]);
$metric = str_replace('{bucketInternalId}', $bucket->getInternalId(), METRIC_BUCKET_ID_FILES_IMAGES_TRANSFORMED);
$this->createStatsDocuments($region, $metric, $dailyImageTransformations, '1d');
$totalDailyImageTransformations += $dailyImageTransformations;
});
$this->createStatsDocuments($region, METRIC_FILES_IMAGES_TRANSFORMED, $totalImageTransformations, 'inf');
}
protected function countForDatabase(Database $dbForProject, Database $dbForLogs, string $region)
{
$totalCollections = 0;
$totalDocuments = 0;
$this->foreachDocument($dbForProject, 'databases', [], function ($database) use ($dbForProject, $dbForLogs, $region, &$totalCollections, &$totalDocuments) {
$collections = $dbForProject->count('database_' . $database->getInternalId());
$metric = str_replace('{databaseInternalId}', $database->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS);
$this->createStatsDocuments($region, $metric, $collections);
$documents = $this->countForCollections($dbForProject, $dbForLogs, $database, $region);
$totalDocuments += $documents;
$totalCollections += $collections;
});
$this->createStatsDocuments($region, METRIC_COLLECTIONS, $totalCollections);
$this->createStatsDocuments($region, METRIC_DOCUMENTS, $totalDocuments);
}
protected function countForCollections(Database $dbForProject, Database $dbForLogs, Document $database, string $region): int
{
$databaseDocuments = 0;
$this->foreachDocument($dbForProject, 'database_' . $database->getInternalId(), [], function ($collection) use ($dbForProject, $dbForLogs, $database, $region, &$totalCollections, &$databaseDocuments) {
$documents = $dbForProject->count('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
$metric = str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$database->getInternalId(), $collection->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS);
$this->createStatsDocuments($region, $metric, $documents);
$databaseDocuments += $documents;
});
$metric = str_replace(['{databaseInternalId}'], [$database->getInternalId()], METRIC_DATABASE_ID_DOCUMENTS);
$this->createStatsDocuments($region, $metric, $databaseDocuments);
return $databaseDocuments;
}
protected function countForFunctions(Database $dbForProject, Database $dbForLogs, string $region)
{
$deploymentsStorage = $dbForProject->sum('deployments', 'size');
$buildsStorage = $dbForProject->sum('builds', 'size');
$this->createStatsDocuments($region, METRIC_DEPLOYMENTS_STORAGE, $deploymentsStorage);
$this->createStatsDocuments($region, METRIC_BUILDS_STORAGE, $buildsStorage);
$deployments = $dbForProject->count('deployments');
$builds = $dbForProject->count('builds');
$this->createStatsDocuments($region, METRIC_DEPLOYMENTS, $deployments);
$this->createStatsDocuments($region, METRIC_BUILDS, $builds);
$this->foreachDocument($dbForProject, 'functions', [], function (Document $function) use ($dbForProject, $dbForLogs, $region) {
$functionDeploymentsStorage = $dbForProject->sum('deployments', 'size', [
Query::equal('resourceInternalId', [$function->getInternalId()]),
Query::equal('resourceType', [RESOURCE_TYPE_FUNCTIONS]),
]);
$this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $functionDeploymentsStorage);
$functionDeployments = $dbForProject->count('deployments', [
Query::equal('resourceInternalId', [$function->getInternalId()]),
Query::equal('resourceType', [RESOURCE_TYPE_FUNCTIONS]),
]);
$this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $functionDeployments);
/**
* As deployments and builds have 1-1 relationship,
* the count for one should match the other
*/
$this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_BUILDS), $functionDeployments);
$functionBuildsStorage = 0;
$this->foreachDocument($dbForProject, 'deployments', [
Query::equal('resourceInternalId', [$function->getInternalId()]),
Query::equal('resourceType', [RESOURCE_TYPE_FUNCTIONS]),
], function (Document $deployment) use ($dbForProject, &$functionBuildsStorage): void {
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
$functionBuildsStorage += $build->getAttribute('size', 0);
});
$this->createStatsDocuments($region, str_replace(['{resourceType}','{resourceInternalId}'], [RESOURCE_TYPE_FUNCTIONS,$function->getInternalId()], METRIC_RESOURCE_TYPE_ID_BUILDS_STORAGE), $functionBuildsStorage);
});
}
protected function createStatsDocuments(string $region, string $metric, int $value, ?string $period = null)
{
if ($period === null) {
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : \date($format, \time());
$id = \md5("{$time}_{$period}_{$metric}");
$this->documents[] = new Document([
'$id' => $id,
'metric' => $metric,
'period' => $period,
'region' => $region,
'value' => $value,
]);
}
} else {
$time = 'inf' === $period ? null : \date($this->periods[$period], \time());
$id = \md5("{$time}_{$period}_{$metric}");
$this->documents[] = new Document([
'$id' => $id,
'metric' => $metric,
'period' => $period,
'region' => $region,
'value' => $value,
]);
}
}
protected function writeDocuments(Database $dbForLogs, Document $project): void
{
$dbForLogs->createOrUpdateDocuments(
'stats',
$this->documents
);
$this->documents = [];
Console::success('Stats written to logs db for project: ' . $project->getId() . '(' . $project->getInternalId() . ')');
}
}

View file

@ -0,0 +1,246 @@
<?php
namespace Appwrite\Platform\Workers;
use Appwrite\Event\StatsUsageDump;
use Exception;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\System\System;
class StatsUsage extends Action
{
private array $stats = [];
private int $lastTriggeredTime = 0;
private int $keys = 0;
private const INFINITY_PERIOD = '_inf_';
private const KEYS_THRESHOLD = 10000;
public static function getName(): string
{
return 'stats-usage';
}
/**
* @throws Exception
*/
public function __construct()
{
$this
->desc('Stats usage worker')
->inject('message')
->inject('getProjectDB')
->inject('queueForStatsUsageDump')
->callback([$this, 'action']);
$this->lastTriggeredTime = time();
}
/**
* @param Message $message
* @param callable $getProjectDB
* @param StatsUsageDump $queueForStatsUsageDump
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
public function action(Message $message, callable $getProjectDB, StatsUsageDump $queueForStatsUsageDump): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
//Todo Figure out way to preserve keys when the container is being recreated @shimonewman
$aggregationInterval = (int) System::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20');
$project = new Document($payload['project'] ?? []);
$projectId = $project->getInternalId();
foreach ($payload['reduce'] ?? [] as $document) {
if (empty($document)) {
continue;
}
$this->reduce(
project: $project,
document: new Document($document),
metrics: $payload['metrics'],
getProjectDB: $getProjectDB
);
}
$this->stats[$projectId]['project'] = $project;
$this->stats[$projectId]['receivedAt'] = DateTime::now();
foreach ($payload['metrics'] ?? [] as $metric) {
$this->keys++;
if (!isset($this->stats[$projectId]['keys'][$metric['key']])) {
$this->stats[$projectId]['keys'][$metric['key']] = $metric['value'];
continue;
}
$this->stats[$projectId]['keys'][$metric['key']] += $metric['value'];
}
// If keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats)
if (
$this->keys >= self::KEYS_THRESHOLD ||
(time() - $this->lastTriggeredTime > $aggregationInterval && $this->keys > 0)
) {
Console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys');
$queueForStatsUsageDump
->setStats($this->stats)
->trigger();
$this->stats = [];
$this->keys = 0;
$this->lastTriggeredTime = time();
}
}
/**
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
* When we remove a parent document we need to deduct his children aggregation from the project scope.
* @param Document $project
* @param Document $document
* @param array $metrics
* @param callable $getProjectDB
* @return void
*/
private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void
{
$dbForProject = $getProjectDB($project);
try {
switch (true) {
case $document->getCollection() === 'users': // users
$sessions = count($document->getAttribute(METRIC_SESSIONS, 0));
if (!empty($sessions)) {
$metrics[] = [
'key' => METRIC_SESSIONS,
'value' => ($sessions * -1),
];
}
break;
case $document->getCollection() === 'databases': // databases
$collections = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_COLLECTIONS)));
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{databaseInternalId}', $document->getInternalId(), METRIC_DATABASE_ID_DOCUMENTS)));
if (!empty($collections['value'])) {
$metrics[] = [
'key' => METRIC_COLLECTIONS,
'value' => ($collections['value'] * -1),
];
}
if (!empty($documents['value'])) {
$metrics[] = [
'key' => METRIC_DOCUMENTS,
'value' => ($documents['value'] * -1),
];
}
break;
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$documents = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $document->getInternalId()], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS)));
if (!empty($documents['value'])) {
$metrics[] = [
'key' => METRIC_DOCUMENTS,
'value' => ($documents['value'] * -1),
];
$metrics[] = [
'key' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
'value' => ($documents['value'] * -1),
];
}
break;
case $document->getCollection() === 'buckets':
$files = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES)));
$storage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{bucketInternalId}', $document->getInternalId(), METRIC_BUCKET_ID_FILES_STORAGE)));
if (!empty($files['value'])) {
$metrics[] = [
'key' => METRIC_FILES,
'value' => ($files['value'] * -1),
];
}
if (!empty($storage['value'])) {
$metrics[] = [
'key' => METRIC_FILES_STORAGE,
'value' => ($storage['value'] * -1),
];
}
break;
case $document->getCollection() === 'functions':
$deployments = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS)));
$deploymentsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace(['{resourceType}', '{resourceInternalId}'], ['functions', $document->getInternalId()], METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE)));
$builds = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS)));
$buildsStorage = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_STORAGE)));
$buildsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_BUILDS_COMPUTE)));
$executions = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS)));
$executionsCompute = $dbForProject->getDocument('stats', md5(self::INFINITY_PERIOD . str_replace('{functionInternalId}', $document->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE)));
if (!empty($deployments['value'])) {
$metrics[] = [
'key' => METRIC_DEPLOYMENTS,
'value' => ($deployments['value'] * -1),
];
}
if (!empty($deploymentsStorage['value'])) {
$metrics[] = [
'key' => METRIC_DEPLOYMENTS_STORAGE,
'value' => ($deploymentsStorage['value'] * -1),
];
}
if (!empty($builds['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS,
'value' => ($builds['value'] * -1),
];
}
if (!empty($buildsStorage['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_STORAGE,
'value' => ($buildsStorage['value'] * -1),
];
}
if (!empty($buildsCompute['value'])) {
$metrics[] = [
'key' => METRIC_BUILDS_COMPUTE,
'value' => ($buildsCompute['value'] * -1),
];
}
if (!empty($executions['value'])) {
$metrics[] = [
'key' => METRIC_EXECUTIONS,
'value' => ($executions['value'] * -1),
];
}
if (!empty($executionsCompute['value'])) {
$metrics[] = [
'key' => METRIC_EXECUTIONS_COMPUTE,
'value' => ($executionsCompute['value'] * -1),
];
}
break;
default:
break;
}
} catch (\Throwable $e) {
console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
}
}
}

View file

@ -0,0 +1,350 @@
<?php
namespace Appwrite\Platform\Workers;
use Appwrite\Extend\Exception;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Registry\Registry;
use Utopia\System\System;
class StatsUsageDump extends Action
{
public const METRIC_COLLECTION_LEVEL_STORAGE = 4;
public const METRIC_DATABASE_LEVEL_STORAGE = 3;
public const METRIC_PROJECT_LEVEL_STORAGE = 2;
protected array $stats = [];
protected Registry $register;
/**
* Metrics to skip writing to logsDB
* As these metrics are calculated separately
* by logs DB
* @var array
*/
protected array $skipBaseMetrics = [
METRIC_DATABASES => true,
METRIC_BUCKETS => true,
METRIC_USERS => true,
METRIC_FUNCTIONS => true,
METRIC_TEAMS => true,
METRIC_MESSAGES => true,
METRIC_MAU => true,
METRIC_WEBHOOKS => true,
METRIC_PLATFORMS => true,
METRIC_PROVIDERS => true,
METRIC_TOPICS => true,
METRIC_KEYS => true,
METRIC_FILES => true,
METRIC_FILES_STORAGE => true,
METRIC_DEPLOYMENTS_STORAGE => true,
METRIC_BUILDS_STORAGE => true,
METRIC_DEPLOYMENTS => true,
METRIC_BUILDS => true,
METRIC_COLLECTIONS => true,
METRIC_DOCUMENTS => true,
];
/**
* Skip metrics associated with parent IDs
* these need to be checked individually with `str_ends_with`
*/
protected array $skipParentIdMetrics = [
'.files',
'.files.storage',
'.collections',
'.documents',
'.deployments',
'.deployments.storage',
'.builds',
'.builds.storage',
];
/**
* @var callable
*/
protected mixed $getLogsDB;
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
'inf' => '0000-00-00 00:00'
];
public static function getName(): string
{
return 'stats-usage-dump';
}
/**
* @throws \Exception
*/
public function __construct()
{
$this
->inject('message')
->inject('getProjectDB')
->inject('getLogsDB')
->inject('register')
->callback([$this, 'action']);
}
/**
* @param Message $message
* @param callable $getProjectDB
* @param callable $getLogsDB
* @return void
* @throws Exception
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, callable $getProjectDB, callable $getLogsDB, Registry $register): void
{
$this->getLogsDB = $getLogsDB;
$this->register = $register;
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
foreach ($payload['stats'] ?? [] as $stats) {
$project = new Document($stats['project'] ?? []);
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
$receivedAt = $stats['receivedAt'] ?? 'NONE';
if ($numberOfKeys === 0) {
continue;
}
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
try {
/** @var \Utopia\Database\Database $dbForProject */
$dbForProject = $getProjectDB($project);
foreach ($stats['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
if (str_contains($key, METRIC_DATABASES_STORAGE)) {
try {
$this->handleDatabaseStorage($key, $dbForProject, $project);
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] failed to calculate database storage for key [' . $key . '] ' . $e->getMessage());
}
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
$document = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
$documentClone = new Document($document->getArrayCopy());
$dbForProject->createOrUpdateDocumentsWithIncrease(
'stats',
'value',
[$document]
);
$this->writeToLogsDB($project, $documentClone);
}
}
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
}
}
}
private function handleDatabaseStorage(string $key, Database $dbForProject, Document $project): void
{
$data = explode('.', $key);
$start = microtime(true);
$updateMetric = function (Database $dbForProject, Document $project, int $value, string $key, string $period, string|null $time) {
$id = \md5("{$time}_{$period}_{$key}");
$document = new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => System::getEnv('_APP_REGION', 'default'),
]);
$documentClone = new Document($document->getArrayCopy());
$dbForProject->createOrUpdateDocumentsWithIncrease(
'stats',
'value',
[$document]
);
$this->writeToLogsDB($project, $documentClone);
};
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
$value = 0;
$previousValue = 0;
try {
$previousValue = ($dbForProject->getDocument('stats', $id))->getAttribute('value', 0);
} catch (\Exception $e) {
// No previous value
}
switch (count($data)) {
// Collection Level
case self::METRIC_COLLECTION_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Collection Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
$collectionInternalId = $data[1];
try {
$value = $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collectionInternalId);
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
// Compare with previous value
$diff = $value - $previousValue;
if ($diff === 0) {
break;
}
// Update Collection
$updateMetric($dbForProject, $project, $diff, $key, $period, $time);
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
break;
// Database Level
case self::METRIC_DATABASE_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Database Level Storage Calculation [' . $key . ']');
$databaseInternalId = $data[0];
$collections = [];
try {
$collections = $dbForProject->find('database_' . $databaseInternalId);
} catch (\Exception $e) {
// Database not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
foreach ($collections as $collection) {
try {
$value += $dbForProject->getSizeOfCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
}
$diff = $value - $previousValue;
if ($diff === 0) {
break;
}
// Update Database
$databaseKey = str_replace(['{databaseInternalId}'], [$data[0]], METRIC_DATABASE_ID_STORAGE);
$updateMetric($dbForProject, $project, $diff, $databaseKey, $period, $time);
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
break;
// Project Level
case self::METRIC_PROJECT_LEVEL_STORAGE:
Console::log('[' . DateTime::now() . '] Project Level Storage Calculation [' . $key . ']');
// Get all project databases
$databases = $dbForProject->find('database');
// Recalculate all databases
foreach ($databases as $database) {
$collections = $dbForProject->find('database_' . $database->getInternalId());
foreach ($collections as $collection) {
try {
$value += $dbForProject->getSizeOfCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId());
} catch (\Exception $e) {
// Collection not found
if ($e->getMessage() !== 'Collection not found') {
throw $e;
}
}
}
}
$diff = $value - $previousValue;
// Update Project
$projectKey = METRIC_DATABASES_STORAGE;
$updateMetric($dbForProject, $project, $diff, $projectKey, $period, $time);
break;
}
}
$end = microtime(true);
console::log('[' . DateTime::now() . '] DB Storage Calculation [' . $key . '] took ' . (($end - $start) * 1000) . ' milliseconds');
}
protected function writeToLogsDB(Document $project, Document $document)
{
if (!System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', false)) {
Console::log('Dual Writing is disabled. Skipping...');
return;
}
if (array_key_exists($document->getAttribute('metric'), $this->skipBaseMetrics)) {
return;
}
foreach ($this->skipParentIdMetrics as $skipMetric) {
if (str_ends_with($document->getAttribute('metric'), $skipMetric)) {
return;
}
}
/** @var \Utopia\Database\Database $dbForLogs*/
$dbForLogs = call_user_func($this->getLogsDB, $project);
try {
$dbForLogs->createOrUpdateDocumentsWithIncrease(
'stats',
'value',
[$document]
);
Console::success('Usage logs pushed to Logs DB');
} catch (\Throwable $th) {
Console::error($th->getMessage());
}
$this->register->get('pools')->get('logs')->reclaim();
}
}

View file

@ -3,7 +3,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Mail;
use Appwrite\Event\Usage;
use Appwrite\Event\StatsUsage;
use Appwrite\Template\Template;
use Exception;
use Utopia\Database\Database;
@ -35,9 +35,9 @@ class Webhooks extends Action
->inject('project')
->inject('dbForPlatform')
->inject('queueForMails')
->inject('queueForUsage')
->inject('queueForStatsUsage')
->inject('log')
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForMails, $queueForUsage, $log));
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, StatsUsage $queueForStatsUsage, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForMails, $queueForStatsUsage, $log));
}
/**
@ -49,7 +49,7 @@ class Webhooks extends Action
* @return void
* @throws Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, StatsUsage $queueForStatsUsage, Log $log): void
{
$this->errors = [];
$payload = $message->getPayload() ?? [];
@ -66,7 +66,7 @@ class Webhooks extends Action
foreach ($project->getAttribute('webhooks', []) as $webhook) {
if (array_intersect($webhook->getAttribute('events', []), $events)) {
$this->execute($events, $webhookPayload, $webhook, $user, $project, $dbForPlatform, $queueForMails, $queueForUsage);
$this->execute($events, $webhookPayload, $webhook, $user, $project, $dbForPlatform, $queueForMails, $queueForStatsUsage);
}
}
@ -85,7 +85,7 @@ class Webhooks extends Action
* @param Mail $queueForMails
* @return void
*/
private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage): void
private function execute(array $events, string $payload, Document $webhook, Document $user, Document $project, Database $dbForPlatform, Mail $queueForMails, StatsUsage $queueForStatsUsage): void
{
if ($webhook->getAttribute('enabled') !== true) {
return;
@ -168,7 +168,7 @@ class Webhooks extends Action
$dbForPlatform->purgeCachedDocument('projects', $project->getId());
$this->errors[] = $logs;
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_WEBHOOKS_FAILED, 1)
->addMetric(str_replace('{webhookInternalId}', $webhook->getInternalId(), METRIC_WEBHOOK_ID_FAILED), 1)
;
@ -178,13 +178,13 @@ class Webhooks extends Action
$webhook->setAttribute('attempts', 0); // Reset attempts on success
$dbForPlatform->updateDocument('webhooks', $webhook->getId(), $webhook);
$dbForPlatform->purgeCachedDocument('projects', $project->getId());
$queueForUsage
$queueForStatsUsage
->addMetric(METRIC_WEBHOOKS_SENT, 1)
->addMetric(str_replace('{webhookInternalId}', $webhook->getInternalId(), METRIC_WEBHOOK_ID_SENT), 1)
;
}
$queueForUsage
$queueForStatsUsage
->setProject($project)
->trigger();
}

View file

@ -494,12 +494,12 @@ class HealthCustomServerTest extends Scope
return [];
}
public function testUsageSuccess()
public function testStatsResources()
{
/**
* Test for SUCCESS
*/
$response = $this->client->call(Client::METHOD_GET, '/health/queue/usage', array_merge([
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-resources', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), []);
@ -511,19 +511,19 @@ class HealthCustomServerTest extends Scope
/**
* Test for FAILURE
*/
$response = $this->client->call(Client::METHOD_GET, '/health/queue/usage?threshold=0', array_merge([
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-resources?threshold=0', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), []);
$this->assertEquals(503, $response['headers']['status-code']);
}
public function testUsageDumpSuccess()
public function testUsageSuccess()
{
/**
* Test for SUCCESS
*/
$response = $this->client->call(Client::METHOD_GET, '/health/queue/usage-dump', array_merge([
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), []);
@ -535,7 +535,31 @@ class HealthCustomServerTest extends Scope
/**
* Test for FAILURE
*/
$response = $this->client->call(Client::METHOD_GET, '/health/queue/usage-dump?threshold=0', array_merge([
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage?threshold=0', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), []);
$this->assertEquals(503, $response['headers']['status-code']);
}
public function testStatsUsageDumpSuccess()
{
/**
* Test for SUCCESS
*/
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), []);
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertIsInt($response['body']['size']);
$this->assertLessThan(100, $response['body']['size']);
/**
* Test for FAILURE
*/
$response = $this->client->call(Client::METHOD_GET, '/health/queue/stats-usage-dump?threshold=0', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), []);

View file

@ -89,9 +89,9 @@ services:
- _APP_FUNCTIONS_MEMORY_SWAP
- _APP_EXECUTOR_HOST
appwrite-worker-usage:
entrypoint: worker-usage
container_name: appwrite-worker-usage
appwrite-worker-stats-usage:
entrypoint: worker-stats-usage
container_name: appwrite-worker-stats-usage
build:
context: .
restart: unless-stopped