diff --git a/app/init.php b/app/init.php index b350cb9c5c..2620abe7e1 100644 --- a/app/init.php +++ b/app/init.php @@ -378,7 +378,7 @@ App::setResource('user', function($mode, $project, $console, $request, $response /** @var Appwrite\Database\Document $project */ /** @var Appwrite\Database\Database $consoleDB */ /** @var Appwrite\Database\Database $projectDB */ - /** @var bool $mode */ + /** @var string $mode */ Authorization::setDefaultStatus(true); diff --git a/app/realtime.php b/app/realtime.php index 8bd7e2d6e2..1e161a2adb 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -4,12 +4,15 @@ require_once __DIR__ . '/init.php'; use Appwrite\Database\Pool\PDOPool; use Appwrite\Database\Pool\RedisPool; +use Appwrite\Event\Event; use Appwrite\Network\Validator\Origin; use Appwrite\Realtime\Realtime; use Appwrite\Utopia\Response; -use Swoole\Process; use Swoole\Http\Request; use Swoole\Http\Response as SwooleResponse; +use Swoole\Process; +use Swoole\Table; +use Swoole\Timer; use Swoole\WebSocket\Frame; use Swoole\WebSocket\Server; use Utopia\App; @@ -38,7 +41,46 @@ $server->set([ $subscriptions = []; $connections = []; -$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$connections, &$register) { +$stats = new Table(4096, 1); +$stats->column('projectId', Table::TYPE_STRING, 64); +$stats->column('connections', Table::TYPE_INT); +$stats->column('messages', Table::TYPE_INT); +$stats->create(); + +/** + * Sends usage stats every 10 seconds. + */ +Timer::tick(10000, function () use (&$stats) { + /** @var Table $stats */ + foreach ($stats as $projectId => $value) { + if (empty($value['connections']) && empty($value['messages'])) { + continue; + } + + $connections = $value['connections']; + $messages = $value['messages']; + + $usage = new Event('v1-usage', 'UsageV1'); + $usage + ->setParam('projectId', $projectId) + ->setParam('realtimeConnections', $connections) + ->setParam('realtimeMessages', $messages) + ->setParam('networkRequestSize', 0) + ->setParam('networkResponseSize', 0); + + $stats->set($projectId, array( + 'projectId' => $projectId, + 'messages' => 0, + 'connections' => 0 + )); + + if (App::getEnv('_APP_USAGE_STATS', 'enabled') == 'enabled') { + $usage->trigger(); + } + } +}); + +$server->on('workerStart', function ($server, $workerId) use (&$subscriptions, &$register, &$stats) { Console::success('Worker ' . $workerId . ' started succefully'); $attempts = 0; @@ -63,7 +105,7 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, & Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, &$subscriptions) { + $redis->subscribe(['realtime'], function ($redis, $channel, $payload) use ($server, $workerId, &$subscriptions, &$stats) { /** * Supported Resources: * - Collection @@ -98,6 +140,9 @@ $server->on('workerStart', function ($server, $workerId) use (&$subscriptions, & $server->close($receiver); } } + if (($num = count($receivers)) > 0) { + $stats->incr($event['project'], 'messages', $num); + } }); } catch (\Throwable $th) { Console::error('Pub/sub error: ' . $th->getMessage()); @@ -124,7 +169,7 @@ $server->on('start', function (Server $server) { }); }); -$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register) { +$server->on('open', function (Server $server, Request $request) use (&$connections, &$subscriptions, &$register, &$stats) { $app = new App('UTC'); $connection = $request->fd; $request = new SwooleRequest($request); @@ -135,11 +180,11 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio $register->set('db', function () use (&$db) { return $db; }); - + $register->set('cache', function () use (&$redis) { // Register cache connection return $redis; }); - + Console::info("Connection open (user: {$connection}, worker: {$server->getWorkerId()})"); App::setResource('request', function () use ($request) { @@ -213,6 +258,8 @@ $server->on('open', function (Server $server, Request $request) use (&$connectio Realtime::subscribe($project->getId(), $connection, $roles, $subscriptions, $connections, $channels); $server->push($connection, json_encode($channels)); + + $stats->incr($project->getId(), 'connections'); } catch (\Throwable $th) { $response = [ 'code' => $th->getCode(), diff --git a/app/views/install/compose.phtml b/app/views/install/compose.phtml index 27dfbc4d00..c51a67f81c 100644 --- a/app/views/install/compose.phtml +++ b/app/views/install/compose.phtml @@ -141,6 +141,7 @@ services: - _APP_DB_SCHEMA - _APP_DB_USER - _APP_DB_PASS + - _APP_USAGE_STATS appwrite-worker-usage: image: appwrite/appwrite: diff --git a/app/workers/usage.php b/app/workers/usage.php index b09ecce27e..74363dccb5 100644 --- a/app/workers/usage.php +++ b/app/workers/usage.php @@ -25,12 +25,13 @@ class UsageV1 extends Worker { global $register; + /** @var \Domnikl\Statsd\Client $statsd */ $statsd = $register->get('statsd', true); $projectId = $this->args['projectId']; - $networkRequestSize = $this->args['networkRequestSize']; - $networkResponseSize = $this->args['networkResponseSize']; + $networkRequestSize = $this->args['networkRequestSize'] ?? 0; + $networkResponseSize = $this->args['networkResponseSize'] ?? 0; $storage = $this->args['storage'] ?? null; @@ -42,7 +43,10 @@ class UsageV1 extends Worker $functionExecutionTime = $this->args['functionExecutionTime'] ?? null; $functionStatus = $this->args['functionStatus'] ?? null; - $tags = ",project={$projectId},version=".App::getEnv('_APP_VERSION', 'UNKNOWN').''; + $realtimeConnections = $this->args['realtimeConnections'] ?? null; + $realtimeMessages = $this->args['realtimeMessages'] ?? null; + + $tags = ",project={$projectId},version=".App::getEnv('_APP_VERSION', 'UNKNOWN'); // the global namespace is prepended to every key (optional) $statsd->setNamespace('appwrite.usage'); @@ -53,10 +57,17 @@ class UsageV1 extends Worker if($functionExecution >= 1) { $statsd->increment('executions.all'.$tags.',functionId='.$functionId.',functionStatus='.$functionStatus); - var_dump($tags.',functionId='.$functionId.',functionStatus='.$functionStatus); $statsd->count('executions.time'.$tags.',functionId='.$functionId, $functionExecutionTime); } + if($realtimeConnections >= 1) { + $statsd->count('realtime.clients'.$tags, $realtimeConnections); + } + + if($realtimeMessages >= 1) { + $statsd->count('realtime.message'.$tags, $realtimeMessages); + } + $statsd->count('network.inbound'.$tags, $networkRequestSize); $statsd->count('network.outbound'.$tags, $networkResponseSize); $statsd->count('network.all'.$tags, $networkRequestSize + $networkResponseSize); diff --git a/docker-compose.yml b/docker-compose.yml index 9d9577a9f5..bbca1fcb79 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -161,6 +161,7 @@ services: - _APP_DB_SCHEMA - _APP_DB_USER - _APP_DB_PASS + - _APP_USAGE_STATS appwrite-worker-usage: entrypoint: worker-usage