usage updates

This commit is contained in:
shimon 2024-02-13 16:08:40 +02:00
parent abb1007385
commit 219b28e9bf
5 changed files with 188 additions and 120 deletions

3
bin/worker-usage-dump Normal file
View file

@ -0,0 +1,3 @@
#!/bin/sh
php /usr/src/code/app/worker.php usage-dump $@

View file

@ -677,6 +677,38 @@ services:
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
appwrite-worker-usage-dump:
entrypoint: worker-usage-dump
<<: *x-logging
container_name: appwrite-worker-usage-dump
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_USAGE_STATS
- _APP_LOGGING_PROVIDER
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
appwrite-schedule:
entrypoint: schedule
<<: *x-logging

View file

@ -3,23 +3,23 @@
namespace Appwrite\Platform\Workers;
use Exception;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Platform\Action;
use Appwrite\Event\UsageDump;
use Utopia\Queue\Message;
class Usage extends Action
{
protected static array $stats = [];
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
'inf' => '0000-00-00 00:00'
];
private array $stats = [];
private int $lastTriggeredTime = 0;
private int $keys = 0;
private const INFINITY_PERIOD = '_inf_';
private const KEYS_THRESHOLD = 10000;
protected const INFINITY_PERIOD = '_inf_';
public static function getName(): string
{
return 'usage';
@ -35,26 +35,31 @@ class Usage extends Action
->desc('Usage worker')
->inject('message')
->inject('getProjectDB')
->callback(function (Message $message, callable $getProjectDB) {
$this->action($message, $getProjectDB);
->inject('queueForUsageDump')
->callback(function (Message $message, callable $getProjectDB, UsageDump $queueForUsageDump) {
$this->action($message, $getProjectDB, $queueForUsageDump);
});
$this->lastTriggeredTime = time();
}
/**
* @param Message $message
* @param callable $getProjectDB
* @param UsageDump $queueForUsageDump
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
public function action(Message $message, callable $getProjectDB): void
public function action(Message $message, callable $getProjectDB, UsageDump $queueForUsageDump): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
//Todo Figure out way to preserve keys when the container is being recreated @shimonewman
$payload = $message->getPayload() ?? [];
$aggregationInterval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20');
$project = new Document($payload['project'] ?? []);
$projectId = $project->getInternalId();
foreach ($payload['reduce'] ?? [] as $document) {
@ -69,17 +74,35 @@ class Usage extends Action
getProjectDB: $getProjectDB
);
}
self::$stats[$projectId]['project'] = $project;
$this->stats[$projectId]['project'] = $project;
foreach ($payload['metrics'] ?? [] as $metric) {
if (!isset(self::$stats[$projectId]['keys'][$metric['key']])) {
self::$stats[$projectId]['keys'][$metric['key']] = $metric['value'];
$this->keys++;
if (!isset($this->stats[$projectId]['keys'][$metric['key']])) {
$this->stats[$projectId]['keys'][$metric['key']] = $metric['value'];
continue;
}
self::$stats[$projectId]['keys'][$metric['key']] += $metric['value'];
$this->stats[$projectId]['keys'][$metric['key']] += $metric['value'];
}
// if keys crossed threshold or X time passed since the last send and there are some keys in the array ($this->stats)
if (
$this->keys >= self::KEYS_THRESHOLD ||
(time() - $this->lastTriggeredTime > $aggregationInterval && $this->keys > 0)
) {
console::warning('[' . DateTime::now() . '] Aggregated ' . $this->keys . ' keys');
$queueForUsageDump
->setStats($this->stats)
->trigger();
$this->stats = [];
$this->keys = 0;
$this->lastTriggeredTime = time();
}
}
/**
* On Documents that tied by relations like functions>deployments>build || documents>collection>database || buckets>files.
* When we remove a parent document we need to deduct his children aggregation from the project scope.

View file

@ -0,0 +1,113 @@
<?php
namespace Appwrite\Platform\Workers;
use Appwrite\Extend\Exception;
use Utopia\App;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Queue\Message;
class UsageDump extends Action
{
protected array $stats = [];
protected array $periods = [
'1h' => 'Y-m-d H:00',
'1d' => 'Y-m-d 00:00',
'inf' => '0000-00-00 00:00'
];
public static function getName(): string
{
return 'usage-dump';
}
/**
* @throws \Exception
*/
public function __construct()
{
$this
->inject('message')
->inject('getProjectDB')
->callback(function (Message $message, callable $getProjectDB) {
$this->action($message, $getProjectDB);
})
;
}
/**
* @param Message $message
* @param callable $getProjectDB
* @return void
* @throws Exception
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, callable $getProjectDB): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
//Todo rename both usage workers @shimonewman
foreach ($payload['stats'] ?? [] as $stats) {
$project = new Document($stats['project'] ?? []);
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
if ($numberOfKeys === 0) {
continue;
}
console::log('[' . DateTime::now() . '] ProjectId [' . $project->getInternalId() . '] Database [' . $project['database'] . '] ' . $numberOfKeys . ' keys');
try {
$dbForProject = $getProjectDB($project);
foreach ($stats['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats_v2', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => App::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats_v2',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats_v2',
$id,
'value',
$value
);
}
}
}
}
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
}
}
}
}

View file

@ -1,103 +0,0 @@
<?php
namespace Appwrite\Platform\Workers;
use Utopia\App;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Swoole\Timer;
use Utopia\Database\DateTime;
class UsageHook extends Usage
{
public static function getName(): string
{
return 'usageHook';
}
public function __construct()
{
$this
->setType(Action::TYPE_WORKER_START)
->inject('register')
->inject('getProjectDB')
->callback(function ($register, callable $getProjectDB) {
$this->action($register, $getProjectDB);
})
;
}
/**
* @param $register
* @param $getProjectDB
* @return void
*/
public function action($register, $getProjectDB): void
{
$interval = (int) App::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '60000');
Timer::tick($interval, function () use ($register, $getProjectDB) {
$offset = count(self::$stats);
$projects = array_slice(self::$stats, 0, $offset, true);
array_splice(self::$stats, 0, $offset);
foreach ($projects as $data) {
$numberOfKeys = !empty($data['keys']) ? count($data['keys']) : 0;
$projectInternalId = $data['project']->getInternalId();
$database = $data['project']['database'] ?? '';
console::warning('Ticker started ' . DateTime::now());
if ($numberOfKeys === 0) {
continue;
}
try {
$dbForProject = $getProjectDB($data['project']);
foreach ($data['keys'] ?? [] as $key => $value) {
if ($value == 0) {
continue;
}
foreach ($this->periods as $period => $format) {
$time = 'inf' === $period ? null : date($format, time());
$id = \md5("{$time}_{$period}_{$key}");
try {
$dbForProject->createDocument('stats_v2', new Document([
'$id' => $id,
'period' => $period,
'time' => $time,
'metric' => $key,
'value' => $value,
'region' => App::getEnv('_APP_REGION', 'default'),
]));
} catch (Duplicate $th) {
if ($value < 0) {
$dbForProject->decreaseDocumentAttribute(
'stats_v2',
$id,
'value',
abs($value)
);
} else {
$dbForProject->increaseDocumentAttribute(
'stats_v2',
$id,
'value',
$value
);
}
}
}
}
} catch (\Exception $e) {
console::error(DateTime::now() . ' ' . $projectInternalId . ' ' . $e->getMessage());
}
}
});
}
}