diff --git a/bin/worker-usage-dump b/bin/worker-usage-dump new file mode 100644 index 0000000000..43ca87fcb3 --- /dev/null +++ b/bin/worker-usage-dump @@ -0,0 +1,3 @@ +#!/bin/sh + +php /usr/src/code/app/worker.php usage-dump $@ \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 395923681d..c4334bb45c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -677,6 +677,38 @@ services: - _APP_LOGGING_CONFIG - _APP_USAGE_AGGREGATION_INTERVAL + appwrite-worker-usage-dump: + entrypoint: worker-usage-dump + <<: *x-logging + container_name: appwrite-worker-usage-dump + image: appwrite-dev + networks: + - appwrite + volumes: + - ./app:/usr/src/code/app + - ./src:/usr/src/code/src + depends_on: + - redis + - mariadb + environment: + - _APP_ENV + - _APP_WORKER_PER_CORE + - _APP_OPENSSL_KEY_V1 + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_REDIS_HOST + - _APP_REDIS_PORT + - _APP_REDIS_USER + - _APP_REDIS_PASS + - _APP_USAGE_STATS + - _APP_LOGGING_PROVIDER + - _APP_LOGGING_CONFIG + - _APP_USAGE_AGGREGATION_INTERVAL + + appwrite-schedule: entrypoint: schedule <<: *x-logging diff --git a/src/Appwrite/Platform/Workers/Usage.php b/src/Appwrite/Platform/Workers/Usage.php index 3809d000f7..49f3344906 100644 --- a/src/Appwrite/Platform/Workers/Usage.php +++ b/src/Appwrite/Platform/Workers/Usage.php @@ -3,23 +3,23 @@ namespace Appwrite\Platform\Workers; use Exception; +use Utopia\App; use Utopia\CLI\Console; -use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Platform\Action; +use Appwrite\Event\UsageDump; use Utopia\Queue\Message; class Usage extends Action { - protected static array $stats = []; - protected array $periods = [ - '1h' => 'Y-m-d H:00', - '1d' => 'Y-m-d 00:00', - 'inf' => '0000-00-00 00:00' - ]; + private array $stats = []; + private int $lastTriggeredTime = 0; + private int $keys = 0; + private const INFINITY_PERIOD = '_inf_'; + private const KEYS_THRESHOLD = 10000; + - protected const INFINITY_PERIOD = '_inf_'; public static function getName(): string { return 'usage'; @@ -35,26 +35,31 @@ class Usage extends Action ->desc('Usage worker') ->inject('message') ->inject('getProjectDB') - ->callback(function (Message $message, callable $getProjectDB) { - $this->action($message, $getProjectDB); + ->inject('queueForUsageDump') + ->callback(function (Message $message, callable $getProjectDB, UsageDump $queueForUsageDump) { + $this->action($message, $getProjectDB, $queueForUsageDump); }); + + $this->lastTriggeredTime = time(); } /** * @param Message $message * @param callable $getProjectDB + * @param UsageDump $queueForUsageDump * @return void * @throws \Utopia\Database\Exception * @throws Exception */ - public function action(Message $message, callable $getProjectDB): void + public function action(Message $message, callable $getProjectDB, UsageDump $queueForUsageDump): void { $payload = $message->getPayload() ?? []; if (empty($payload)) { throw new Exception('Missing payload'); } + //Todo Figure out way to preserve keys when the container is being recreated @shimonewman - $payload = $message->getPayload() ?? []; + $aggregationInterval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20'); $project = new Document($payload['project'] ?? []); $projectId = $project->getInternalId(); foreach ($payload['reduce'] ?? [] as $document) { @@ -69,17 +74,35 @@ class Usage extends Action getProjectDB: $getProjectDB ); } - self::$stats[$projectId]['project'] = $project; + + $this->stats[$projectId]['project'] = $project; foreach ($payload['metrics'] ?? [] as $metric) { - if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) { - self::$stats[$projectId]['keys'][$metric['key']] = $metric['value']; + $this->keys++; + if (!isset($this->stats[$projectId]['keys'][$metric['key']])) { + $this->stats[$projectId]['keys'][$metric['key']] = $metric['value']; continue; } - self::$stats[$projectId]['keys'][$metric['key']] += $metric['value']; + + $this->stats[$projectId]['keys'][$metric['key']] += $metric['value']; + } + + // if keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats) + if ( + $this->keys >= self::KEYS_THRESHOLD || + (time() - $this->lastTriggeredTime > $aggregationInterval && $this->keys > 0) + ) { + console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys'); + + $queueForUsageDump + ->setStats($this->stats) + ->trigger(); + + $this->stats = []; + $this->keys = 0; + $this->lastTriggeredTime = time(); } } - /** * On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files. * When we remove a parent document we need to deduct his children aggregation from the project scope. diff --git a/src/Appwrite/Platform/Workers/UsageDump.php b/src/Appwrite/Platform/Workers/UsageDump.php new file mode 100644 index 0000000000..f563578984 --- /dev/null +++ b/src/Appwrite/Platform/Workers/UsageDump.php @@ -0,0 +1,113 @@ + 'Y-m-d H:00', + '1d' => 'Y-m-d 00:00', + 'inf' => '0000-00-00 00:00' + ]; + + public static function getName(): string + { + return 'usage-dump'; + } + + /** + * @throws \Exception + */ + public function __construct() + { + + $this + ->inject('message') + ->inject('getProjectDB') + ->callback(function (Message $message, callable $getProjectDB) { + $this->action($message, $getProjectDB); + }) + ; + } + + /** + * @param Message $message + * @param callable $getProjectDB + * @return void + * @throws Exception + * @throws \Utopia\Database\Exception + */ + public function action(Message $message, callable $getProjectDB): void + { + + $payload = $message->getPayload() ?? []; + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + //Todo rename both usage workers @shimonewman + foreach ($payload['stats'] ?? [] as $stats) { + $project = new Document($stats['project'] ?? []); + $numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0; + + if ($numberOfKeys === 0) { + continue; + } + + console::log('[' . DateTime::now() . '] ProjectId [' . $project->getInternalId() . '] Database [' . $project['database'] . '] ' . $numberOfKeys . ' keys'); + + try { + $dbForProject = $getProjectDB($project); + foreach ($stats['keys'] ?? [] as $key => $value) { + if ($value == 0) { + continue; + } + + foreach ($this->periods as $period => $format) { + $time = 'inf' === $period ? null : date($format, time()); + $id = \md5("{$time}_{$period}_{$key}"); + + try { + $dbForProject->createDocument('stats_v2', new Document([ + '$id' => $id, + 'period' => $period, + 'time' => $time, + 'metric' => $key, + 'value' => $value, + 'region' => App::getEnv('_APP_REGION', 'default'), + ])); + } catch (Duplicate $th) { + if ($value < 0) { + $dbForProject->decreaseDocumentAttribute( + 'stats_v2', + $id, + 'value', + abs($value) + ); + } else { + $dbForProject->increaseDocumentAttribute( + 'stats_v2', + $id, + 'value', + $value + ); + } + } + } + } + } catch (\Exception $e) { + console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + } + } + } +} diff --git a/src/Appwrite/Platform/Workers/UsageHook.php b/src/Appwrite/Platform/Workers/UsageHook.php deleted file mode 100644 index 4781b1e892..0000000000 --- a/src/Appwrite/Platform/Workers/UsageHook.php +++ /dev/null @@ -1,103 +0,0 @@ -setType(Action::TYPE_WORKER_START) - ->inject('register') - ->inject('getProjectDB') - ->callback(function ($register, callable $getProjectDB) { - $this->action($register, $getProjectDB); - }) - ; - } - - /** - * @param $register - * @param $getProjectDB - * @return void - */ - public function action($register, $getProjectDB): void - { - - $interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '60000'); - Timer::tick($interval, function () use ($register, $getProjectDB) { - - $offset = count(self::$stats); - $projects = array_slice(self::$stats, 0, $offset, true); - array_splice(self::$stats, 0, $offset); - foreach ($projects as $data) { - $numberOfKeys = !empty($data['keys']) ? count($data['keys']) : 0; - $projectInternalId = $data['project']->getInternalId(); - $database = $data['project']['database'] ?? ''; - - console::warning('Ticker started ' . DateTime::now()); - - if ($numberOfKeys === 0) { - continue; - } - - try { - $dbForProject = $getProjectDB($data['project']); - foreach ($data['keys'] ?? [] as $key => $value) { - if ($value == 0) { - continue; - } - - foreach ($this->periods as $period => $format) { - $time = 'inf' === $period ? null : date($format, time()); - $id = \md5("{$time}_{$period}_{$key}"); - - try { - $dbForProject->createDocument('stats_v2', new Document([ - '$id' => $id, - 'period' => $period, - 'time' => $time, - 'metric' => $key, - 'value' => $value, - 'region' => App::getEnv('_APP_REGION', 'default'), - ])); - } catch (Duplicate $th) { - if ($value < 0) { - $dbForProject->decreaseDocumentAttribute( - 'stats_v2', - $id, - 'value', - abs($value) - ); - } else { - $dbForProject->increaseDocumentAttribute( - 'stats_v2', - $id, - 'value', - $value - ); - } - } - } - } - } catch (\Exception $e) { - console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage()); - } - } - }); - } -}