2023-10-25 08:38:44 +00:00
|
|
|
<?php
|
|
|
|
|
|
|
|
|
|
namespace Appwrite\Platform\Workers;
|
|
|
|
|
|
2024-03-06 17:34:21 +00:00
|
|
|
use Exception;
|
2025-03-27 08:25:48 +00:00
|
|
|
use Throwable;
|
2023-10-25 08:38:44 +00:00
|
|
|
use Utopia\CLI\Console;
|
2024-01-18 16:52:20 +00:00
|
|
|
use Utopia\Database\DateTime;
|
2023-10-25 08:38:44 +00:00
|
|
|
use Utopia\Database\Document;
|
2025-01-30 04:33:42 +00:00
|
|
|
use Utopia\Platform\Action;
|
2023-10-25 08:38:44 +00:00
|
|
|
use Utopia\Queue\Message;
|
2025-03-27 08:25:48 +00:00
|
|
|
use Utopia\Registry\Registry;
|
2024-04-01 11:02:47 +00:00
|
|
|
use Utopia\System\System;
|
2023-10-25 08:38:44 +00:00
|
|
|
|
2025-01-30 04:53:53 +00:00
|
|
|
/**
 * Worker action that aggregates usage metrics in memory and periodically
 * flushes them to the per-project `stats` collection (and, when dual writing
 * is enabled, to the logs database as well).
 *
 * State is kept on the instance between invocations of {@see action()}:
 * metrics are accumulated until either the batch size or the aggregation
 * interval is reached, then written in bulk by {@see commitToDb()}.
 */
class StatsUsage extends Action
{
    /**
     * Key prefix used to build the `$id` of the "infinity" (all-time) stat document.
     * Matches the prefix produced in commitToDb() for the `inf` period, where
     * the time component is null: md5("_inf_{$metric}").
     */
    private const INFINITY_PERIOD = '_inf_';

    /** Number of accumulated metric entries that triggers a flush in development. */
    private const BATCH_SIZE_DEVELOPMENT = 1;

    /** Number of accumulated metric entries that triggers a flush outside development. */
    private const BATCH_SIZE_PRODUCTION = 10_000;

    /**
     * In memory per project metrics calculation.
     * Keyed by project internal id; each entry holds `project`, `receivedAt` and `keys`.
     */
    private array $stats = [];

    /** Unix timestamp of the last flush (or of construction). */
    private int $lastTriggeredTime = 0;

    /** Number of metric entries received since the last flush. */
    private int $keys = 0;

    /**
     * Stats for batch write separated per project
     * @var array
     */
    private array $projects = [];

    /**
     * Array of stat documents to batch write to logsDB
     * @var array
     */
    private array $statDocuments = [];

    protected Registry $register;

    /**
     * Metrics to skip writing to logsDB
     * As these metrics are calculated separately
     * by logs DB
     * @var array
     */
    protected array $skipBaseMetrics = [
        METRIC_DATABASES => true,
        METRIC_BUCKETS => true,
        METRIC_USERS => true,
        METRIC_FUNCTIONS => true,
        METRIC_TEAMS => true,
        METRIC_MESSAGES => true,
        METRIC_MAU => true,
        METRIC_WEBHOOKS => true,
        METRIC_PLATFORMS => true,
        METRIC_PROVIDERS => true,
        METRIC_TOPICS => true,
        METRIC_KEYS => true,
        METRIC_FILES => true,
        METRIC_FILES_STORAGE => true,
        METRIC_DEPLOYMENTS_STORAGE => true,
        METRIC_BUILDS_STORAGE => true,
        METRIC_DEPLOYMENTS => true,
        METRIC_BUILDS => true,
        METRIC_COLLECTIONS => true,
        METRIC_DOCUMENTS => true,
        METRIC_DATABASES_STORAGE => true,
    ];

    /**
     * Skip metrics associated with parent IDs
     * these need to be checked individually with `str_ends_with`
     */
    protected array $skipParentIdMetrics = [
        '.files',
        '.files.storage',
        '.collections',
        '.documents',
        '.deployments',
        '.deployments.storage',
        '.builds',
        '.builds.storage',
        '.databases.storage'
    ];

    /**
     * Factory returning the logs database; stored on each action() call.
     * @var callable
     */
    protected mixed $getLogsDB;

    /**
     * Aggregation periods mapped to the date format used to bucket a timestamp.
     * The `inf` period is stored without a time (see commitToDb()).
     */
    protected array $periods = [
        '1h' => 'Y-m-d H:00',
        '1d' => 'Y-m-d 00:00',
        'inf' => '0000-00-00 00:00'
    ];

    /**
     * Name under which this worker action is registered.
     *
     * @return string
     */
    public static function getName(): string
    {
        return 'stats-usage';
    }

    /**
     * Batch size threshold for the current environment (`_APP_ENV`).
     *
     * @return int
     */
    private function getBatchSize(): int
    {
        return System::getEnv('_APP_ENV', 'development') === 'development'
            ? self::BATCH_SIZE_DEVELOPMENT
            : self::BATCH_SIZE_PRODUCTION;
    }

    /**
     * Declares the injected dependencies and the callback, and starts the flush timer.
     *
     * @throws Exception
     */
    public function __construct()
    {
        $this
            ->desc('Stats usage worker')
            ->inject('message')
            ->inject('getProjectDB')
            ->inject('getLogsDB')
            ->inject('register')
            ->callback([$this, 'action']);

        $this->lastTriggeredTime = time();
    }

    /**
     * Accumulates the metrics carried by one queue message and flushes the
     * in-memory aggregate once the batch size or aggregation interval is hit.
     *
     * @param Message $message
     * @param callable $getProjectDB
     * @param callable $getLogsDB
     * @param Registry $register
     * @return void
     * @throws \Utopia\Database\Exception
     * @throws Exception
     */
    public function action(Message $message, callable $getProjectDB, callable $getLogsDB, Registry $register): void
    {
        $this->getLogsDB = $getLogsDB;
        $this->register = $register;

        $payload = $message->getPayload() ?? [];
        if (empty($payload)) {
            throw new Exception('Missing payload');
        }

        // `metrics` is optional in the payload, but reduce() takes it by
        // reference as an array; make sure the key exists before passing it on.
        $payload['metrics'] ??= [];

        //Todo Figure out way to preserve keys when the container is being recreated @shimonewman

        $aggregationInterval = (int) System::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20');
        $project = new Document($payload['project'] ?? []);
        $projectId = $project->getInternalId();

        // Deleted parent documents may append negative metrics to $payload['metrics'].
        foreach ($payload['reduce'] ?? [] as $document) {
            if (empty($document)) {
                continue;
            }

            $this->reduce(
                project: $project,
                document: new Document($document),
                metrics: $payload['metrics'],
                getProjectDB: $getProjectDB
            );
        }

        $this->stats[$projectId]['project'] = $project;
        $this->stats[$projectId]['receivedAt'] = DateTime::now();

        foreach ($payload['metrics'] as $metric) {
            $this->keys++;

            if (!isset($this->stats[$projectId]['keys'][$metric['key']])) {
                $this->stats[$projectId]['keys'][$metric['key']] = $metric['value'];
                continue;
            }

            $this->stats[$projectId]['keys'][$metric['key']] += $metric['value'];
        }

        // If keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats)
        $batchIsFull = $this->keys >= $this->getBatchSize();
        $intervalElapsed = (time() - $this->lastTriggeredTime > $aggregationInterval) && $this->keys > 0;

        if ($batchIsFull || $intervalElapsed) {
            Console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys');

            $this->commitToDb($getProjectDB);

            $this->stats = [];
            $this->keys = 0;
            $this->lastTriggeredTime = time();
        }
    }

    /**
     * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
     * When we remove a parent document we need to deduct his children aggregation from the project scope.
     *
     * Negative metric entries are appended to $metrics; failures are logged
     * and never propagated to the caller.
     *
     * @param Document $project
     * @param Document $document
     * @param array $metrics
     * @param callable $getProjectDB
     * @return void
     */
    private function reduce(Document $project, Document $document, array &$metrics, callable $getProjectDB): void
    {
        $dbForProject = $getProjectDB($project);

        try {
            $collection = $document->getCollection();

            switch (true) {
                case $collection === 'users': // users
                    // Default to an empty list: count() on a non-countable throws in PHP 8.
                    $sessionList = $document->getAttribute(METRIC_SESSIONS, []);
                    $sessions = \is_countable($sessionList) ? \count($sessionList) : 0;

                    if ($sessions > 0) {
                        $metrics[] = [
                            'key' => METRIC_SESSIONS,
                            'value' => ($sessions * -1),
                        ];
                    }
                    break;

                case $collection === 'databases': // databases
                    $databaseInternalId = $document->getInternalId();

                    $this->deductInfinityStats($dbForProject, [
                        [
                            'source' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS),
                            'targets' => [METRIC_COLLECTIONS],
                        ],
                        [
                            'source' => str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
                            'targets' => [METRIC_DOCUMENTS],
                        ],
                    ], $metrics);
                    break;

                case str_starts_with($collection, 'database_') && !str_contains($collection, 'collection'): //collections
                    $parts = explode('_', $collection);
                    $databaseInternalId = $parts[1] ?? 0;

                    $this->deductInfinityStats($dbForProject, [
                        [
                            'source' => str_replace(
                                ['{databaseInternalId}', '{collectionInternalId}'],
                                [$databaseInternalId, $document->getInternalId()],
                                METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS
                            ),
                            // The collection's documents are deducted from both the
                            // project total and the parent database total.
                            'targets' => [
                                METRIC_DOCUMENTS,
                                str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS),
                            ],
                        ],
                    ], $metrics);
                    break;

                case $collection === 'buckets':
                    $bucketInternalId = $document->getInternalId();

                    $this->deductInfinityStats($dbForProject, [
                        [
                            'source' => str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES),
                            'targets' => [METRIC_FILES],
                        ],
                        [
                            'source' => str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE),
                            'targets' => [METRIC_FILES_STORAGE],
                        ],
                    ], $metrics);
                    break;

                case $collection === 'functions':
                    $functionInternalId = $document->getInternalId();
                    $resourcePlaceholders = ['{resourceType}', '{resourceInternalId}'];
                    $resourceValues = ['functions', $functionInternalId];

                    $this->deductInfinityStats($dbForProject, [
                        [
                            'source' => str_replace($resourcePlaceholders, $resourceValues, METRIC_FUNCTION_ID_DEPLOYMENTS),
                            'targets' => [METRIC_DEPLOYMENTS],
                        ],
                        [
                            'source' => str_replace($resourcePlaceholders, $resourceValues, METRIC_FUNCTION_ID_DEPLOYMENTS_STORAGE),
                            'targets' => [METRIC_DEPLOYMENTS_STORAGE],
                        ],
                        [
                            'source' => str_replace('{functionInternalId}', $functionInternalId, METRIC_FUNCTION_ID_BUILDS),
                            'targets' => [METRIC_BUILDS],
                        ],
                        [
                            'source' => str_replace('{functionInternalId}', $functionInternalId, METRIC_FUNCTION_ID_BUILDS_STORAGE),
                            'targets' => [METRIC_BUILDS_STORAGE],
                        ],
                        [
                            'source' => str_replace('{functionInternalId}', $functionInternalId, METRIC_FUNCTION_ID_BUILDS_COMPUTE),
                            'targets' => [METRIC_BUILDS_COMPUTE],
                        ],
                        [
                            'source' => str_replace('{functionInternalId}', $functionInternalId, METRIC_FUNCTION_ID_EXECUTIONS),
                            'targets' => [METRIC_EXECUTIONS],
                        ],
                        [
                            'source' => str_replace('{functionInternalId}', $functionInternalId, METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE),
                            'targets' => [METRIC_EXECUTIONS_COMPUTE],
                        ],
                    ], $metrics);
                    break;

                default:
                    break;
            }
        } catch (Throwable $e) {
            // Static calls are not interpolated inside double-quoted strings,
            // so the timestamp has to be formatted explicitly.
            Console::error(\sprintf(
                '[reducer] %s %s %s',
                DateTime::now(),
                $project->getInternalId(),
                $e->getMessage()
            ));
        }
    }

    /**
     * Looks up all-time ("infinity") stat documents and appends their negated
     * value to $metrics under each of the given target keys.
     *
     * All lookups are performed before anything is appended, so a failing
     * lookup leaves $metrics untouched for the whole set.
     *
     * @param mixed $dbForProject Project database handle; must expose getDocument(collection, id).
     * @param array<int, array{source: string, targets: array<int, string>}> $deductions
     *        `source` is the per-resource metric name to read, `targets` are
     *        the metric keys to deduct its value from.
     * @param array $metrics Metric list to append to (by reference).
     * @return void
     */
    private function deductInfinityStats($dbForProject, array $deductions, array &$metrics): void
    {
        $fetched = [];
        foreach ($deductions as $index => $deduction) {
            $fetched[$index] = $dbForProject->getDocument(
                'stats',
                md5(self::INFINITY_PERIOD . $deduction['source'])
            );
        }

        foreach ($deductions as $index => $deduction) {
            $stat = $fetched[$index];

            // Nothing recorded (or zero): nothing to deduct.
            if (empty($stat['value'])) {
                continue;
            }

            foreach ($deduction['targets'] as $targetKey) {
                $metrics[] = [
                    'key' => $targetKey,
                    'value' => ($stat['value'] * -1),
                ];
            }
        }
    }

    /**
     * Turns the in-memory aggregate into stat documents (one per metric and
     * period) and writes them per project, then forwards them to the logs DB.
     *
     * A project's pending documents are dropped only after a successful write;
     * on failure they stay queued and are written again on the next commit.
     *
     * @param callable $getProjectDB
     * @return void
     */
    public function commitToDb(callable $getProjectDB): void
    {
        $region = System::getEnv('_APP_REGION', 'default');

        foreach ($this->stats as $stats) {
            $project = $stats['project'] ?? new Document([]);
            $numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
            $receivedAt = $stats['receivedAt'] ?? null;

            if ($numberOfKeys === 0) {
                continue;
            }

            Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);

            try {
                foreach ($stats['keys'] ?? [] as $key => $value) {
                    if ($value == 0) {
                        continue;
                    }

                    foreach ($this->periods as $period => $format) {
                        // The `inf` period has no time bucket; its id becomes md5("_inf_{$key}").
                        $time = null;

                        if ($period !== 'inf') {
                            $time = !empty($receivedAt) ? (new \DateTime($receivedAt))->format($format) : date($format, time());
                        }

                        $id = \md5("{$time}_{$period}_{$key}");

                        $document = new Document([
                            '$id' => $id,
                            'period' => $period,
                            'time' => $time,
                            'metric' => $key,
                            'value' => $value,
                            'region' => $region,
                        ]);

                        // Keep only what is needed to resolve the project database later.
                        $this->projects[$project->getInternalId()]['project'] = new Document([
                            '$id' => $project->getId(),
                            '$internalId' => $project->getInternalId(),
                            'database' => $project->getAttribute('database'),
                        ]);
                        $this->projects[$project->getInternalId()]['stats'][] = $document;

                        $this->prepareForLogsDB($project, $document);
                    }
                }
            } catch (Exception $e) {
                Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
            }
        }

        foreach ($this->projects as $internalId => $projectStats) {
            if (empty($internalId)) {
                // Never written: drop it so it does not pile up across commits.
                unset($this->projects[$internalId]);
                continue;
            }

            try {
                /** @var \Utopia\Database\Database $dbForProject */
                $dbForProject = $getProjectDB($projectStats['project']);
                Console::log('Processing batch with ' . count($projectStats['stats']) . ' stats');
                $dbForProject->createOrUpdateDocumentsWithIncrease('stats', 'value', $projectStats['stats']);
                Console::success('Batch successfully written to DB');

                unset($this->projects[$internalId]);
            } catch (Throwable $e) {
                Console::error('Error processing stats: ' . $e->getMessage());
            }
        }

        $this->writeToLogsDB();
    }

    /**
     * Queues a copy of a stat document for the logs DB, tagged with the
     * project's internal id as `$tenant`.
     *
     * Does nothing when dual writing is disabled or when the metric is listed
     * in $skipBaseMetrics / ends with one of $skipParentIdMetrics.
     *
     * @param Document $project
     * @param Document $stat
     * @return void
     */
    protected function prepareForLogsDB(Document $project, Document $stat)
    {
        if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') {
            return;
        }

        $metric = $stat->getAttribute('metric');

        if (array_key_exists($metric, $this->skipBaseMetrics)) {
            return;
        }

        foreach ($this->skipParentIdMetrics as $skipMetric) {
            if (str_ends_with($metric, $skipMetric)) {
                return;
            }
        }

        $documentClone = new Document($stat->getArrayCopy());
        $documentClone->setAttribute('$tenant', (int) $project->getInternalId());
        $this->statDocuments[] = $documentClone;
    }

    /**
     * Writes the queued stat documents to the logs DB in one batch and
     * reclaims the `logs` pool afterwards.
     *
     * The queue is emptied only after a successful write, so documents are
     * not sent (and incremented) again by later flushes; on failure they stay
     * queued, mirroring how per-project batches are handled in commitToDb().
     *
     * @return void
     */
    protected function writeToLogsDB(): void
    {
        if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') {
            Console::log('Dual Writing is disabled. Skipping...');
            return;
        }

        /** @var \Utopia\Database\Database $dbForLogs*/
        $dbForLogs = call_user_func($this->getLogsDB);
        $dbForLogs
            ->setTenant(null)
            ->setTenantPerDocument(true);

        try {
            Console::log('Processing batch with ' . count($this->statDocuments) . ' stats');
            $dbForLogs->createOrUpdateDocumentsWithIncrease(
                'stats',
                'value',
                $this->statDocuments
            );
            Console::success('Usage logs pushed to Logs DB');

            // Written: forget the batch so the next flush starts clean.
            $this->statDocuments = [];
        } catch (Throwable $th) {
            Console::error($th->getMessage());
        }

        $this->register->get('pools')->get('logs')->reclaim();
    }
}
|