appwrite/app/realtime.php

1411 lines
55 KiB
PHP
Raw Permalink Normal View History

2020-10-16 07:31:09 +00:00
<?php
2026-04-16 07:05:50 +00:00
use Appwrite\Event\Event as QueueEvent;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
2026-04-16 07:05:50 +00:00
use Appwrite\Event\Realtime as QueueRealtime;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
2021-06-28 14:34:28 +00:00
use Appwrite\Messaging\Adapter\Realtime;
2021-06-24 12:22:32 +00:00
use Appwrite\Network\Validator\Origin;
use Appwrite\Presences\State as PresenceState;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
2026-05-04 05:48:22 +00:00
use Appwrite\Realtime\Message\Dispatcher as MessageDispatcher;
use Appwrite\Realtime\Message\Handlers\Authentication as AuthenticationHandler;
use Appwrite\Realtime\Message\Handlers\Ping as PingHandler;
use Appwrite\Realtime\Message\Handlers\Presence as PresenceHandler;
use Appwrite\Realtime\Message\Handlers\Subscribe as SubscribeHandler;
use Appwrite\Realtime\Message\Handlers\Unsubscribe as UnsubscribeHandler;
2025-11-04 05:25:20 +00:00
use Appwrite\Utopia\Database\Documents\User;
2023-10-24 12:32:22 +00:00
use Appwrite\Utopia\Request;
2021-08-27 09:20:49 +00:00
use Appwrite\Utopia\Response;
use Swoole\Coroutine;
2021-06-24 12:22:32 +00:00
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Runtime;
use Swoole\Table;
use Swoole\Timer;
use Utopia\Abuse\Abuse;
2024-12-20 14:44:50 +00:00
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\Cache\Adapter\Pool as CachePool;
2024-03-06 17:34:21 +00:00
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Console;
use Utopia\Database\Adapter\Pool as DatabasePool;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Database;
2022-07-14 13:12:44 +00:00
use Utopia\Database\DateTime;
2021-10-07 15:35:17 +00:00
use Utopia\Database\Document;
use Utopia\Database\Exception\Authorization as AuthorizationException;
use Utopia\Database\Exception\Query as QueryException;
use Utopia\Database\Exception\Timeout as TimeoutException;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
2021-10-07 15:35:17 +00:00
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\DI\Container;
2024-05-06 05:33:36 +00:00
use Utopia\DSN\DSN;
2024-03-06 17:34:21 +00:00
use Utopia\Logger\Log;
use Utopia\Pools\Group;
2026-04-16 07:05:50 +00:00
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Queue;
use Utopia\Registry\Registry;
2026-04-22 10:12:17 +00:00
use Utopia\Span\Span;
2024-04-01 11:08:46 +00:00
use Utopia\System\System;
2024-11-12 09:45:00 +00:00
use Utopia\Telemetry\Adapter\None as NoTelemetry;
2021-06-24 12:22:32 +00:00
use Utopia\WebSocket\Adapter;
2024-03-06 17:34:21 +00:00
use Utopia\WebSocket\Server;
2021-02-24 17:12:38 +00:00
// Load the shared application bootstrap.
require_once __DIR__ . '/init.php';

// The span bootstrap is only pulled in here for the self-hosted edition.
if ('self-hosted' === System::getEnv('_APP_EDITION', 'self-hosted')) {
    require_once __DIR__ . '/init/span.php';
}

// The registry must already exist in the global scope; fail loudly otherwise.
if (!isset($GLOBALS['register'])) {
    throw new \RuntimeException('Registry not initialized');
}
/** @var Registry $register */
$register = $GLOBALS['register'];

// Keep a callable supplied by an including script; otherwise load the default one.
if (!isset($registerConnectionResources)) {
    $registerConnectionResources = require __DIR__ . '/init/realtime/connection.php';
}

// Enable Swoole's runtime coroutine hooks with every hook flag set.
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
2020-10-18 11:51:16 +00:00
// Report uncaught exceptions as a single console line instead of relying on
// Swoole's full backtrace dump.
set_exception_handler(static function (\Throwable $exception): void {
    $summary = sprintf(
        'Realtime uncaught exception: %s in %s:%d',
        $exception->getMessage(),
        $exception->getFile(),
        $exception->getLine()
    );

    Console::error($summary);
});
2026-05-14 07:16:58 +00:00
global $container;

// Expose the connection pools through the container unless something already did.
if (!$container->has('pools')) {
    $container->set(
        'pools',
        fn ($register) => $register->get('pools'),
        ['register']
    );
}

// Register the usage publisher unless something already did.
if (!$container->has('publisherForUsage')) {
    $container->set('publisherForUsage', function (Group $pools): UsagePublisher {
        $poolName = 'publisher';
        $customConnection = System::getEnv('_APP_CONNECTIONS_QUEUE_STATS_USAGE', '');

        if (!empty($customConnection)) {
            $candidate = 'publisher_' . $customConnection;

            try {
                // Probe the dedicated pool; the name is only switched when the lookup succeeds.
                $pools->get($candidate);
                $poolName = $candidate;
            } catch (Throwable) {
                // Fallback to default publisher pool when custom one is unavailable.
            }
        }

        $broker = new BrokerPool(publisher: $pools->get($poolName));
        $queueName = System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', QueueEvent::STATS_USAGE_QUEUE_NAME);

        return new UsagePublisher($broker, new Queue($queueName));
    }, ['pools']);
}
2024-03-20 12:47:20 +00:00
// Allows overriding
2024-10-08 07:54:40 +00:00
if (!function_exists('getConsoleDB')) {
    /**
     * Return the platform ("_console") database for the current coroutine.
     *
     * The instance is built on first use from the 'console' pool and then kept on
     * the coroutine context under 'dbForPlatform', so later calls in the same
     * coroutine get the same object back.
     *
     * @return Database
     */
    function getConsoleDB(): Database
    {
        $context = Coroutine::getContext();

        if (!isset($context['dbForPlatform'])) {
            global $register;

            /** @var Group $pools */
            $pools = $register->get('pools');

            $consoleDb = new Database(new DatabasePool($pools->get('console')), getCache());

            $consoleDb
                ->setDatabase(APP_DATABASE)
                ->setNamespace('_console')
                ->setMetadata('host', \gethostname())
                ->setMetadata('project', '_console');

            $consoleDb->setDocumentType('users', User::class);

            $context['dbForPlatform'] = $consoleDb;
        }

        return $context['dbForPlatform'];
    }
}
2024-03-20 12:47:20 +00:00
// Allows overriding
2024-10-08 07:54:40 +00:00
if (!function_exists('getProjectDB')) {
    /**
     * Return the database for a project, memoised per coroutine by project sequence.
     *
     * Empty projects and the 'console' project resolve to the platform database
     * returned by getConsoleDB(). For any other project the pool is selected by the
     * host part of the project's 'database' attribute, parsed as a DSN.
     *
     * When that host is listed in _APP_DATABASE_SHARED_TABLES the database is put in
     * shared-tables mode: tenant = project sequence, namespace = the DSN 'namespace'
     * parameter, and the keys of the 'projects' collection config plus 'audit' are
     * registered as global collections. Otherwise the namespace is '_' followed by
     * the project sequence and no tenant is set.
     *
     * @param Document $project Project document to resolve the database for.
     * @return Database
     */
    function getProjectDB(Document $project): Database
    {
        $ctx = Coroutine::getContext();

        if (!isset($ctx['dbForProject'])) {
            $ctx['dbForProject'] = [];
        }

        $sequence = $project->getSequence();

        if (isset($ctx['dbForProject'][$sequence])) {
            return $ctx['dbForProject'][$sequence];
        }

        // Resolve the console shortcut before touching the pools: nothing below is
        // needed for it.
        if ($project->isEmpty() || $project->getId() === 'console') {
            return getConsoleDB();
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        try {
            $dsn = new DSN($project->getAttribute('database'));
        } catch (\InvalidArgumentException) {
            // TODO: Temporary until all projects are using shared tables
            $dsn = new DSN('mysql://' . $project->getAttribute('database'));
        }

        $adapter = new DatabasePool($pools->get($dsn->getHost()));
        $database = new Database($adapter, getCache());

        $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));

        // Strict comparison: with loose comparison two different numeric-looking
        // host names (e.g. "1e3" and "1000") would be treated as equal.
        if (\in_array($dsn->getHost(), $sharedTables, true)) {
            $collections = Config::getParam('collections', []);
            $projectCollections = $collections['projects'] ?? [];
            $projectsGlobalCollections = array_keys($projectCollections);
            $projectsGlobalCollections[] = 'audit';

            $database
                ->setSharedTables(true)
                ->setGlobalCollections($projectsGlobalCollections)
                ->setTenant($sequence)
                ->setNamespace($dsn->getParam('namespace'));
        } else {
            $database
                ->setSharedTables(false)
                ->setTenant(null)
                ->setNamespace('_' . $sequence);
        }

        $database
            ->setDatabase(APP_DATABASE)
            ->setMetadata('host', \gethostname())
            ->setMetadata('project', $project->getId());

        $database->setDocumentType('users', User::class);

        return $ctx['dbForProject'][$sequence] = $database;
    }
}
2024-03-20 13:14:23 +00:00
// Allows overriding
2024-10-08 07:54:40 +00:00
if (!function_exists('getCache')) {
    /**
     * Return the cache for the current coroutine.
     *
     * On first use one pool-backed adapter is created per entry of the
     * 'pools-cache' config list and the adapters are combined through Sharding.
     * The result is kept on the coroutine context under 'cache'.
     *
     * @return Cache
     */
    function getCache(): Cache
    {
        $context = Coroutine::getContext();

        if (!isset($context['cache'])) {
            global $register;

            /** @var Group $pools */
            $pools = $register->get('pools');

            $shards = [];
            foreach (Config::getParam('pools-cache', []) as $poolName) {
                $shards[] = new CachePool($pools->get($poolName));
            }

            $context['cache'] = new Cache(new Sharding($shards));
        }

        return $context['cache'];
    }
}
2024-12-20 14:44:50 +00:00
// Allows overriding
if (!function_exists('getRedis')) {
    /**
     * Return the Redis client for the current coroutine.
     *
     * On first use a persistent connection is opened to _APP_REDIS_HOST /
     * _APP_REDIS_PORT (defaults: localhost, 6379), authenticated when
     * _APP_REDIS_PASS is non-empty, and its read timeout option is set to -1.
     * The client is kept on the coroutine context under 'redis'.
     *
     * @return \Redis
     */
    function getRedis(): \Redis
    {
        $context = Coroutine::getContext();

        if (!isset($context['redis'])) {
            $host = System::getEnv('_APP_REDIS_HOST', 'localhost');
            $port = (int) System::getEnv('_APP_REDIS_PORT', 6379);
            $password = System::getEnv('_APP_REDIS_PASS', '');

            $client = new \Redis();

            // Warnings raised while connecting are deliberately suppressed here.
            @$client->pconnect($host, $port);

            if ($password) {
                $client->auth($password);
            }

            $client->setOption(\Redis::OPT_READ_TIMEOUT, -1);

            $context['redis'] = $client;
        }

        return $context['redis'];
    }
}
if (!function_exists('getTimelimit')) {
    /**
     * Return the Redis-backed time limiter for the current coroutine.
     *
     * NOTE(review): the limiter is cached on the coroutine context under a single
     * 'timelimit' entry, so on a cache hit the arguments are ignored and the
     * instance built by the first call in this coroutine is returned.
     *
     * @param string $key     Key template handed to the limiter.
     * @param int    $limit   Limit handed to the limiter.
     * @param int    $seconds Window length handed to the limiter.
     * @return TimeLimitRedis
     */
    function getTimelimit(string $key = "", int $limit = 0, int $seconds = 1): TimeLimitRedis
    {
        $context = Coroutine::getContext();

        if (!isset($context['timelimit'])) {
            $context['timelimit'] = new TimeLimitRedis($key, $limit, $seconds, getRedis());
        }

        return $context['timelimit'];
    }
}
2024-09-09 08:52:37 +00:00
if (!function_exists('getRealtime')) {
    /**
     * Return the realtime messaging adapter kept on the current coroutine context,
     * creating it on first use.
     *
     * @return Realtime
     */
    function getRealtime(): Realtime
    {
        $context = Coroutine::getContext();

        if (!isset($context['realtime'])) {
            $context['realtime'] = new Realtime();
        }

        return $context['realtime'];
    }
}
2026-04-16 07:05:50 +00:00
2024-11-12 09:45:00 +00:00
if (!function_exists('getTelemetry')) {
    /**
     * Return the telemetry adapter kept on the current coroutine context.
     *
     * This default implementation creates a no-op adapter on first use and does
     * not read $workerId.
     *
     * @param int $workerId Identifier of the calling worker.
     * @return Utopia\Telemetry\Adapter
     */
    function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
    {
        $context = Coroutine::getContext();

        if (!isset($context['telemetry'])) {
            $context['telemetry'] = new NoTelemetry();
        }

        return $context['telemetry'];
    }
}
if (!function_exists('getQueueForEvents')) {
    /**
     * Return the event queue kept on the current coroutine context.
     *
     * On first use it is created on top of the 'publisher' pool.
     *
     * @return QueueEvent
     */
    function getQueueForEvents(): QueueEvent
    {
        $context = Coroutine::getContext();

        if (isset($context['queueForEvents'])) {
            return $context['queueForEvents'];
        }

        global $register;

        /** @var Group $pools */
        $pools = $register->get('pools');

        $broker = new BrokerPool(publisher: $pools->get('publisher'));
        $context['queueForEvents'] = new QueueEvent($broker);

        return $context['queueForEvents'];
    }
}
if (!function_exists('getQueueForRealtime')) {
    /**
     * Return the realtime queue kept on the current coroutine context,
     * creating it on first use.
     *
     * @return QueueRealtime
     */
    function getQueueForRealtime(): QueueRealtime
    {
        $context = Coroutine::getContext();

        if (isset($context['queueForRealtime'])) {
            return $context['queueForRealtime'];
        }

        $context['queueForRealtime'] = new QueueRealtime();

        return $context['queueForRealtime'];
    }
}
2026-04-16 07:05:50 +00:00
// Only defined when no function of this name exists yet, so an earlier
// definition takes precedence over this stub.
if (!function_exists('triggerStats')) {
/**
 * Default usage-stats hook: intentionally does nothing.
 *
 * Callers in this file pass a map of metric constant => value together with
 * the ID of the project the metrics belong to.
 *
 * @param array  $event     Metric values keyed by metric name.
 * @param string $projectId ID of the project the metrics belong to.
 * @return void
 */
function triggerStats(array $event, string $projectId): void
{
}
}
// Only defined when no function of this name exists yet, so an earlier
// definition takes precedence over this stub.
if (!function_exists('checkForProjectUsage')) {
/**
 * Default project usage check: intentionally does nothing.
 *
 * @param Document $project Project document handed in by the caller.
 * @return void
 */
function checkForProjectUsage(Document $project): void
{
}
}
2024-09-09 08:52:37 +00:00
// Realtime adapter and presence state used by the server callbacks.
$realtime = getRealtime();
$presenceState = new PresenceState();

// Message dispatcher with the ping, authentication, subscribe, unsubscribe and
// presence handlers attached, in that order.
$messageDispatcher = (new MessageDispatcher())
    ->addHandler(new PingHandler())
    ->addHandler(new AuthenticationHandler())
    ->addHandler(new SubscribeHandler())
    ->addHandler(new UnsubscribeHandler())
    ->addHandler(new PresenceHandler());

/**
 * Table for statistics across all workers.
 */
$stats = new Table(4096, 1);
$stats->column('projectId', Table::TYPE_STRING, 64);
$stats->column('teamId', Table::TYPE_STRING, 64);
$stats->column('connections', Table::TYPE_INT);
$stats->column('connectionsTotal', Table::TYPE_INT);
$stats->column('messages', Table::TYPE_INT);
$stats->create();

// Identifier stored with this container's stats document; the document itself is
// created once the server has started.
$containerId = uniqid();
$statsDocument = null;

// An explicit _APP_WORKERS_NUM wins; otherwise use CPU count times workers per core.
$workerNumber = (int) System::getEnv('_APP_WORKERS_NUM', 0);
if ($workerNumber === 0) {
    $workerNumber = (int) System::getEnv('_APP_CPU_NUM', swoole_cpu_num())
        * (int) System::getEnv('_APP_WORKER_PER_CORE', 6);
}

$adapter = new Adapter\Swoole(port: System::getEnv('PORT', 80));
$adapter
    ->setPackageMaxLength(64000) // Default maximum Package Size (64kb)
    ->setWorkerNumber($workerNumber);

$server = new Server($adapter);
2021-06-28 14:34:28 +00:00
// Allows overriding
if (!function_exists('logError')) {
    /**
     * Report a realtime error to the configured logger (when publishable) and
     * always print a summary to the console.
     *
     * Publishing rules:
     *  - AppwriteException: decided by its own isPublishable() flag.
     *  - Anything else: published when the code is 0 or >= 500. Non-numeric string
     *    codes are always published.
     *
     * @param Throwable          $error         Error to report.
     * @param string             $action        Action name stored on the log entry.
     * @param array              $tags          Extra tags; empty values are stored as 'n/a'.
     * @param Document|null      $project       Project used for the 'projectId' tag.
     * @param Document|null      $user          User used for the 'userId' tag.
     * @param Authorization|null $authorization Source of the 'roles' extra.
     * @return void
     */
    function logError(Throwable $error, string $action, array $tags = [], ?Document $project = null, ?Document $user = null, ?Authorization $authorization = null): void
    {
        global $register;
        $logger = $register->get('realtimeLogger');

        // Match HTTP semantics (app/controllers/general.php): AppwriteException uses its
        // configured publish flag; everything else publishes only for code 0 or >= 500.
        // Without this, expected client errors (e.g. Utopia DB Authorization) hit Sentry.
        if ($error instanceof AppwriteException) {
            $publish = $error->isPublishable();
        } else {
            $code = $error->getCode();

            // Some throwables (PDOException, for example) carry non-numeric string
            // codes such as SQLSTATE values. Comparing those with ">= 500" is a
            // lexicographic string comparison, so whether they were published
            // depended on their first character. They never describe an expected
            // client error, so publish them unconditionally.
            $isOpaqueCode = \is_string($code) && !\is_numeric($code);

            $publish = $isOpaqueCode || $code === 0 || $code >= 500;
        }

        if ($logger && $publish) {
            $version = System::getEnv('_APP_VERSION', 'UNKNOWN');

            $log = new Log();
            $log->setNamespace("realtime");
            $log->setServer(System::getEnv('_APP_LOGGING_SERVICE_IDENTIFIER', \gethostname()));
            $log->setVersion($version);
            $log->setType(Log::TYPE_ERROR);
            $log->setMessage($error->getMessage());

            $log->addTag('code', $error->getCode());
            $log->addTag('verboseType', get_class($error));
            $log->addTag('projectId', $project?->getId() ?: 'n/a');
            $log->addTag('userId', $user?->getId() ?: 'n/a');

            foreach ($tags as $key => $value) {
                $log->addTag($key, $value ?: 'n/a');
            }

            $log->addExtra('file', $error->getFile());
            $log->addExtra('line', $error->getLine());
            $log->addExtra('trace', $error->getTraceAsString());
            $log->addExtra('detailedTrace', $error->getTrace());
            $log->addExtra('roles', $authorization?->getRoles() ?? []);

            $log->setAction($action);

            $isProduction = System::getEnv('_APP_ENV', 'development') === 'production';
            $log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);

            // A failure to push the log must not mask the original error.
            try {
                $responseCode = $logger->addLog($log);
                Console::info('Error log pushed with status code: ' . $responseCode);
            } catch (Throwable $th) {
                Console::error('Error pushing log: ' . $th->getMessage());
            }
        }

        Console::error('[Error] Type: ' . get_class($error));
        Console::error('[Error] Message: ' . $error->getMessage());
        Console::error('[Error] File: ' . $error->getFile());
        Console::error('[Error] Line: ' . $error->getLine());
    }
}

// Route errors raised by the server through logError().
$server->error(logError(...));
2021-06-28 14:34:28 +00:00
2026-04-01 17:31:11 +00:00
$server->onStart(function () use ($stats, $containerId, &$statsDocument) {
    sleep(5); // wait for the initial database schema to be ready

    Console::success('Server started successfully');

    /**
     * Create document for this worker to share stats across Containers.
     * Creation is retried without limit until it succeeds.
     */
    go(function () use ($containerId, &$statsDocument) {
        $database = getConsoleDB();
        $attempts = 0;

        while (true) {
            $attempts++;

            try {
                $document = new Document([
                    '$id' => ID::unique(),
                    '$collection' => ID::custom('realtime'),
                    '$permissions' => [],
                    'container' => $containerId,
                    'timestamp' => DateTime::now(),
                    'value' => '{}'
                ]);

                $statsDocument = $database->getAuthorization()->skip(
                    fn () => $database->createDocument('realtime', $document)
                );

                return;
            } catch (Throwable) {
                Console::warning("Collection not ready. Retrying connection ({$attempts})...");
                sleep(DATABASE_RECONNECT_SLEEP);
            }
        }
    });

    // TODO: Remove this if check once it doesn't cause issues for cloud
    if (System::getEnv('_APP_EDITION', 'self-hosted') !== 'self-hosted') {
        return;
    }

    /**
     * Save current connections to the Database every 5 seconds.
     */
    Timer::tick(5000, function () use ($stats, &$statsDocument) {
        $payload = [];
        foreach ($stats as $projectId => $row) {
            $payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
        }

        // Nothing to persist yet, or the stats document has not been created.
        if (empty($payload) || empty($statsDocument)) {
            return;
        }

        try {
            $database = getConsoleDB();

            $statsDocument->setAttribute('timestamp', DateTime::now());
            $statsDocument->setAttribute('value', json_encode($payload));

            $database->getAuthorization()->skip(
                fn () => $database->updateDocument('realtime', $statsDocument->getId(), new Document([
                    'timestamp' => $statsDocument->getAttribute('timestamp'),
                    'value' => $statsDocument->getAttribute('value')
                ]))
            );
        } catch (Throwable $th) {
            logError($th, "updateWorkerDocument");
        }
    });
});
/**
 * Worker bootstrap:
 *  1. registers the telemetry instruments on the registry,
 *  2. starts a 5 second timer that pushes connection stats (self-hosted only) and
 *     the SDK test event to console subscribers,
 *  3. subscribes to the 'realtime' pub/sub channel and fans every event out to the
 *     matching connections, reconnecting until 300 consecutive attempts have failed.
 */
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) {
    Console::success('Worker ' . $workerId . ' started successfully');

    $telemetry = getTelemetry($workerId);
    $realtimeDelayBuckets = [100, 250, 500, 750, 1000, 1500, 2000, 3000, 5000, 7500, 10000, 15000, 30000];
    $workerTelemetryAttributes = ['workerId' => (string) $workerId];

    $register->set('telemetry', fn () => $telemetry);
    $register->set('telemetry.workerAttributes', fn () => $workerTelemetryAttributes);
    $register->set('telemetry.workerCounter', fn () => $telemetry->createUpDownCounter('realtime.server.active_workers'));
    $register->set('telemetry.workerClientCounter', fn () => $telemetry->createUpDownCounter('realtime.server.worker_clients'));
    $register->set('telemetry.workerSubscriptionCounter', fn () => $telemetry->createUpDownCounter('realtime.server.worker_subscriptions'));
    $register->set('telemetry.connectionCounter', fn () => $telemetry->createUpDownCounter('realtime.server.open_connections'));
    $register->set('telemetry.connectionCreatedCounter', fn () => $telemetry->createCounter('realtime.server.connection.created'));
    $register->set('telemetry.messageSentCounter', fn () => $telemetry->createCounter('realtime.server.message.sent'));
    $register->set('telemetry.deliveryDelayHistogram', fn () => $telemetry->createHistogram(
        name: 'realtime.server.delivery_delay',
        unit: 'ms',
        advisory: ['ExplicitBucketBoundaries' => $realtimeDelayBuckets],
    ));
    $register->set('telemetry.arrivalDelayHistogram', fn () => $telemetry->createHistogram(
        name: 'realtime.server.arrival_delay',
        unit: 'ms',
        advisory: ['ExplicitBucketBoundaries' => $realtimeDelayBuckets],
    ));

    $register->get('telemetry.workerCounter')->add(1);

    $attempts = 0;
    $start = time();

    Timer::tick(5000, function () use ($server, $realtime, $stats) {
        /**
         * Sending current connections to project channels on the console project every 5 seconds.
         */
        // TODO: Remove this if check once it doesn't cause issues for cloud
        if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
            if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
                $database = getConsoleDB();

                $payload = [];

                $list = $database->getAuthorization()->skip(fn () => $database->find('realtime', [
                    Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
                ]));

                /**
                 * Aggregate stats across containers.
                 */
                foreach ($list as $document) {
                    $containerTotals = json_decode($document->getAttribute('value'), true);

                    // Skip documents whose value is not a JSON object instead of
                    // iterating over null.
                    if (!\is_array($containerTotals)) {
                        continue;
                    }

                    foreach ($containerTotals as $projectId => $value) {
                        if (array_key_exists($projectId, $payload)) {
                            $payload[$projectId] += $value;
                        } else {
                            $payload[$projectId] = $value;
                        }
                    }
                }

                foreach ($stats as $projectId => $value) {
                    if (!array_key_exists($projectId, $payload)) {
                        continue;
                    }

                    $event = [
                        'project' => 'console',
                        'roles' => ['team:' . $stats->get($projectId, 'teamId')],
                        'data' => [
                            'events' => ['stats.connections'],
                            'channels' => ['project'],
                            'timestamp' => DateTime::formatTz(DateTime::now()),
                            'payload' => [
                                $projectId => $payload[$projectId]
                            ]
                        ]
                    ];

                    $server->send(array_keys($realtime->getSubscribers($event)), json_encode([
                        'type' => 'event',
                        'data' => $event['data']
                    ]));
                }
            }
        }

        /**
         * Sending test message for SDK E2E tests every 5 seconds.
         */
        if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests')) {
            $payload = ['response' => 'WS:/v1/realtime:passed'];

            $event = [
                'project' => 'console',
                'roles' => [Role::guests()->toString()],
                'data' => [
                    'events' => ['test.event'],
                    'channels' => ['tests'],
                    'timestamp' => DateTime::formatTz(DateTime::now()),
                    'payload' => $payload
                ]
            ];

            $subscribers = $realtime->getSubscribers($event);

            // Connections that matched the same set of subscription IDs share one message.
            $groups = [];
            foreach ($subscribers as $id => $matched) {
                $key = implode(',', array_keys($matched));
                $groups[$key]['ids'][] = $id;
                $groups[$key]['subscriptions'] = array_keys($matched);
            }

            foreach ($groups as $group) {
                $data = $event['data'];
                $data['subscriptions'] = $group['subscriptions'];

                $server->send($group['ids'], json_encode([
                    'type' => 'event',
                    'data' => $data
                ]));
            }
        }
    });

    while ($attempts < 300) {
        try {
            if ($attempts > 0) {
                Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ").\nAttempting restart in 5 seconds (attempt #" . $attempts . ')');
                sleep(5); // 5 sec delay between connection attempts
            }
            $start = time();

            $pubsub = new PubSubPool($register->get('pools')->get('pubsub'));

            if ($pubsub->ping(true)) {
                $attempts = 0;
                Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
            } else {
                Console::error('Pub/sub failed (worker: ' . $workerId . ')');
            }

            $pubsub->subscribe(['realtime'], function (mixed $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
                $event = json_decode($payload, true);

                // A payload that is not a JSON object cannot be routed to anyone.
                // Drop it here rather than handing null to the code below.
                if (!\is_array($event)) {
                    Console::warning("[Worker {$workerId}] Ignoring malformed realtime payload");
                    return;
                }

                // Record how long the event took to arrive at this worker.
                $eventTimestamp = $event['data']['timestamp'] ?? null;
                if (\is_string($eventTimestamp)) {
                    try {
                        $eventDate = new \DateTimeImmutable($eventTimestamp, new \DateTimeZone('UTC'));
                        $now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
                        $eventTimestampMs = (float) $eventDate->format('U.u') * 1000;
                        $nowTimestampMs = (float) $now->format('U.u') * 1000;
                        $arrivalDelayMs = (int) \max(0, $nowTimestampMs - $eventTimestampMs);
                        $register->get('telemetry.arrivalDelayHistogram')->record($arrivalDelayMs);
                    } catch (\Throwable) {
                        // Ignore invalid timestamp payloads.
                    }
                }

                // A permission change for a user: rebuild that connection's
                // subscriptions with the user's current roles.
                if (($event['permissionsChanged'] ?? false) && isset($event['userId'])) {
                    $projectId = $event['project'];
                    $userId = $event['userId'];

                    if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
                        $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
                        $subscriptionsBefore = \count($realtime->getSubscriptionMetadata($connection));

                        $consoleDatabase = getConsoleDB();
                        $project = $consoleDatabase->getAuthorization()->skip(fn () => $consoleDatabase->getDocument('projects', $projectId));
                        $database = getProjectDB($project);

                        /** @var User $user */
                        $user = $database->getDocument('users', $userId);

                        $roles = $user->getRoles($database->getAuthorization());
                        $authorization = $realtime->connections[$connection]['authorization'] ?? null;
                        $previousUserId = $realtime->connections[$connection]['userId'] ?? '';

                        // Snapshot the subscriptions before dropping them, then replay each one.
                        $meta = $realtime->getSubscriptionMetadata($connection);

                        $realtime->unsubscribe($connection);

                        foreach ($meta as $subscriptionId => $subscription) {
                            $queries = Query::parseQueries($subscription['queries'] ?? []);
                            $channels = Realtime::rebindAccountChannels(
                                $subscription['channels'] ?? [],
                                $previousUserId,
                                $userId
                            );

                            $realtime->subscribe(
                                $projectId,
                                $connection,
                                $subscriptionId,
                                $roles,
                                $channels,
                                $queries,
                                $userId
                            );
                        }

                        // Restore authorization after subscribe
                        if ($authorization !== null) {
                            $realtime->connections[$connection]['authorization'] = $authorization;
                        }

                        $subscriptionsAfter = \count($realtime->getSubscriptionMetadata($connection));
                        $subscriptionDelta = $subscriptionsAfter - $subscriptionsBefore;
                        if ($subscriptionDelta !== 0) {
                            $register->get('telemetry.workerSubscriptionCounter')->add($subscriptionDelta, $register->get('telemetry.workerAttributes'));
                        }
                    }
                }

                // Strip deleted presences from in-memory connection state so onClose doesn't
                // re-fire delete events for rows already removed via HTTP DELETE.
                $deletedPresenceId = Realtime::extractDeletedPresenceId($event);
                if ($deletedPresenceId !== null) {
                    $realtime->removePresenceFromConnections(
                        (string) ($event['project'] ?? ''),
                        $deletedPresenceId,
                    );
                }

                $receivers = $realtime->getSubscribers($event);

                if (System::getEnv('_APP_ENV', 'production') === 'development' && !empty($receivers)) {
                    Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
                    Console::log("[Debug][Worker {$workerId}] Connection IDs: " . json_encode(array_keys($receivers)));
                    Console::log("[Debug][Worker {$workerId}] Matched: " . json_encode(array_values($receivers)));
                    Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
                }

                // Group connections by matched subscription IDs for batch sending
                $groups = [];
                foreach ($receivers as $id => $matched) {
                    $key = implode(',', array_keys($matched));
                    $groups[$key]['ids'][] = $id;
                    $groups[$key]['subscriptions'] = array_keys($matched);
                }

                $total = 0;
                $outboundBytes = 0;

                foreach ($groups as $group) {
                    $data = $event['data'];
                    $data['subscriptions'] = $group['subscriptions'];

                    $payloadJson = json_encode([
                        'type' => 'event',
                        'data' => $data
                    ]);

                    $server->send($group['ids'], $payloadJson);

                    $count = count($group['ids']);
                    $total += $count;
                    $outboundBytes += strlen($payloadJson) * $count;
                }

                if ($total > 0) {
                    $register->get('telemetry.messageSentCounter')->add($total);
                    $stats->incr($event['project'], 'messages', $total);

                    // Record the delay between the payload's $updatedAt and delivery.
                    $updatedAt = $event['data']['payload']['$updatedAt'] ?? null;
                    if (\is_string($updatedAt)) {
                        try {
                            $updatedAtDate = new \DateTimeImmutable($updatedAt, new \DateTimeZone('UTC'));
                            $now = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
                            $updatedAtTimestampMs = (float) $updatedAtDate->format('U.u') * 1000;
                            $nowTimestampMs = (float) $now->format('U.u') * 1000;
                            $delayMs = (int) \max(0, $nowTimestampMs - $updatedAtTimestampMs);
                            $register->get('telemetry.deliveryDelayHistogram')->record($delayMs);
                        } catch (\Throwable) {
                            // Ignore invalid timestamp payloads.
                        }
                    }

                    $projectId = $event['project'] ?? null;
                    if (!empty($projectId)) {
                        $metrics = [
                            METRIC_REALTIME_CONNECTIONS_MESSAGES_SENT => $total,
                        ];
                        if ($outboundBytes > 0) {
                            $metrics[METRIC_REALTIME_OUTBOUND] = $outboundBytes;
                        }
                        triggerStats($metrics, $projectId);
                    }
                }
            });
        } catch (Throwable $th) {
            logError($th, "pubSubConnection");

            Console::error('Pub/sub error: ' . $th->getMessage());

            $attempts++;
            sleep(DATABASE_RECONNECT_SLEEP);
            continue;
        }
    }

    Console::error('Failed to restart pub/sub...');
});
// On worker shutdown, lower the active-worker counter. Telemetry failures are
// only logged so they cannot interrupt the shutdown.
$server->onWorkerStop(function (int $workerId) use ($register) {
    Console::warning('Worker ' . $workerId . ' stopping');

    try {
        $workerCounter = $register->get('telemetry.workerCounter');
        $workerCounter->add(-1);
    } catch (\Throwable $error) {
        Console::error('Realtime onWorkerStop telemetry error: ' . $error->getMessage());
    }
});
2026-04-10 03:55:00 +00:00
/**
 * WebSocket "open" handler.
 *
 * Resolves the project/user for the incoming handshake from a per-connection
 * container, enforces the project, API-enabled, region, abuse and origin
 * checks, registers the connection's subscriptions and replies with a
 * `connected` payload. Any failure is reported to the client as an `error`
 * payload and the connection is closed. Span attributes describing the
 * outcome are always recorded in the `finally` block.
 */
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $registerConnectionResources) {
    global $container;

    $request = new Request($request);
    $response = new Response(new SwooleResponse());

    Console::info("Connection open (user: {$connection})");

    // Child container scoped to this connection; the wrapped request is exposed
    // to whatever resources $registerConnectionResources registers on it.
    $connectionContainer = new Container($container);
    $connectionContainer->set('request', fn () => $request);
    $registerConnectionResources($connectionContainer);

    // State reported through the span in the finally block.
    $project = null;
    $logUser = null;
    $authorization = null;
    $rawSize = $request->getSize();
    $channelCount = 0;
    $subscriptionCount = 0;
    $outboundBytes = 0;
    $responseCode = 200;
    $subscriptionMode = 'message';
    $success = false;

    Span::init('realtime.open');
    Span::add('realtime.connection.id', $connection);
    Span::add('realtime.inbound_bytes', $rawSize);
    $spanOrigin = $request->getOrigin();
    if (!empty($spanOrigin)) {
        Span::add('realtime.origin', $spanOrigin);
    }

    try {
        /** @var Document $project */
        $project = $connectionContainer->get('project');
        $authorization = $connectionContainer->get('authorization');

        /*
         * Project Check
         */
        if (empty($project->getId())) {
            throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID');
        }

        $createTimelimit = $connectionContainer->get('timelimit');
        /** @var User $user */
        $user = $connectionContainer->get('user');
        $logUser = $user;

        // Websocket is what to check, but realtime is checked too for backwards compatibility
        $apis = $project->getAttribute('apis', []);
        $websocketEnabled = $apis['websocket'] ?? $apis['realtime'] ?? true;

        // Privileged users and API keys may still connect when the API is disabled.
        if (
            !$websocketEnabled
            && !$user->isPrivileged($authorization->getRoles())
            && !$user->isKey($authorization->getRoles())
        ) {
            throw new AppwriteException(AppwriteException::GENERAL_API_DISABLED);
        }

        $projectRegion = $project->getAttribute('region', '');
        $currentRegion = System::getEnv('_APP_REGION', 'default');
        if (!empty($projectRegion) && $projectRegion !== $currentRegion) {
            throw new AppwriteException(AppwriteException::GENERAL_ACCESS_FORBIDDEN, 'Project is not accessible in this region. Please make sure you are using the correct endpoint');
        }

        /*
         * Abuse Check
         *
         * Abuse limits are connecting 128 times per minute and ip address.
         */
        $connectionLimit = $createTimelimit('url:{url},ip:{ip}', 128, 60);
        $connectionLimit
            ->setParam('{ip}', $request->getIP())
            ->setParam('{url}', $request->getURI());
        $abuse = new Abuse($connectionLimit);

        if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
            throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many requests');
        }

        triggerStats([
            METRIC_REALTIME_INBOUND => $rawSize,
        ], $project->getId());

        /*
         * Validate Client Domain - Check to avoid CSRF attack.
         * Adding Appwrite API domains to allow XDOMAIN communication.
         * Skip this check for non-web platforms which are not required to send an origin header.
         */
        $origin = $request->getOrigin();
        $originValidator = $connectionContainer->get('originValidator');
        if (!empty($origin) && !$originValidator->isValid($origin) && $project->getId() !== 'console') {
            throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $originValidator->getDescription());
        }

        $roles = $user->getRoles($authorization);
        $channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());
        $channelCount = \count($channels);

        // Records a newly established connection in telemetry, the stats table and usage metrics.
        $updateStats = static function (string $projectId, ?string $teamId, string $payloadJson) use ($register, $stats): void {
            $register->get('telemetry.connectionCounter')->add(1);
            $register->get('telemetry.workerClientCounter')->add(1, $register->get('telemetry.workerAttributes'));
            $register->get('telemetry.connectionCreatedCounter')->add(1);
            $stats->set($projectId, [
                'projectId' => $projectId,
                'teamId' => $teamId
            ]);
            $stats->incr($projectId, 'connections');
            $stats->incr($projectId, 'connectionsTotal');
            triggerStats([
                METRIC_REALTIME_CONNECTIONS => 1,
                METRIC_REALTIME_OUTBOUND => \strlen($payloadJson),
            ], $projectId);
        };

        // Sends the `connected` payload, accounts its size and records the connection stats.
        $sendConnected = static function (array $channelNames, array $subscriptionIds, $account) use ($server, $connection, $project, $updateStats, &$outboundBytes): void {
            $connectedJson = json_encode([
                'type' => 'connected',
                'data' => [
                    'channels' => $channelNames,
                    'subscriptions' => $subscriptionIds,
                    'user' => $account
                ]
            ]);
            $server->send([$connection], $connectedJson);
            $outboundBytes += \strlen($connectedJson);
            $updateStats($project->getId(), $project->getAttribute('teamId'), $connectedJson);
        };

        /**
         * Channels Check
         */
        if (empty($channels)) {
            // in case of message based 'subscribe' channels will be empty at first and only projectId and roles will be available
            $account = empty($user->getId()) ? null : $response->output($user, Response::MODEL_ACCOUNT);

            $realtime->subscribe($project->getId(), $connection, '', $roles, [], [], $user->getId());
            $realtime->connections[$connection]['authorization'] = $authorization;

            // $subscriptionMode keeps its initial 'message' value on this path.
            $sendConnected([], [], $account);
            $success = true;
            return;
        }

        // URL-based subscriptions: channels (and optional per-channel queries) come from the query string.
        $channelNames = array_keys($channels);
        $subscriptionMode = 'url';

        try {
            $subscriptions = Realtime::constructSubscriptions(
                $channelNames,
                fn ($channel) => $request->getQuery($channel, null)
            );
        } catch (QueryException $e) {
            throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage());
        }

        // Maps each constructed subscription's index to its generated subscription ID.
        $subscriptionIds = [];
        foreach ($subscriptions as $index => $subscription) {
            $subscriptionId = ID::unique();
            $realtime->subscribe(
                $project->getId(),
                $connection,
                $subscriptionId,
                $roles,
                $subscription['channels'],
                $subscription['queries'],
                $user->getId()
            );
            $subscriptionIds[$index] = $subscriptionId;
        }

        $subscriptionCount = \count($subscriptions);
        if (!empty($subscriptions)) {
            $register->get('telemetry.workerSubscriptionCounter')->add(\count($subscriptions), $register->get('telemetry.workerAttributes'));
        }

        $realtime->connections[$connection]['authorization'] = $authorization;

        $account = empty($user->getId()) ? null : $response->output($user, Response::MODEL_ACCOUNT);
        $sendConnected($channelNames, $subscriptionIds, $account);
        $success = true;
    } catch (Throwable $th) {
        Span::error($th);

        // Convert known Utopia DB exceptions to AppwriteException so isPublishable()
        // suppresses expected client errors (permission denied, query timeout) from Sentry.
        if ($th instanceof AuthorizationException) {
            $th = new AppwriteException(AppwriteException::USER_UNAUTHORIZED, previous: $th);
        } elseif ($th instanceof TimeoutException) {
            $th = new AppwriteException(AppwriteException::DATABASE_TIMEOUT, previous: $th);
        }

        logError($th, 'realtime', project: $project, user: $logUser, authorization: $authorization);

        // Handle SQL error code is 'HY000'
        $code = $th->getCode();
        if (!\is_int($code)) {
            $code = 500;
        }
        $responseCode = $code;

        $isDevelopment = System::getEnv('_APP_ENV', 'production') === 'development';
        $isPolicyViolation = $th instanceof AppwriteException && $th->getType() === AppwriteException::REALTIME_POLICY_VIOLATION;

        // sanitize 0 && 5xx errors
        $message = $th->getMessage();
        if (($code === 0 || $code >= 500) && !$isPolicyViolation && !$isDevelopment) {
            $message = 'Error: Server Error';
        }

        $errorJson = json_encode([
            'type' => 'error',
            'data' => [
                'code' => $code,
                'message' => $message
            ]
        ]);
        $server->send([$connection], $errorJson);
        $outboundBytes += \strlen($errorJson);
        $server->close($connection, $code);

        if ($isDevelopment) {
            Console::error('[Error] Connection Error');
            Console::error('[Error] Code: ' . $code);
            Console::error('[Error] Message: ' . $message);
        }
    } finally {
        Span::add('realtime.success', $success);
        Span::add('realtime.response_code', $responseCode);
        Span::add('realtime.subscription_mode', $subscriptionMode);
        Span::add('realtime.channel_count', $channelCount);
        Span::add('realtime.subscription_count', $subscriptionCount);
        Span::add('realtime.outbound_bytes', $outboundBytes);
        if (!empty($project?->getId())) {
            Span::add('project.id', $project->getId());
        }
        if (!empty($logUser?->getId())) {
            Span::add('user.id', $logUser->getId());
        }
        Span::current()?->finish();
    }
});
/**
 * WebSocket "message" handler.
 *
 * Builds a per-message Authorization and database handle for the connection's
 * project, applies the per-connection rate limit, validates the JSON envelope
 * and hands it to the message dispatcher. A non-null dispatcher result is
 * JSON-encoded and sent back to the client. Failures are reported to the
 * client as an `error` payload; the connection is closed only when the error
 * code is 1008. Span attributes describing the outcome are always recorded in
 * the `finally` block.
 */
$server->onMessage(function (int $connection, string $message) use ($container, $server, $realtime, $containerId, $register, $presenceState, $messageDispatcher) {
    $project = null;
    $authorization = null;
    $projectId = $realtime->connections[$connection]['projectId'] ?? null;
    $rawSize = \strlen($message);
    $messageType = 'invalid';
    $outboundBytes = 0;
    $responseCode = 200;
    $success = false;

    Span::init('realtime.message');
    Span::add('realtime.connection.id', $connection);
    Span::add('realtime.inbound_bytes', $rawSize);
    Span::add('realtime.container.id', $containerId);

    try {
        $response = new Response(new SwooleResponse());

        // Build a fresh Authorization per message. The connection-scoped instance is shared
        // across coroutines, and `Authorization::skip()` toggles instance state — concurrent
        // messages on the same connection (e.g. `authentication` + `presence` sent back-to-back)
        // would interleave skip/restore and leak permission checks into supposedly-skipped lookups.
        $authorization = new Authorization();
        $connectionAuthorization = $realtime->connections[$connection]['authorization'] ?? null;
        if ($connectionAuthorization !== null) {
            foreach ($connectionAuthorization->getRoles() as $role) {
                $authorization->addRole($role);
            }
        }
        $connectionRoles = $realtime->connections[$connection]['roles'] ?? [];
        foreach ($connectionRoles as $role) {
            if ($authorization->hasRole($role)) {
                continue;
            }
            $authorization->addRole($role);
        }

        $database = getConsoleDB();
        $database->setAuthorization($authorization);

        if (!empty($projectId) && $projectId !== 'console') {
            // Negative-cache race: if any prior code path queried projects:$projectId
            // before this project existed (e.g. a router probe during connection
            // setup), the Database's shared cache may hold an empty result. Try the
            // cached read first, and only purge/retry when the first lookup reports
            // not-found so the shared cache remains effective for normal traffic.
            try {
                $project = $authorization->skip(fn () => $database->getDocument('projects', $projectId));
            } catch (AppwriteException $e) {
                if ($e->getCode() !== 404) {
                    throw $e;
                }
                $database->purgeCachedDocument('projects', $projectId);
                $project = $authorization->skip(fn () => $database->getDocument('projects', $projectId));
            }

            $database = getProjectDB($project);
            $database->setAuthorization($authorization);
        } else {
            $project = null;
        }

        if ($project !== null) {
            checkForProjectUsage($project);
        }

        /*
         * Abuse Check
         *
         * Abuse limits are sending 32 times per minute and connection.
         */
        // NOTE(review): the key template references {url} and {connection}, but only
        // {connection} and {container} are set here — confirm the intended key.
        $timeLimit = getTimelimit('url:{url},connection:{connection}', 32, 60);
        $timeLimit
            ->setParam('{connection}', $connection)
            ->setParam('{container}', $containerId);
        $abuse = new Abuse($timeLimit);

        // Test the feature flag first so the limiter is not consulted at all when
        // abuse protection is disabled (same ordering as the onOpen handler).
        if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
            throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many messages.');
        }

        // Record realtime inbound bytes for this project
        // not making this a part of the dispatcher as we need to get the inbound bytes as well even if we dont enter the dispatcher
        if ($project !== null && !$project->isEmpty()) {
            triggerStats([
                METRIC_REALTIME_INBOUND => $rawSize,
            ], $project->getId());
        }

        // json_decode() yields a scalar for valid scalar JSON (e.g. `123`, `"abc"`), and
        // array_key_exists() rejects non-arrays, so anything that is not an array
        // is reported as an invalid message format rather than surfacing as a server error.
        $payload = json_decode($message, true);
        if (
            !\is_array($payload)
            || (!array_key_exists('type', $payload) && !array_key_exists('data', $payload))
        ) {
            throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message format is not valid.');
        }

        // Validate before assigning so the span attribute recorded in `finally` stays scalar.
        $type = $payload['type'] ?? 'invalid';
        if (!\is_scalar($type)) {
            throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message type is not valid.');
        }
        $messageType = $type;

        // Child of the global container: per-message values like $connection and $project
        // live on this scope so concurrent message coroutines don't clobber each other,
        // while globally-registered services (pools, ...) remain reachable via the parent.
        $messageContainer = new Container($container);
        $messageContainer->set('connectionId', fn () => $connection);
        $messageContainer->set('server', fn () => $server);
        $messageContainer->set('realtime', fn () => $realtime);
        $messageContainer->set('register', fn () => $register);
        $messageContainer->set('response', fn () => $response);
        $messageContainer->set('presenceState', fn () => $presenceState);
        $messageContainer->set('database', fn () => $database);
        $messageContainer->set('authorization', fn () => $authorization);
        $messageContainer->set('project', fn () => $project);
        $messageContainer->set('projectId', fn () => $projectId);
        $messageContainer->set('queueForEvents', fn () => getQueueForEvents());
        $messageContainer->set('queueForRealtime', fn () => getQueueForRealtime());

        $responsePayload = $messageDispatcher->dispatch($messageContainer, $payload);

        if ($responsePayload !== null) {
            $responseJson = json_encode($responsePayload);
            if ($responseJson === false) {
                throw new \RuntimeException(
                    'Failed to encode realtime response payload: ' . json_last_error_msg()
                );
            }

            $server->send([$connection], $responseJson);

            $bytes = \strlen($responseJson);
            $outboundBytes += $bytes;
            if ($project !== null && !$project->isEmpty()) {
                triggerStats([METRIC_REALTIME_OUTBOUND => $bytes], $project->getId());
            }
        }

        $success = true;
    } catch (Throwable $th) {
        Span::error($th);

        // Convert known Utopia DB exceptions to AppwriteException so isPublishable()
        // suppresses expected client errors (permission denied, query timeout) from Sentry.
        if ($th instanceof AuthorizationException) {
            $th = new AppwriteException(AppwriteException::USER_UNAUTHORIZED, previous: $th);
        } elseif ($th instanceof TimeoutException) {
            $th = new AppwriteException(AppwriteException::DATABASE_TIMEOUT, previous: $th);
        }

        logError($th, 'realtimeMessage', project: $project, authorization: $authorization);

        // Non-integer codes (e.g. SQLSTATE strings) are reported as 500.
        $code = $th->getCode();
        if (!\is_int($code)) {
            $code = 500;
        }
        $responseCode = $code;

        // sanitize 0 && 5xx errors
        $errorMessage = $th->getMessage();
        if (($code === 0 || $code >= 500) && System::getEnv('_APP_ENV', 'production') !== 'development') {
            $errorMessage = 'Error: Server Error';
        }

        $responsePayloadJson = json_encode([
            'type' => 'error',
            'data' => [
                'code' => $code,
                'message' => $errorMessage
            ]
        ]);
        $server->send([$connection], $responsePayloadJson);
        $outboundBytes += \strlen($responsePayloadJson);

        // Only an error with code 1008 terminates the connection; other errors keep it open.
        if ($code === 1008) {
            $server->close($connection, $code);
        }
    } finally {
        Span::add('realtime.success', $success);
        Span::add('realtime.response_code', $responseCode);
        Span::add('realtime.outbound_bytes', $outboundBytes);
        Span::add('project.id', $project?->getId() ?? $projectId);
        Span::add('user.id', $realtime->connections[$connection]['userId'] ?? null);
        Span::add('realtime.message_type', $messageType);
        Span::current()?->finish();
    }
});
/**
 * WebSocket "close" handler.
 *
 * For a connection known to the realtime adapter this decrements the
 * connection/subscription counters, schedules presence cleanup in a separate
 * coroutine (non-console projects only) and records the disconnect metric.
 * The connection is always unsubscribed in the `finally` block, even when
 * the bookkeeping above fails.
 */
$server->onClose(function (int $connection) use ($realtime, $stats, $register, $container, $presenceState) {
    $projectId = null;
    $userId = null;
    $subscriptionsBeforeClose = 0;
    $success = false;

    Span::init('realtime.close');
    Span::add('realtime.connection.id', $connection);

    // Capture identifiers up front so the span can be tagged even if cleanup throws.
    $isTracked = array_key_exists($connection, $realtime->connections);
    if ($isTracked) {
        $projectId = $realtime->connections[$connection]['projectId'] ?? null;
        $userId = $realtime->connections[$connection]['userId'] ?? null;
    }

    try {
        if ($isTracked) {
            $workerAttributes = 'telemetry.workerAttributes';

            $stats->decr($realtime->connections[$connection]['projectId'], 'connectionsTotal');
            $register->get('telemetry.connectionCounter')->add(-1);
            $register->get('telemetry.workerClientCounter')->add(-1, $register->get($workerAttributes));

            $subscriptionsBeforeClose = \count($realtime->getSubscriptionMetadata($connection));
            if ($subscriptionsBeforeClose > 0) {
                $register->get('telemetry.workerSubscriptionCounter')->add(-$subscriptionsBeforeClose, $register->get($workerAttributes));
            }

            $projectId = $realtime->connections[$connection]['projectId'];

            /** @var array<string, Document> $presencesById */
            $presencesById = $realtime->connections[$connection]['presences'] ?? [];

            $needsPresenceCleanup = !empty($presencesById) && $projectId !== 'console';
            if ($needsPresenceCleanup) {
                go(function () use ($presencesById, $projectId, $userId, $container, $presenceState): void {
                    // Fresh span: the parent realtime.close span finishes before this coroutine
                    Span::init('realtime.close.presenceCleanup');
                    Span::add('realtime.projectId', $projectId);
                    Span::add('realtime.presenceCount', \count($presencesById));

                    try {
                        $dbForPlatform = getConsoleDB();
                        $project = $dbForPlatform->getAuthorization()->skip(fn () => $dbForPlatform->getDocument('projects', $projectId));
                        if ($project->isEmpty()) {
                            return;
                        }

                        $presenceIds = \array_keys($presencesById);
                        $dbForProject = getProjectDB($project);

                        // Resolve the owning user for the delete events; an empty User is used
                        // when there is no user ID or the lookup does not succeed.
                        $user = new User([]);
                        if (!empty($userId)) {
                            try {
                                $userDocument = $dbForProject->getAuthorization()->skip(
                                    fn () => $dbForProject->getDocument('users', $userId)
                                );
                                if (!$userDocument->isEmpty()) {
                                    $user = new User($userDocument->getArrayCopy());
                                }
                            } catch (Throwable) {
                                // Fall back to empty User if lookup fails.
                            }
                        }

                        /** @var UsagePublisher $publisherForUsage */
                        $publisherForUsage = $container->get('publisherForUsage');

                        // IDs of presence logs the database reported as deleted (filled via onNext).
                        /** @var array<string, true> $deletedIds */
                        $deletedIds = [];

                        try {
                            $deletionCount = $dbForProject->getAuthorization()->skip(
                                function () use ($dbForProject, $presenceIds, &$deletedIds): int {
                                    return $dbForProject->deleteDocuments(
                                        'presenceLogs',
                                        [Query::equal('$id', $presenceIds)],
                                        onNext: function (Document $deleted) use (&$deletedIds): void {
                                            $deletedIds[$deleted->getId()] = true;
                                        },
                                    );
                                }
                            );
                            $presenceState->triggerUsage($publisherForUsage, $project, -$deletionCount);
                        } catch (Throwable $th) {
                            Span::error($th);
                            logError($th, 'realtimeOnClosePresenceDeletion', tags: [
                                'projectId' => $projectId,
                                'presences' => \count($presencesById)
                            ]);
                        }

                        $queueForEvents = getQueueForEvents();
                        $queueForRealtime = getQueueForRealtime();

                        // Emit a delete event only for presences that were actually removed.
                        foreach ($presencesById as $presence) {
                            $wasDeleted = isset($deletedIds[$presence->getId()]);
                            if ($wasDeleted) {
                                try {
                                    $presenceState->triggerEvent(
                                        $queueForEvents,
                                        $queueForRealtime,
                                        $project,
                                        $user,
                                        'presences.[presenceId].delete',
                                        $presence,
                                    );
                                } catch (Throwable) {
                                    // Swallow errors to avoid breaking disconnect cleanup
                                }
                            }
                        }
                    } catch (Throwable $th) {
                        Span::error($th);
                        logError($th, 'realtimeOnClosePresenceCleanup', tags: [
                            'projectId' => $projectId,
                        ]);
                    } finally {
                        Span::current()?->finish();
                    }
                });
            }

            triggerStats([
                METRIC_REALTIME_CONNECTIONS => -1,
            ], $projectId);
        }

        $success = true;
    } catch (\Throwable $th) {
        // Log only; do not rethrow. If we let this bubble, Swoole dumps full coroutine
        // backtraces and unsubscribe() below would never run (connection cleanup would fail).
        Console::error('Realtime onClose error: ' . $th->getMessage());
        Span::error($th);
    } finally {
        // Always drop the connection from the adapter, regardless of what happened above.
        try {
            $realtime->unsubscribe($connection);
        } catch (\Throwable $th) {
            Console::error('Realtime onClose unsubscribe error: ' . $th->getMessage());
            Span::error($th);
        }

        Span::add('realtime.success', $success);
        if (!empty($projectId)) {
            Span::add('project.id', $projectId);
        }
        if (!empty($userId)) {
            Span::add('user.id', $userId);
        }
        Span::add('realtime.subscriptions_before_close', $subscriptionsBeforeClose);
        Span::current()?->finish();
    }

    Console::info('Connection close: ' . $connection);
});

$server->start();