2020-10-16 07:31:09 +00:00
< ? php
2026-04-16 07:05:50 +00:00
use Appwrite\Event\Event as QueueEvent ;
2026-04-28 11:49:24 +00:00
use Appwrite\Event\Publisher\Usage as UsagePublisher ;
2026-04-16 07:05:50 +00:00
use Appwrite\Event\Realtime as QueueRealtime ;
2026-04-16 12:17:34 +00:00
use Appwrite\Extend\Exception ;
use Appwrite\Extend\Exception as AppwriteException ;
2021-06-28 14:34:28 +00:00
use Appwrite\Messaging\Adapter\Realtime ;
2021-06-24 12:22:32 +00:00
use Appwrite\Network\Validator\Origin ;
2026-05-13 13:42:11 +00:00
use Appwrite\Presences\State as PresenceState ;
2025-05-14 06:14:07 +00:00
use Appwrite\PubSub\Adapter\Pool as PubSubPool ;
2026-05-04 05:48:22 +00:00
use Appwrite\Realtime\Message\Dispatcher as MessageDispatcher ;
2026-05-06 06:50:06 +00:00
use Appwrite\Realtime\Message\Handlers\Authentication as AuthenticationHandler ;
use Appwrite\Realtime\Message\Handlers\Ping as PingHandler ;
use Appwrite\Realtime\Message\Handlers\Presence as PresenceHandler ;
use Appwrite\Realtime\Message\Handlers\Subscribe as SubscribeHandler ;
use Appwrite\Realtime\Message\Handlers\Unsubscribe as UnsubscribeHandler ;
2025-11-04 05:25:20 +00:00
use Appwrite\Utopia\Database\Documents\User ;
2023-10-24 12:32:22 +00:00
use Appwrite\Utopia\Request ;
2021-08-27 09:20:49 +00:00
use Appwrite\Utopia\Response ;
2025-12-03 16:02:22 +00:00
use Swoole\Coroutine ;
2021-06-24 12:22:32 +00:00
use Swoole\Http\Request as SwooleRequest ;
use Swoole\Http\Response as SwooleResponse ;
use Swoole\Runtime ;
use Swoole\Table ;
use Swoole\Timer ;
use Utopia\Abuse\Abuse ;
2024-12-20 14:44:50 +00:00
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis ;
2025-05-14 06:14:07 +00:00
use Utopia\Cache\Adapter\Pool as CachePool ;
2024-03-06 17:34:21 +00:00
use Utopia\Cache\Adapter\Sharding ;
use Utopia\Cache\Cache ;
use Utopia\Config\Config ;
2026-02-10 05:04:24 +00:00
use Utopia\Console ;
2025-05-14 06:14:07 +00:00
use Utopia\Database\Adapter\Pool as DatabasePool ;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Database ;
2022-07-14 13:12:44 +00:00
use Utopia\Database\DateTime ;
2021-10-07 15:35:17 +00:00
use Utopia\Database\Document ;
2026-05-18 16:19:13 +00:00
use Utopia\Database\Exception\Authorization as AuthorizationException ;
2026-01-16 12:48:24 +00:00
use Utopia\Database\Exception\Query as QueryException ;
2026-05-18 16:19:13 +00:00
use Utopia\Database\Exception\Timeout as TimeoutException ;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Helpers\ID ;
use Utopia\Database\Helpers\Role ;
2021-10-07 15:35:17 +00:00
use Utopia\Database\Query ;
2026-02-19 18:50:48 +00:00
use Utopia\Database\Validator\Authorization ;
2026-03-16 17:30:36 +00:00
use Utopia\DI\Container ;
2024-05-06 05:33:36 +00:00
use Utopia\DSN\DSN ;
2024-03-06 17:34:21 +00:00
use Utopia\Logger\Log ;
2025-05-14 06:14:07 +00:00
use Utopia\Pools\Group ;
2026-04-16 07:05:50 +00:00
use Utopia\Queue\Broker\Pool as BrokerPool ;
2026-04-28 11:49:24 +00:00
use Utopia\Queue\Queue ;
2025-05-14 06:14:07 +00:00
use Utopia\Registry\Registry ;
2026-04-22 10:12:17 +00:00
use Utopia\Span\Span ;
2024-04-01 11:08:46 +00:00
use Utopia\System\System ;
2024-11-12 09:45:00 +00:00
use Utopia\Telemetry\Adapter\None as NoTelemetry ;
2021-06-24 12:22:32 +00:00
use Utopia\WebSocket\Adapter ;
2024-03-06 17:34:21 +00:00
use Utopia\WebSocket\Server ;
2021-02-24 17:12:38 +00:00
2021-06-14 10:48:31 +00:00
require_once __DIR__ . '/init.php' ;
2026-04-27 13:03:47 +00:00
2026-04-28 05:37:16 +00:00
if ( System :: getEnv ( '_APP_EDITION' , 'self-hosted' ) === 'self-hosted' ) {
2026-04-27 13:03:47 +00:00
require_once __DIR__ . '/init/span.php' ;
}
2020-10-16 07:31:09 +00:00
2026-04-09 05:22:31 +00:00
/** @var Registry $register */
$register = $GLOBALS [ 'register' ] ? ? throw new \RuntimeException ( 'Registry not initialized' );
2026-04-10 04:49:41 +00:00
$registerConnectionResources ? ? = require __DIR__ . '/init/realtime/connection.php' ;
2026-03-23 04:36:58 +00:00
2021-06-24 12:22:32 +00:00
Runtime :: enableCoroutine ( SWOOLE_HOOK_ALL );
2020-10-18 11:51:16 +00:00
2026-03-17 19:51:47 +00:00
// Log uncaught exceptions in one line instead of relying on Swoole's full backtrace dump
set_exception_handler ( function ( \Throwable $e ) {
Console :: error ( sprintf (
'Realtime uncaught exception: %s in %s:%d' ,
$e -> getMessage (),
$e -> getFile (),
$e -> getLine ()
));
});
2026-05-14 07:16:58 +00:00
global $container ;
if ( ! $container -> has ( 'pools' )) {
$container -> set ( 'pools' , function ( $register ) {
return $register -> get ( 'pools' );
}, [ 'register' ]);
}
if ( ! $container -> has ( 'publisherForUsage' )) {
$container -> set ( 'publisherForUsage' , function ( Group $pools ) : UsagePublisher {
$statsUsageConnection = System :: getEnv ( '_APP_CONNECTIONS_QUEUE_STATS_USAGE' , '' );
$publisherPoolName = 'publisher' ;
if ( ! empty ( $statsUsageConnection )) {
try {
$pools -> get ( 'publisher_' . $statsUsageConnection );
$publisherPoolName = 'publisher_' . $statsUsageConnection ;
} catch ( Throwable ) {
// Fallback to default publisher pool when custom one is unavailable.
}
}
return new UsagePublisher (
new BrokerPool ( publisher : $pools -> get ( $publisherPoolName )),
new Queue ( System :: getEnv (
'_APP_STATS_USAGE_QUEUE_NAME' ,
QueueEvent :: STATS_USAGE_QUEUE_NAME
))
);
}, [ 'pools' ]);
}
2024-03-20 12:47:20 +00:00
// Allows overriding
2024-10-08 07:54:40 +00:00
if ( ! function_exists ( 'getConsoleDB' )) {
2024-03-20 12:47:20 +00:00
function getConsoleDB () : Database
{
2025-12-03 16:02:22 +00:00
$ctx = Coroutine :: getContext ();
2022-10-19 08:35:30 +00:00
2025-12-05 03:55:34 +00:00
if ( isset ( $ctx [ 'dbForPlatform' ])) {
return $ctx [ 'dbForPlatform' ];
2025-05-14 06:14:07 +00:00
}
2022-10-17 17:26:21 +00:00
2025-12-03 16:02:22 +00:00
global $register ;
2025-05-14 06:14:07 +00:00
/** @var Group $pools */
$pools = $register -> get ( 'pools' );
2022-10-17 17:26:21 +00:00
2025-05-14 06:14:07 +00:00
$adapter = new DatabasePool ( $pools -> get ( 'console' ));
$database = new Database ( $adapter , getCache ());
2024-03-20 12:47:20 +00:00
$database
2026-01-15 05:43:28 +00:00
-> setDatabase ( APP_DATABASE )
2024-03-20 12:47:20 +00:00
-> setNamespace ( '_console' )
-> setMetadata ( 'host' , \gethostname ())
-> setMetadata ( 'project' , '_console' );
2025-11-05 05:31:17 +00:00
$database -> setDocumentType ( 'users' , User :: class );
2025-12-05 03:55:34 +00:00
return $ctx [ 'dbForPlatform' ] = $database ;
2024-03-20 12:47:20 +00:00
}
2022-10-17 17:26:21 +00:00
}
2024-03-20 12:47:20 +00:00
// Allows overriding
2024-10-08 07:54:40 +00:00
if ( ! function_exists ( 'getProjectDB' )) {
2024-03-20 12:47:20 +00:00
function getProjectDB ( Document $project ) : Database
{
2025-12-03 16:02:22 +00:00
$ctx = Coroutine :: getContext ();
2022-10-17 17:26:21 +00:00
2025-12-05 03:55:34 +00:00
if ( ! isset ( $ctx [ 'dbForProject' ])) {
$ctx [ 'dbForProject' ] = [];
2025-12-03 16:02:22 +00:00
}
2025-05-14 06:14:07 +00:00
2025-12-05 03:55:34 +00:00
if ( isset ( $ctx [ 'dbForProject' ][ $project -> getSequence ()])) {
return $ctx [ 'dbForProject' ][ $project -> getSequence ()];
2025-05-14 06:14:07 +00:00
}
2025-12-03 16:02:22 +00:00
global $register ;
2025-05-14 06:14:07 +00:00
/** @var Group $pools */
2024-03-20 12:47:20 +00:00
$pools = $register -> get ( 'pools' );
2022-10-17 17:26:21 +00:00
2024-03-20 12:47:20 +00:00
if ( $project -> isEmpty () || $project -> getId () === 'console' ) {
return getConsoleDB ();
}
2022-10-17 17:26:21 +00:00
2024-05-06 06:13:41 +00:00
try {
$dsn = new DSN ( $project -> getAttribute ( 'database' ));
} catch ( \InvalidArgumentException ) {
2024-05-07 02:07:04 +00:00
// TODO: Temporary until all projects are using shared tables
2024-05-06 06:13:41 +00:00
$dsn = new DSN ( 'mysql://' . $project -> getAttribute ( 'database' ));
}
2024-05-06 05:33:36 +00:00
2025-05-14 06:14:07 +00:00
$adapter = new DatabasePool ( $pools -> get ( $dsn -> getHost ()));
2024-05-06 05:33:36 +00:00
$database = new Database ( $adapter , getCache ());
2022-10-17 17:26:21 +00:00
2024-11-12 09:27:24 +00:00
$sharedTables = \explode ( ',' , System :: getEnv ( '_APP_DATABASE_SHARED_TABLES' , '' ));
if ( \in_array ( $dsn -> getHost (), $sharedTables )) {
2026-04-29 12:41:56 +00:00
$collections = Config :: getParam ( 'collections' , []);
$projectCollections = $collections [ 'projects' ] ? ? [];
$projectsGlobalCollections = array_keys ( $projectCollections );
$projectsGlobalCollections [] = 'audit' ;
2024-04-30 07:40:47 +00:00
$database
-> setSharedTables ( true )
2026-04-29 12:41:56 +00:00
-> setGlobalCollections ( $projectsGlobalCollections )
2026-03-13 12:16:14 +00:00
-> setTenant ( $project -> getSequence ())
2024-05-06 05:33:36 +00:00
-> setNamespace ( $dsn -> getParam ( 'namespace' ));
2024-04-30 07:40:47 +00:00
} else {
$database
-> setSharedTables ( false )
-> setTenant ( null )
2025-05-26 05:42:11 +00:00
-> setNamespace ( '_' . $project -> getSequence ());
2024-04-30 07:40:47 +00:00
}
2023-10-18 03:21:10 +00:00
2024-03-07 13:52:13 +00:00
$database
2026-01-15 06:08:25 +00:00
-> setDatabase ( APP_DATABASE )
2024-03-20 12:47:20 +00:00
-> setMetadata ( 'host' , \gethostname ())
-> setMetadata ( 'project' , $project -> getId ());
2022-10-17 17:26:21 +00:00
2025-11-05 05:31:17 +00:00
$database -> setDocumentType ( 'users' , User :: class );
2025-12-05 03:55:34 +00:00
return $ctx [ 'dbForProject' ][ $project -> getSequence ()] = $database ;
2024-03-20 12:47:20 +00:00
}
2022-10-17 17:26:21 +00:00
}
2024-03-20 13:14:23 +00:00
// Allows overriding
2024-10-08 07:54:40 +00:00
if ( ! function_exists ( 'getCache' )) {
2024-03-20 13:14:23 +00:00
function getCache () : Cache
{
2025-12-03 16:02:22 +00:00
$ctx = Coroutine :: getContext ();
2022-10-17 17:26:21 +00:00
2025-12-05 03:55:34 +00:00
if ( isset ( $ctx [ 'cache' ])) {
return $ctx [ 'cache' ];
2025-05-14 06:14:07 +00:00
}
2025-12-03 16:02:22 +00:00
global $register ;
2025-05-14 06:14:07 +00:00
$pools = $register -> get ( 'pools' ); /** @var Group $pools */
2022-10-19 08:35:30 +00:00
2024-03-20 13:14:23 +00:00
$list = Config :: getParam ( 'pools-cache' , []);
$adapters = [];
2022-10-19 08:35:30 +00:00
2024-03-20 13:14:23 +00:00
foreach ( $list as $value ) {
2025-05-14 06:14:07 +00:00
$adapters [] = new CachePool ( $pools -> get ( $value ));
2024-03-20 13:14:23 +00:00
}
2022-10-17 17:26:21 +00:00
2025-12-05 03:55:34 +00:00
return $ctx [ 'cache' ] = new Cache ( new Sharding ( $adapters ));
2024-03-20 13:14:23 +00:00
}
2022-10-17 17:26:21 +00:00
}
2024-12-20 14:44:50 +00:00
// Allows overriding
if ( ! function_exists ( 'getRedis' )) {
function getRedis () : \Redis
{
2025-12-03 16:02:22 +00:00
$ctx = Coroutine :: getContext ();
2025-05-14 06:14:07 +00:00
2025-12-05 03:55:34 +00:00
if ( isset ( $ctx [ 'redis' ])) {
return $ctx [ 'redis' ];
2025-05-14 06:14:07 +00:00
}
2024-12-20 14:44:50 +00:00
$host = System :: getEnv ( '_APP_REDIS_HOST' , 'localhost' );
$port = System :: getEnv ( '_APP_REDIS_PORT' , 6379 );
$pass = System :: getEnv ( '_APP_REDIS_PASS' , '' );
$redis = new \Redis ();
@ $redis -> pconnect ( $host , ( int ) $port );
if ( $pass ) {
$redis -> auth ( $pass );
}
$redis -> setOption ( \Redis :: OPT_READ_TIMEOUT , - 1 );
2025-12-05 03:55:34 +00:00
return $ctx [ 'redis' ] = $redis ;
2024-12-20 14:44:50 +00:00
}
}
if ( ! function_exists ( 'getTimelimit' )) {
2025-12-05 03:55:34 +00:00
function getTimelimit ( string $key = " " , int $limit = 0 , int $seconds = 1 ) : TimeLimitRedis
2024-12-20 14:44:50 +00:00
{
2025-12-03 16:02:22 +00:00
$ctx = Coroutine :: getContext ();
2025-05-14 06:14:07 +00:00
2025-12-05 03:55:34 +00:00
if ( isset ( $ctx [ 'timelimit' ])) {
return $ctx [ 'timelimit' ];
2025-05-14 06:14:07 +00:00
}
2025-12-05 03:55:34 +00:00
return $ctx [ 'timelimit' ] = new TimeLimitRedis ( $key , $limit , $seconds , getRedis ());
2024-12-20 14:44:50 +00:00
}
}
2024-09-09 08:52:37 +00:00
if ( ! function_exists ( 'getRealtime' )) {
function getRealtime () : Realtime
{
2025-12-03 16:02:22 +00:00
$ctx = Coroutine :: getContext ();
2025-05-14 06:14:07 +00:00
2025-12-05 03:55:34 +00:00
if ( isset ( $ctx [ 'realtime' ])) {
return $ctx [ 'realtime' ];
2025-05-14 06:14:07 +00:00
}
2025-12-05 03:55:34 +00:00
return $ctx [ 'realtime' ] = new Realtime ();
2024-09-09 08:52:37 +00:00
}
}
2026-04-16 07:05:50 +00:00
2024-11-12 09:45:00 +00:00
if ( ! function_exists ( 'getTelemetry' )) {
function getTelemetry ( int $workerId ) : Utopia\Telemetry\Adapter
{
2025-12-05 03:55:34 +00:00
$ctx = Coroutine :: getContext ();
2025-05-14 06:14:07 +00:00
2025-12-05 03:55:34 +00:00
if ( isset ( $ctx [ 'telemetry' ])) {
return $ctx [ 'telemetry' ];
2025-05-14 06:14:07 +00:00
}
2025-12-05 03:55:34 +00:00
return $ctx [ 'telemetry' ] = new NoTelemetry ();
2024-11-12 09:45:00 +00:00
}
}
2026-05-14 06:18:41 +00:00
if ( ! function_exists ( 'getQueueForEvents' )) {
function getQueueForEvents () : QueueEvent
2026-04-16 07:05:50 +00:00
{
2026-05-13 08:34:41 +00:00
$ctx = Coroutine :: getContext ();
2026-04-16 07:05:50 +00:00
2026-05-13 08:34:41 +00:00
if ( ! isset ( $ctx [ 'queueForEvents' ])) {
global $register ;
/** @var Group $pools */
$pools = $register -> get ( 'pools' );
$ctx [ 'queueForEvents' ] = new QueueEvent ( new BrokerPool (
publisher : $pools -> get ( 'publisher' )
));
}
2026-04-16 07:05:50 +00:00
2026-05-14 06:18:41 +00:00
return $ctx [ 'queueForEvents' ];
2026-05-13 08:34:41 +00:00
}
}
if ( ! function_exists ( 'getQueueForRealtime' )) {
function getQueueForRealtime () : QueueRealtime
{
$ctx = Coroutine :: getContext ();
if ( ! isset ( $ctx [ 'queueForRealtime' ])) {
$ctx [ 'queueForRealtime' ] = new QueueRealtime ();
}
2026-05-14 06:18:41 +00:00
return $ctx [ 'queueForRealtime' ];
2026-04-16 07:05:50 +00:00
}
2026-04-30 10:26:31 +00:00
}
2026-04-16 07:05:50 +00:00
2026-05-13 13:22:14 +00:00
if ( ! function_exists ( 'triggerStats' )) {
function triggerStats ( array $event , string $projectId ) : void
{
}
}
2026-05-18 05:12:11 +00:00
if ( ! function_exists ( 'checkForProjectUsage' )) {
2026-05-18 06:02:37 +00:00
function checkForProjectUsage ( Document $project ) : void
2026-05-18 05:12:11 +00:00
{
}
}
2024-09-09 08:52:37 +00:00
$realtime = getRealtime ();
2026-04-17 06:31:56 +00:00
$presenceState = new PresenceState ();
2021-06-24 12:22:32 +00:00
2026-05-04 05:48:22 +00:00
$messageDispatcher = ( new MessageDispatcher ())
2026-05-13 13:22:14 +00:00
-> addHandler ( new PingHandler ())
-> addHandler ( new AuthenticationHandler ())
-> addHandler ( new SubscribeHandler ())
-> addHandler ( new UnsubscribeHandler ())
-> addHandler ( new PresenceHandler ());
2026-05-04 05:48:22 +00:00
2021-08-18 15:44:11 +00:00
/**
* Table for statistics across all workers .
*/
2021-06-24 12:22:32 +00:00
$stats = new Table ( 4096 , 1 );
$stats -> column ( 'projectId' , Table :: TYPE_STRING , 64 );
2021-08-19 10:14:19 +00:00
$stats -> column ( 'teamId' , Table :: TYPE_STRING , 64 );
2021-06-24 12:22:32 +00:00
$stats -> column ( 'connections' , Table :: TYPE_INT );
$stats -> column ( 'connectionsTotal' , Table :: TYPE_INT );
$stats -> column ( 'messages' , Table :: TYPE_INT );
$stats -> create ();
2021-07-01 10:31:48 +00:00
$containerId = uniqid ();
2021-10-07 15:35:17 +00:00
$statsDocument = null ;
2026-04-21 15:55:56 +00:00
$workerNumber = intval ( System :: getEnv ( '_APP_WORKERS_NUM' , 0 ))
? : intval ( System :: getEnv ( '_APP_CPU_NUM' , swoole_cpu_num ())) * intval ( System :: getEnv ( '_APP_WORKER_PER_CORE' , 6 ));
2021-07-01 10:31:48 +00:00
2024-04-01 11:02:47 +00:00
$adapter = new Adapter\Swoole ( port : System :: getEnv ( 'PORT' , 80 ));
2022-02-28 11:05:11 +00:00
$adapter
-> setPackageMaxLength ( 64000 ) // Default maximum Package Size (64kb)
-> setWorkerNumber ( $workerNumber );
2021-06-24 12:22:32 +00:00
2021-08-18 15:44:11 +00:00
$server = new Server ( $adapter );
2021-06-28 14:34:28 +00:00
2026-02-26 14:47:34 +00:00
// Allows overriding
if ( ! function_exists ( 'logError' )) {
function logError ( Throwable $error , string $action , array $tags = [], ? Document $project = null , ? Document $user = null , ? Authorization $authorization = null ) : void
{
global $register ;
$logger = $register -> get ( 'realtimeLogger' );
2021-06-24 12:22:32 +00:00
2026-05-18 16:19:13 +00:00
// Match HTTP semantics (app/controllers/general.php): AppwriteException uses its
// configured publish flag; everything else publishes only for code 0 or >= 500.
// Without this, expected client errors (e.g. Utopia DB Authorization) hit Sentry.
2026-05-18 16:23:06 +00:00
if ( $error instanceof AppwriteException ) {
2026-05-18 16:19:13 +00:00
$publish = $error -> isPublishable ();
} else {
$publish = $error -> getCode () === 0 || $error -> getCode () >= 500 ;
}
if ( $logger && $publish ) {
2026-02-26 14:47:34 +00:00
$version = System :: getEnv ( '_APP_VERSION' , 'UNKNOWN' );
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
$log = new Log ();
$log -> setNamespace ( " realtime " );
$log -> setServer ( System :: getEnv ( '_APP_LOGGING_SERVICE_IDENTIFIER' , \gethostname ()));
$log -> setVersion ( $version );
$log -> setType ( Log :: TYPE_ERROR );
$log -> setMessage ( $error -> getMessage ());
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
$log -> addTag ( 'code' , $error -> getCode ());
$log -> addTag ( 'verboseType' , get_class ( $error ));
2026-02-26 15:40:32 +00:00
$log -> addTag ( 'projectId' , $project ? -> getId () ? : 'n/a' );
$log -> addTag ( 'userId' , $user ? -> getId () ? : 'n/a' );
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
foreach ( $tags as $key => $value ) {
$log -> addTag ( $key , $value ? : 'n/a' );
}
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
$log -> addExtra ( 'file' , $error -> getFile ());
$log -> addExtra ( 'line' , $error -> getLine ());
$log -> addExtra ( 'trace' , $error -> getTraceAsString ());
$log -> addExtra ( 'detailedTrace' , $error -> getTrace ());
$log -> addExtra ( 'roles' , $authorization ? -> getRoles () ? ? []);
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
$log -> setAction ( $action );
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
$isProduction = System :: getEnv ( '_APP_ENV' , 'development' ) === 'production' ;
$log -> setEnvironment ( $isProduction ? Log :: ENVIRONMENT_PRODUCTION : Log :: ENVIRONMENT_STAGING );
try {
$responseCode = $logger -> addLog ( $log );
Console :: info ( 'Error log pushed with status code: ' . $responseCode );
} catch ( Throwable $th ) {
Console :: error ( 'Error pushing log: ' . $th -> getMessage ());
}
2024-09-26 10:51:51 +00:00
}
2021-12-27 10:35:51 +00:00
2026-02-26 14:47:34 +00:00
Console :: error ( '[Error] Type: ' . get_class ( $error ));
Console :: error ( '[Error] Message: ' . $error -> getMessage ());
Console :: error ( '[Error] File: ' . $error -> getFile ());
Console :: error ( '[Error] Line: ' . $error -> getLine ());
}
}
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
$server -> error ( logError ( ... ));
2021-06-28 14:34:28 +00:00
2026-04-01 17:31:11 +00:00
$server -> onStart ( function () use ( $stats , $containerId , & $statsDocument ) {
2022-01-03 14:06:40 +00:00
sleep ( 5 ); // wait for the initial database schema to be ready
2022-08-10 05:42:20 +00:00
Console :: success ( 'Server started successfully' );
2021-08-17 09:08:18 +00:00
2021-07-01 10:31:48 +00:00
/**
2021-08-18 15:44:11 +00:00
* Create document for this worker to share stats across Containers .
2021-07-01 10:31:48 +00:00
*/
2026-04-01 17:31:11 +00:00
go ( function () use ( $containerId , & $statsDocument ) {
2022-06-21 16:00:23 +00:00
$attempts = 0 ;
2022-10-17 17:26:21 +00:00
$database = getConsoleDB ();
2022-10-19 08:35:30 +00:00
2022-06-21 16:00:23 +00:00
do {
try {
$attempts ++ ;
$document = new Document ([
2022-08-14 14:22:38 +00:00
'$id' => ID :: unique (),
2022-08-14 10:33:36 +00:00
'$collection' => ID :: custom ( 'realtime' ),
2022-08-02 09:19:15 +00:00
'$permissions' => [],
2022-06-21 16:00:23 +00:00
'container' => $containerId ,
2022-07-14 13:12:44 +00:00
'timestamp' => DateTime :: now (),
2022-06-21 16:00:23 +00:00
'value' => '{}'
]);
2026-01-14 15:08:00 +00:00
$statsDocument = $database -> getAuthorization () -> skip ( fn () => $database -> createDocument ( 'realtime' , $document ));
2022-06-21 16:00:23 +00:00
break ;
2023-10-24 12:32:22 +00:00
} catch ( Throwable ) {
2022-06-21 16:00:23 +00:00
Console :: warning ( " Collection not ready. Retrying connection ( { $attempts } )... " );
sleep ( DATABASE_RECONNECT_SLEEP );
}
} while ( true );
2021-07-01 10:31:48 +00:00
});
/**
* Save current connections to the Database every 5 seconds .
*/
2024-05-24 04:34:43 +00:00
// TODO: Remove this if check once it doesn't cause issues for cloud
if ( System :: getEnv ( '_APP_EDITION' , 'self-hosted' ) === 'self-hosted' ) {
2026-04-01 17:31:11 +00:00
Timer :: tick ( 5000 , function () use ( $stats , & $statsDocument ) {
2024-05-24 04:34:43 +00:00
$payload = [];
foreach ( $stats as $projectId => $value ) {
$payload [ $projectId ] = $stats -> get ( $projectId , 'connectionsTotal' );
}
if ( empty ( $payload ) || empty ( $statsDocument )) {
return ;
}
try {
$database = getConsoleDB ();
$statsDocument
-> setAttribute ( 'timestamp' , DateTime :: now ())
-> setAttribute ( 'value' , json_encode ( $payload ));
2026-03-06 09:42:07 +00:00
$database -> getAuthorization () -> skip ( fn () => $database -> updateDocument ( 'realtime' , $statsDocument -> getId (), new Document ([
'timestamp' => $statsDocument -> getAttribute ( 'timestamp' ),
'value' => $statsDocument -> getAttribute ( 'value' )
])));
2024-05-24 04:34:43 +00:00
} catch ( Throwable $th ) {
2026-02-26 14:47:34 +00:00
logError ( $th , " updateWorkerDocument " );
2024-05-24 04:34:43 +00:00
}
});
}
2021-06-24 12:22:32 +00:00
});
2026-02-26 14:47:34 +00:00
$server -> onWorkerStart ( function ( int $workerId ) use ( $server , $register , $stats , $realtime ) {
2021-09-26 21:33:36 +00:00
Console :: success ( 'Worker ' . $workerId . ' started successfully' );
2021-06-24 12:22:32 +00:00
2024-11-12 09:45:00 +00:00
$telemetry = getTelemetry ( $workerId );
2026-04-20 06:09:50 +00:00
$realtimeDelayBuckets = [ 100 , 250 , 500 , 750 , 1000 , 1500 , 2000 , 3000 , 5000 , 7500 , 10000 , 15000 , 30000 ];
2026-04-20 06:22:43 +00:00
$workerTelemetryAttributes = [ 'workerId' => ( string ) $workerId ];
2024-11-12 09:45:00 +00:00
$register -> set ( 'telemetry' , fn () => $telemetry );
2026-04-20 06:22:43 +00:00
$register -> set ( 'telemetry.workerAttributes' , fn () => $workerTelemetryAttributes );
2026-04-20 06:15:05 +00:00
$register -> set ( 'telemetry.workerCounter' , fn () => $telemetry -> createUpDownCounter ( 'realtime.server.active_workers' ));
$register -> set ( 'telemetry.workerClientCounter' , fn () => $telemetry -> createUpDownCounter ( 'realtime.server.worker_clients' ));
2026-04-20 06:22:43 +00:00
$register -> set ( 'telemetry.workerSubscriptionCounter' , fn () => $telemetry -> createUpDownCounter ( 'realtime.server.worker_subscriptions' ));
2024-11-12 09:45:00 +00:00
$register -> set ( 'telemetry.connectionCounter' , fn () => $telemetry -> createUpDownCounter ( 'realtime.server.open_connections' ));
$register -> set ( 'telemetry.connectionCreatedCounter' , fn () => $telemetry -> createCounter ( 'realtime.server.connection.created' ));
$register -> set ( 'telemetry.messageSentCounter' , fn () => $telemetry -> createCounter ( 'realtime.server.message.sent' ));
2026-04-17 12:32:04 +00:00
$register -> set ( 'telemetry.deliveryDelayHistogram' , fn () => $telemetry -> createHistogram (
name : 'realtime.server.delivery_delay' ,
unit : 'ms' ,
2026-04-20 06:09:50 +00:00
advisory : [ 'ExplicitBucketBoundaries' => $realtimeDelayBuckets ],
));
$register -> set ( 'telemetry.arrivalDelayHistogram' , fn () => $telemetry -> createHistogram (
name : 'realtime.server.arrival_delay' ,
unit : 'ms' ,
advisory : [ 'ExplicitBucketBoundaries' => $realtimeDelayBuckets ],
2026-04-17 12:32:04 +00:00
));
2026-04-20 06:15:05 +00:00
$register -> get ( 'telemetry.workerCounter' ) -> add ( 1 );
2024-11-12 09:45:00 +00:00
2021-06-24 12:22:32 +00:00
$attempts = 0 ;
$start = time ();
2026-04-01 17:31:11 +00:00
Timer :: tick ( 5000 , function () use ( $server , $realtime , $stats ) {
2021-08-17 11:18:32 +00:00
/**
* Sending current connections to project channels on the console project every 5 seconds .
*/
2024-05-24 04:34:43 +00:00
// TODO: Remove this if check once it doesn't cause issues for cloud
if ( System :: getEnv ( '_APP_EDITION' , 'self-hosted' ) === 'self-hosted' ) {
if ( $realtime -> hasSubscriber ( 'console' , Role :: users () -> toString (), 'project' )) {
$database = getConsoleDB ();
$payload = [];
2026-01-14 15:08:00 +00:00
$list = $database -> getAuthorization () -> skip ( fn () => $database -> find ( 'realtime' , [
2024-05-24 04:34:43 +00:00
Query :: greaterThan ( 'timestamp' , DateTime :: addSeconds ( new \DateTime (), - 15 )),
]));
/**
* Aggregate stats across containers .
*/
foreach ( $list as $document ) {
foreach ( json_decode ( $document -> getAttribute ( 'value' )) as $projectId => $value ) {
if ( array_key_exists ( $projectId , $payload )) {
$payload [ $projectId ] += $value ;
} else {
$payload [ $projectId ] = $value ;
}
}
}
foreach ( $stats as $projectId => $value ) {
if ( ! array_key_exists ( $projectId , $payload )) {
continue ;
}
$event = [
'project' => 'console' ,
'roles' => [ 'team:' . $stats -> get ( $projectId , 'teamId' )],
'data' => [
'events' => [ 'stats.connections' ],
'channels' => [ 'project' ],
'timestamp' => DateTime :: formatTz ( DateTime :: now ()),
'payload' => [
$projectId => $payload [ $projectId ]
]
]
];
2026-04-06 11:40:57 +00:00
$server -> send ( array_keys ( $realtime -> getSubscribers ( $event )), json_encode ([
2024-05-24 04:34:43 +00:00
'type' => 'event' ,
'data' => $event [ 'data' ]
]));
}
}
}
2021-08-17 11:18:32 +00:00
/**
* Sending test message for SDK E2E tests every 5 seconds .
*/
2022-08-17 03:11:49 +00:00
if ( $realtime -> hasSubscriber ( 'console' , Role :: guests () -> toString (), 'tests' )) {
2021-08-17 11:18:32 +00:00
$payload = [ 'response' => 'WS:/v1/realtime:passed' ];
$event = [
2022-06-20 07:37:00 +00:00
'project' => 'console' ,
2022-08-17 03:11:49 +00:00
'roles' => [ Role :: guests () -> toString ()],
2021-08-17 11:18:32 +00:00
'data' => [
2022-05-17 12:09:04 +00:00
'events' => [ 'test.event' ],
2021-08-27 11:26:26 +00:00
'channels' => [ 'tests' ],
2022-10-18 17:10:50 +00:00
'timestamp' => DateTime :: formatTz ( DateTime :: now ()),
2021-08-27 11:26:26 +00:00
'payload' => $payload
2021-08-17 11:18:32 +00:00
]
];
2026-02-05 05:47:54 +00:00
$subscribers = $realtime -> getSubscribers ( $event );
2026-02-03 06:13:23 +00:00
2026-02-05 07:48:40 +00:00
$groups = [];
2026-02-05 05:47:54 +00:00
foreach ( $subscribers as $id => $matched ) {
2026-02-05 07:48:40 +00:00
$key = implode ( ',' , array_keys ( $matched ));
$groups [ $key ][ 'ids' ][] = $id ;
$groups [ $key ][ 'subscriptions' ] = array_keys ( $matched );
}
foreach ( $groups as $group ) {
2026-02-02 16:20:23 +00:00
$data = $event [ 'data' ];
2026-02-05 07:48:40 +00:00
$data [ 'subscriptions' ] = $group [ 'subscriptions' ];
2026-02-02 16:20:23 +00:00
2026-02-05 07:48:40 +00:00
$server -> send ( $group [ 'ids' ], json_encode ([
2026-02-02 16:20:23 +00:00
'type' => 'event' ,
'data' => $data
]));
}
2021-06-24 12:22:32 +00:00
}
});
while ( $attempts < 300 ) {
try {
if ( $attempts > 0 ) {
Console :: error ( 'Pub/sub connection lost (lasted ' . ( time () - $start ) . ' seconds, worker: ' . $workerId . ' ) .
Attempting restart in 5 seconds ( attempt #' . $attempts . ')');
sleep ( 5 ); // 5 sec delay between connection attempts
}
$start = time ();
2025-05-14 06:14:07 +00:00
$pubsub = new PubSubPool ( $register -> get ( 'pools' ) -> get ( 'pubsub' ));
2024-10-31 12:12:03 +00:00
if ( $pubsub -> ping ( true )) {
2021-06-24 12:22:32 +00:00
$attempts = 0 ;
Console :: success ( 'Pub/sub connection established (worker: ' . $workerId . ')' );
} else {
Console :: error ( 'Pub/sub failed (worker: ' . $workerId . ')' );
}
2025-11-27 16:56:33 +00:00
$pubsub -> subscribe ([ 'realtime' ], function ( mixed $redis , string $channel , string $payload ) use ( $server , $workerId , $stats , $register , $realtime ) {
2021-06-24 12:22:32 +00:00
$event = json_decode ( $payload , true );
2026-04-20 06:09:50 +00:00
$eventTimestamp = $event [ 'data' ][ 'timestamp' ] ? ? null ;
if ( \is_string ( $eventTimestamp )) {
try {
2026-04-20 06:43:53 +00:00
$eventDate = new \DateTimeImmutable ( $eventTimestamp , new \DateTimeZone ( 'UTC' ));
2026-04-20 06:09:50 +00:00
$now = new \DateTimeImmutable ( 'now' , new \DateTimeZone ( 'UTC' ));
$eventTimestampMs = ( float ) $eventDate -> format ( 'U.u' ) * 1000 ;
$nowTimestampMs = ( float ) $now -> format ( 'U.u' ) * 1000 ;
$arrivalDelayMs = ( int ) \max ( 0 , $nowTimestampMs - $eventTimestampMs );
$register -> get ( 'telemetry.arrivalDelayHistogram' ) -> record ( $arrivalDelayMs );
} catch ( \Throwable ) {
// Ignore invalid timestamp payloads.
}
}
2021-06-24 12:22:32 +00:00
if ( $event [ 'permissionsChanged' ] && isset ( $event [ 'userId' ])) {
2022-06-20 07:37:00 +00:00
$projectId = $event [ 'project' ];
2021-06-24 12:22:32 +00:00
$userId = $event [ 'userId' ];
2021-06-28 10:18:00 +00:00
2021-06-28 14:34:28 +00:00
if ( $realtime -> hasSubscriber ( $projectId , 'user:' . $userId )) {
$connection = array_key_first ( reset ( $realtime -> subscriptions [ $projectId ][ 'user:' . $userId ]));
2026-04-20 06:22:43 +00:00
$subscriptionsBefore = \count ( $realtime -> getSubscriptionMetadata ( $connection ));
2022-10-17 17:26:21 +00:00
$consoleDatabase = getConsoleDB ();
2026-01-14 15:08:00 +00:00
$project = $consoleDatabase -> getAuthorization () -> skip ( fn () => $consoleDatabase -> getDocument ( 'projects' , $projectId ));
2022-10-17 17:26:21 +00:00
$database = getProjectDB ( $project );
2021-06-28 10:18:00 +00:00
2026-03-29 03:04:34 +00:00
/** @var User $user */
2022-05-09 07:35:55 +00:00
$user = $database -> getDocument ( 'users' , $userId );
2025-11-19 03:21:06 +00:00
2026-01-14 15:08:00 +00:00
$roles = $user -> getRoles ( $database -> getAuthorization ());
2026-02-02 14:15:21 +00:00
$authorization = $realtime -> connections [ $connection ][ 'authorization' ] ? ? null ;
2026-04-27 12:01:31 +00:00
$previousUserId = $realtime -> connections [ $connection ][ 'userId' ] ? ? '' ;
2026-02-03 06:13:23 +00:00
2026-02-05 05:47:54 +00:00
$meta = $realtime -> getSubscriptionMetadata ( $connection );
2021-06-28 10:18:00 +00:00
2024-07-19 00:06:14 +00:00
$realtime -> unsubscribe ( $connection );
2026-02-02 14:15:21 +00:00
2026-02-05 05:47:54 +00:00
foreach ( $meta as $subscriptionId => $subscription ) {
$queries = Query :: parseQueries ( $subscription [ 'queries' ] ? ? []);
2026-04-27 12:01:31 +00:00
$channels = Realtime :: rebindAccountChannels (
$subscription [ 'channels' ] ? ? [],
$previousUserId ,
$userId
);
2026-02-03 08:16:05 +00:00
$realtime -> subscribe (
$projectId ,
$connection ,
$subscriptionId ,
$roles ,
2026-04-27 12:01:31 +00:00
$channels ,
2026-04-27 12:24:48 +00:00
$queries ,
$userId
2026-02-03 08:16:05 +00:00
);
2026-02-02 14:15:21 +00:00
}
2026-02-02 16:20:23 +00:00
// Restore authorization after subscribe
2026-02-02 14:15:21 +00:00
if ( $authorization !== null ) {
$realtime -> connections [ $connection ][ 'authorization' ] = $authorization ;
}
2026-04-20 06:22:43 +00:00
$subscriptionsAfter = \count ( $realtime -> getSubscriptionMetadata ( $connection ));
$subscriptionDelta = $subscriptionsAfter - $subscriptionsBefore ;
if ( $subscriptionDelta !== 0 ) {
$register -> get ( 'telemetry.workerSubscriptionCounter' ) -> add ( $subscriptionDelta , $register -> get ( 'telemetry.workerAttributes' ));
}
2022-05-09 07:35:55 +00:00
}
2021-06-24 12:22:32 +00:00
}
2021-06-28 10:18:00 +00:00
2026-05-13 06:50:12 +00:00
// Strip deleted presences from in-memory connection state so onClose doesn't
// re-fire delete events for rows already removed via HTTP DELETE.
$deletedPresenceId = Realtime :: extractDeletedPresenceId ( $event );
if ( $deletedPresenceId !== null ) {
$realtime -> removePresenceFromConnections (
( string ) ( $event [ 'project' ] ? ? '' ),
$deletedPresenceId ,
);
}
2026-02-05 05:47:54 +00:00
$receivers = $realtime -> getSubscribers ( $event );
2021-06-28 10:18:00 +00:00
2026-04-09 05:22:31 +00:00
if ( System :: getEnv ( '_APP_ENV' , 'production' ) === 'development' && ! empty ( $receivers )) {
2025-12-24 13:26:55 +00:00
Console :: log ( " [Debug][Worker { $workerId } ] Receivers: " . count ( $receivers ));
2026-02-05 05:47:54 +00:00
Console :: log ( " [Debug][Worker { $workerId } ] Connection IDs: " . json_encode ( array_keys ( $receivers )));
Console :: log ( " [Debug][Worker { $workerId } ] Matched: " . json_encode ( array_values ( $receivers )));
2025-12-24 13:26:55 +00:00
Console :: log ( " [Debug][Worker { $workerId } ] Event: " . $payload );
}
2021-06-24 12:22:32 +00:00
2026-02-05 07:48:40 +00:00
// Group connections by matched subscription IDs for batch sending
$groups = [];
2026-02-05 05:47:54 +00:00
foreach ( $receivers as $id => $matched ) {
2026-02-05 07:48:40 +00:00
$key = implode ( ',' , array_keys ( $matched ));
$groups [ $key ][ 'ids' ][] = $id ;
$groups [ $key ][ 'subscriptions' ] = array_keys ( $matched );
}
$total = 0 ;
2026-03-03 13:18:37 +00:00
$outboundBytes = 0 ;
2026-02-05 07:48:40 +00:00
foreach ( $groups as $group ) {
2026-01-28 13:10:30 +00:00
$data = $event [ 'data' ];
2026-02-05 07:48:40 +00:00
$data [ 'subscriptions' ] = $group [ 'subscriptions' ];
2026-02-05 05:47:54 +00:00
2026-03-03 13:18:37 +00:00
$payloadJson = json_encode ([
2026-02-05 05:47:54 +00:00
'type' => 'event' ,
'data' => $data
2026-03-03 13:18:37 +00:00
]);
$server -> send ( $group [ 'ids' ], $payloadJson );
$count = count ( $group [ 'ids' ]);
$total += $count ;
$outboundBytes += strlen ( $payloadJson ) * $count ;
2026-01-28 13:10:30 +00:00
}
2021-06-24 12:22:32 +00:00
2026-02-05 05:47:54 +00:00
if ( $total > 0 ) {
$register -> get ( 'telemetry.messageSentCounter' ) -> add ( $total );
$stats -> incr ( $event [ 'project' ], 'messages' , $total );
2026-04-17 08:38:42 +00:00
$updatedAt = $event [ 'data' ][ 'payload' ][ '$updatedAt' ] ? ? null ;
if ( \is_string ( $updatedAt )) {
try {
2026-04-20 06:43:53 +00:00
$updatedAtDate = new \DateTimeImmutable ( $updatedAt , new \DateTimeZone ( 'UTC' ));
2026-04-17 08:38:42 +00:00
$now = new \DateTimeImmutable ( 'now' , new \DateTimeZone ( 'UTC' ));
$updatedAtTimestampMs = ( float ) $updatedAtDate -> format ( 'U.u' ) * 1000 ;
$nowTimestampMs = ( float ) $now -> format ( 'U.u' ) * 1000 ;
$delayMs = ( int ) \max ( 0 , $nowTimestampMs - $updatedAtTimestampMs );
$register -> get ( 'telemetry.deliveryDelayHistogram' ) -> record ( $delayMs );
} catch ( \Throwable ) {
// Ignore invalid timestamp payloads.
}
}
2026-03-03 13:18:37 +00:00
$projectId = $event [ 'project' ] ? ? null ;
if ( ! empty ( $projectId )) {
2026-03-05 12:40:09 +00:00
$metrics = [
METRIC_REALTIME_CONNECTIONS_MESSAGES_SENT => $total ,
];
2026-03-03 13:18:37 +00:00
2026-03-05 12:40:09 +00:00
if ( $outboundBytes > 0 ) {
$metrics [ METRIC_REALTIME_OUTBOUND ] = $outboundBytes ;
2026-03-03 13:18:37 +00:00
}
2026-03-05 12:40:09 +00:00
triggerStats ( $metrics , $projectId );
2026-03-03 13:18:37 +00:00
}
2021-06-24 12:22:32 +00:00
}
});
2023-10-24 12:32:22 +00:00
} catch ( Throwable $th ) {
2026-02-26 14:47:34 +00:00
logError ( $th , " pubSubConnection " );
2021-11-23 14:24:25 +00:00
2021-06-24 12:22:32 +00:00
Console :: error ( 'Pub/sub error: ' . $th -> getMessage ());
$attempts ++ ;
2022-04-04 06:30:07 +00:00
sleep ( DATABASE_RECONNECT_SLEEP );
2021-06-24 12:22:32 +00:00
continue ;
}
}
Console :: error ( 'Failed to restart pub/sub...' );
});
2026-04-20 06:15:05 +00:00
$server -> onWorkerStop ( function ( int $workerId ) use ( $register ) {
Console :: warning ( 'Worker ' . $workerId . ' stopping' );
try {
$register -> get ( 'telemetry.workerCounter' ) -> add ( - 1 );
} catch ( \Throwable $th ) {
Console :: error ( 'Realtime onWorkerStop telemetry error: ' . $th -> getMessage ());
}
});
2026-04-10 03:55:00 +00:00
$server -> onOpen ( function ( int $connection , SwooleRequest $request ) use ( $server , $register , $stats , & $realtime , $registerConnectionResources ) {
2026-03-16 17:30:36 +00:00
global $container ;
2021-06-24 12:22:32 +00:00
$request = new Request ( $request );
2021-08-27 09:20:49 +00:00
$response = new Response ( new SwooleResponse ());
2021-06-24 12:22:32 +00:00
2021-06-29 16:22:10 +00:00
Console :: info ( " Connection open (user: { $connection } ) " );
2021-06-24 12:22:32 +00:00
2026-03-17 04:22:55 +00:00
$connectionContainer = new Container ( $container );
2026-03-19 17:56:49 +00:00
$connectionContainer -> set ( 'request' , fn () => $request );
2026-04-10 03:55:00 +00:00
$registerConnectionResources ( $connectionContainer );
2021-06-24 12:22:32 +00:00
2026-02-26 14:47:34 +00:00
$project = null ;
$logUser = null ;
$authorization = null ;
2026-04-22 11:22:54 +00:00
$rawSize = $request -> getSize ();
$channelCount = 0 ;
$subscriptionCount = 0 ;
$outboundBytes = 0 ;
$responseCode = 200 ;
$subscriptionMode = 'message' ;
$success = false ;
Span :: init ( 'realtime.open' );
2026-05-13 09:29:15 +00:00
Span :: add ( 'realtime.connection.id' , $connection );
Span :: add ( 'realtime.inbound_bytes' , $rawSize );
2026-04-22 11:27:51 +00:00
if ( ! empty ( $request -> getOrigin ())) {
Span :: add ( 'realtime.origin' , $request -> getOrigin ());
}
2026-02-26 14:47:34 +00:00
2021-06-24 12:22:32 +00:00
try {
2023-10-24 12:32:22 +00:00
/** @var Document $project */
2026-04-09 05:22:31 +00:00
$project = $connectionContainer -> get ( 'project' );
$authorization = $connectionContainer -> get ( 'authorization' );
2021-06-24 12:22:32 +00:00
/*
* Project Check
*/
if ( empty ( $project -> getId ())) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_POLICY_VIOLATION , 'Missing or unknown project ID' );
2021-06-24 12:22:32 +00:00
}
2026-04-09 05:22:31 +00:00
$timelimit = $connectionContainer -> get ( 'timelimit' );
$user = $connectionContainer -> get ( 'user' ); /** @var User $user */
2026-03-16 02:15:26 +00:00
$logUser = $user ;
2026-04-09 13:45:06 +00:00
$apis = $project -> getAttribute ( 'apis' , []);
2026-04-09 13:54:00 +00:00
// Websocket is what to check, but realtime is checked too for backwards compatibility
2026-04-09 13:45:06 +00:00
$websocketEnabled = $apis [ 'websocket' ] ? ? $apis [ 'realtime' ] ? ? true ;
2024-03-04 22:12:54 +00:00
if (
2026-04-09 13:45:06 +00:00
! $websocketEnabled
2026-05-18 05:34:54 +00:00
&& ! ( $user -> isPrivileged ( $authorization -> getRoles ()) || $user -> isKey ( $authorization -> getRoles ()))
2024-03-04 22:12:54 +00:00
) {
throw new AppwriteException ( AppwriteException :: GENERAL_API_DISABLED );
}
2026-02-26 10:37:26 +00:00
$projectRegion = $project -> getAttribute ( 'region' , '' );
2026-02-26 11:00:09 +00:00
$currentRegion = System :: getEnv ( '_APP_REGION' , 'default' );
2026-02-26 10:37:26 +00:00
if ( ! empty ( $projectRegion ) && $projectRegion !== $currentRegion ) {
2026-02-26 11:55:15 +00:00
throw new AppwriteException ( AppwriteException :: GENERAL_ACCESS_FORBIDDEN , 'Project is not accessible in this region. Please make sure you are using the correct endpoint' );
2026-02-26 10:37:26 +00:00
}
2021-06-24 12:22:32 +00:00
/*
* Abuse Check
*
* Abuse limits are connecting 128 times per minute and ip address .
*/
2024-12-20 14:44:50 +00:00
$timelimit = $timelimit ( 'url:{url},ip:{ip}' , 128 , 60 );
$timelimit
2021-06-24 12:22:32 +00:00
-> setParam ( '{ip}' , $request -> getIP ())
-> setParam ( '{url}' , $request -> getURI ());
2024-12-20 14:44:50 +00:00
$abuse = new Abuse ( $timelimit );
2021-06-24 12:22:32 +00:00
2024-04-01 11:02:47 +00:00
if ( System :: getEnv ( '_APP_OPTIONS_ABUSE' , 'enabled' ) === 'enabled' && $abuse -> check ()) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_TOO_MANY_MESSAGES , 'Too many requests' );
2021-06-24 12:22:32 +00:00
}
2026-03-06 10:07:30 +00:00
triggerStats ([
METRIC_REALTIME_INBOUND => $rawSize ,
], $project -> getId ());
2026-03-05 12:40:09 +00:00
2021-06-24 12:22:32 +00:00
/*
* Validate Client Domain - Check to avoid CSRF attack .
* Adding Appwrite API domains to allow XDOMAIN communication .
* Skip this check for non - web platforms which are not required to send an origin header .
*/
$origin = $request -> getOrigin ();
2026-04-09 05:22:31 +00:00
$originValidator = $connectionContainer -> get ( 'originValidator' );
2021-06-24 12:22:32 +00:00
2025-06-30 14:18:05 +00:00
if ( ! empty ( $origin ) && ! $originValidator -> isValid ( $origin ) && $project -> getId () !== 'console' ) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_POLICY_VIOLATION , $originValidator -> getDescription ());
2021-06-24 12:22:32 +00:00
}
2026-01-14 15:08:00 +00:00
$roles = $user -> getRoles ( $authorization );
2021-06-24 12:22:32 +00:00
2021-07-13 15:18:02 +00:00
$channels = Realtime :: convertChannels ( $request -> getQuery ( 'channels' , []), $user -> getId ());
2026-04-22 11:22:54 +00:00
$channelCount = \count ( $channels );
2021-06-24 12:22:32 +00:00
2026-04-10 05:43:03 +00:00
$updateStats = static function ( string $projectId , ? string $teamId , string $payloadJson ) use ( $register , $stats ) : void {
2026-04-09 12:08:25 +00:00
$register -> get ( 'telemetry.connectionCounter' ) -> add ( 1 );
2026-04-20 06:22:43 +00:00
$register -> get ( 'telemetry.workerClientCounter' ) -> add ( 1 , $register -> get ( 'telemetry.workerAttributes' ));
2026-04-09 12:08:25 +00:00
$register -> get ( 'telemetry.connectionCreatedCounter' ) -> add ( 1 );
$stats -> set ( $projectId , [
'projectId' => $projectId ,
'teamId' => $teamId
]);
$stats -> incr ( $projectId , 'connections' );
$stats -> incr ( $projectId , 'connectionsTotal' );
triggerStats ([
METRIC_REALTIME_CONNECTIONS => 1 ,
METRIC_REALTIME_OUTBOUND => \strlen ( $payloadJson ),
], $projectId );
};
2021-06-24 12:22:32 +00:00
/**
* Channels Check
*/
if ( empty ( $channels )) {
2026-04-07 11:25:10 +00:00
// in case of message based 'subscribe' channels will be empty at first and only projectId and roles will be available
2026-04-09 12:34:01 +00:00
$sanitizedUser = empty ( $user -> getId ()) ? null : $response -> output ( $user , Response :: MODEL_ACCOUNT );
2026-04-07 11:25:10 +00:00
$connectedPayloadJson = json_encode ([
'type' => 'connected' ,
'data' => [
'channels' => [],
'subscriptions' => [],
2026-04-09 12:34:01 +00:00
'user' => $sanitizedUser
2026-04-07 11:25:10 +00:00
]
]);
2026-04-07 12:05:48 +00:00
$realtime -> subscribe ( $project -> getId (), $connection , '' , $roles , [], [], $user -> getId ());
2026-04-09 12:34:01 +00:00
$realtime -> connections [ $connection ][ 'authorization' ] = $authorization ;
2026-04-07 11:25:10 +00:00
$server -> send ([ $connection ], $connectedPayloadJson );
2026-04-22 11:22:54 +00:00
$outboundBytes += \strlen ( $connectedPayloadJson );
2026-04-10 05:43:03 +00:00
$updateStats ( $project -> getId (), $project -> getAttribute ( 'teamId' ), $connectedPayloadJson );
2026-04-22 11:22:54 +00:00
$subscriptionMode = 'message' ;
$success = true ;
2026-04-07 11:25:10 +00:00
return ;
2021-06-24 12:22:32 +00:00
}
2026-02-05 05:47:54 +00:00
$names = array_keys ( $channels );
2026-04-22 11:22:54 +00:00
$subscriptionMode = 'url' ;
2026-02-03 06:13:23 +00:00
2026-02-02 16:20:23 +00:00
try {
2026-02-05 05:47:54 +00:00
$subscriptions = Realtime :: constructSubscriptions (
$names ,
2026-02-03 06:13:23 +00:00
fn ( $channel ) => $request -> getQuery ( $channel , null )
2026-02-02 16:20:23 +00:00
);
} catch ( QueryException $e ) {
throw new Exception ( Exception :: REALTIME_POLICY_VIOLATION , $e -> getMessage ());
}
2026-02-02 14:15:21 +00:00
2026-02-05 05:47:54 +00:00
$mapping = [];
foreach ( $subscriptions as $index => $subscription ) {
2026-02-02 16:20:23 +00:00
$subscriptionId = ID :: unique ();
$realtime -> subscribe (
$project -> getId (),
$connection ,
$subscriptionId ,
$roles ,
$subscription [ 'channels' ],
2026-04-07 12:05:48 +00:00
$subscription [ 'queries' ],
$user -> getId ()
2026-02-02 16:20:23 +00:00
);
2026-02-05 05:47:54 +00:00
$mapping [ $index ] = $subscriptionId ;
2026-02-02 14:15:21 +00:00
}
2026-04-22 11:22:54 +00:00
$subscriptionCount = \count ( $subscriptions );
2026-04-20 06:22:43 +00:00
if ( ! empty ( $subscriptions )) {
$register -> get ( 'telemetry.workerSubscriptionCounter' ) -> add ( \count ( $subscriptions ), $register -> get ( 'telemetry.workerAttributes' ));
}
2025-12-03 16:27:56 +00:00
2026-01-14 15:08:00 +00:00
$realtime -> connections [ $connection ][ 'authorization' ] = $authorization ;
2022-07-25 12:37:29 +00:00
$user = empty ( $user -> getId ()) ? null : $response -> output ( $user , Response :: MODEL_ACCOUNT );
2021-08-27 09:20:49 +00:00
2026-03-05 12:40:09 +00:00
$connectedPayloadJson = json_encode ([
2021-08-27 09:20:49 +00:00
'type' => 'connected' ,
'data' => [
2026-02-05 05:47:54 +00:00
'channels' => $names ,
'subscriptions' => $mapping ,
2021-08-27 09:20:49 +00:00
'user' => $user
]
2026-03-05 12:40:09 +00:00
]);
$server -> send ([ $connection ], $connectedPayloadJson );
2026-04-22 11:22:54 +00:00
$outboundBytes += \strlen ( $connectedPayloadJson );
2026-04-10 05:43:03 +00:00
$updateStats ( $project -> getId (), $project -> getAttribute ( 'teamId' ), $connectedPayloadJson );
2026-04-22 11:22:54 +00:00
$success = true ;
2026-03-03 13:18:37 +00:00
2023-10-24 12:32:22 +00:00
} catch ( Throwable $th ) {
2026-05-18 16:19:13 +00:00
Span :: error ( $th );
// Convert known Utopia DB exceptions to AppwriteException so isPublishable()
// suppresses expected client errors (permission denied, query timeout) from Sentry.
if ( $th instanceof AuthorizationException ) {
$th = new AppwriteException ( AppwriteException :: USER_UNAUTHORIZED , previous : $th );
} elseif ( $th instanceof TimeoutException ) {
$th = new AppwriteException ( AppwriteException :: DATABASE_TIMEOUT , previous : $th );
}
2026-02-26 15:40:32 +00:00
logError ( $th , 'realtime' , project : $project , user : $logUser , authorization : $authorization );
2021-11-23 14:24:25 +00:00
2024-05-08 22:48:25 +00:00
// Handle SQL error code is 'HY000'
$code = $th -> getCode ();
2026-02-26 10:37:26 +00:00
if ( ! \is_int ( $code )) {
2024-05-08 22:48:25 +00:00
$code = 500 ;
}
2026-04-22 11:22:54 +00:00
$responseCode = $code ;
2024-05-08 22:48:25 +00:00
2025-10-04 13:08:35 +00:00
$message = $th -> getMessage ();
2025-10-10 12:56:00 +00:00
// sanitize 0 && 5xx errors
2026-02-09 07:37:11 +00:00
$realtimeViolation = $th instanceof AppwriteException && $th -> getType () === AppwriteException :: REALTIME_POLICY_VIOLATION ;
2026-04-09 05:22:31 +00:00
if (( $code === 0 || $code >= 500 ) && ! $realtimeViolation && System :: getEnv ( '_APP_ENV' , 'production' ) !== 'development' ) {
2025-10-04 13:08:35 +00:00
$message = 'Error: Server Error' ;
}
2021-06-24 12:22:32 +00:00
$response = [
2021-08-27 08:20:44 +00:00
'type' => 'error' ,
'data' => [
2024-05-08 22:48:25 +00:00
'code' => $code ,
2025-10-04 13:08:35 +00:00
'message' => $message
2021-08-27 08:20:44 +00:00
]
2021-06-24 12:22:32 +00:00
];
2021-08-19 08:03:52 +00:00
2026-04-22 11:22:54 +00:00
$responsePayloadJson = json_encode ( $response );
$server -> send ([ $connection ], $responsePayloadJson );
$outboundBytes += \strlen ( $responsePayloadJson );
2024-05-08 22:48:25 +00:00
$server -> close ( $connection , $code );
2021-07-13 10:20:26 +00:00
2026-04-09 05:22:31 +00:00
if ( System :: getEnv ( '_APP_ENV' , 'production' ) === 'development' ) {
2021-08-30 14:42:31 +00:00
Console :: error ( '[Error] Connection Error' );
Console :: error ( '[Error] Code: ' . $response [ 'data' ][ 'code' ]);
Console :: error ( '[Error] Message: ' . $response [ 'data' ][ 'message' ]);
2021-08-19 08:03:52 +00:00
}
2026-04-22 11:22:54 +00:00
} finally {
Span :: add ( 'realtime.success' , $success );
2026-05-13 09:29:15 +00:00
Span :: add ( 'realtime.response_code' , $responseCode );
Span :: add ( 'realtime.subscription_mode' , $subscriptionMode );
Span :: add ( 'realtime.channel_count' , $channelCount );
Span :: add ( 'realtime.subscription_count' , $subscriptionCount );
Span :: add ( 'realtime.outbound_bytes' , $outboundBytes );
2026-04-22 11:27:51 +00:00
if ( ! empty ( $project ? -> getId ())) {
2026-05-13 10:03:28 +00:00
Span :: add ( 'project.id' , $project -> getId ());
2026-04-22 11:27:51 +00:00
}
if ( ! empty ( $logUser ? -> getId ())) {
2026-05-13 10:03:28 +00:00
Span :: add ( 'user.id' , $logUser -> getId ());
2026-04-22 11:27:51 +00:00
}
2026-04-22 11:22:54 +00:00
Span :: current () ? -> finish ();
2021-06-24 12:22:32 +00:00
}
});
2026-05-04 11:07:35 +00:00
$server -> onMessage ( function ( int $connection , string $message ) use ( $container , $server , $realtime , $containerId , $register , $presenceState , $messageDispatcher ) {
2026-02-26 14:47:34 +00:00
$project = null ;
$authorization = null ;
2026-04-22 11:14:20 +00:00
$projectId = $realtime -> connections [ $connection ][ 'projectId' ] ? ? null ;
$rawSize = \strlen ( $message );
$messageType = 'invalid' ;
$outboundBytes = 0 ;
$responseCode = 200 ;
$success = false ;
Span :: init ( 'realtime.message' );
2026-05-13 09:29:15 +00:00
Span :: add ( 'realtime.connection.id' , $connection );
Span :: add ( 'realtime.inbound_bytes' , $rawSize );
Span :: add ( 'realtime.container.id' , $containerId );
2026-04-22 11:14:20 +00:00
2021-08-26 16:02:38 +00:00
try {
2021-08-27 09:20:49 +00:00
$response = new Response ( new SwooleResponse ());
2026-01-14 15:08:00 +00:00
2026-05-15 07:47:14 +00:00
// Build a fresh Authorization per message. The connection-scoped instance is shared
// across coroutines, and `Authorization::skip()` toggles instance state — concurrent
// messages on the same connection (e.g. `authentication` + `presence` sent back-to-back)
// would interleave skip/restore and leak permission checks into supposedly-skipped lookups.
$authorization = new Authorization ();
$connectionAuthorization = $realtime -> connections [ $connection ][ 'authorization' ] ? ? null ;
if ( $connectionAuthorization !== null ) {
foreach ( $connectionAuthorization -> getRoles () as $role ) {
$authorization -> addRole ( $role );
}
2026-02-19 18:50:48 +00:00
}
2026-04-16 12:17:34 +00:00
$connectionRoles = $realtime -> connections [ $connection ][ 'roles' ] ? ? [];
foreach ( $connectionRoles as $role ) {
if ( $authorization -> hasRole ( $role )) {
continue ;
}
$authorization -> addRole ( $role );
}
2022-10-17 17:26:21 +00:00
$database = getConsoleDB ();
2026-01-14 15:08:00 +00:00
$database -> setAuthorization ( $authorization );
2022-07-06 10:53:30 +00:00
2026-03-09 18:31:44 +00:00
if ( ! empty ( $projectId ) && $projectId !== 'console' ) {
2026-04-17 18:13:44 +00:00
// Negative-cache race: if any prior code path queried projects:$projectId
// before this project existed (e.g. a router probe during connection
2026-05-19 06:24:06 +00:00
// setup), the Database's shared cache may hold an empty result. Try the
// cached read first, and only purge/retry when the first lookup reports
// not-found so the shared cache remains effective for normal traffic.
try {
$project = $authorization -> skip ( fn () => $database -> getDocument ( 'projects' , $projectId ));
} catch ( AppwriteException $e ) {
if ( $e -> getCode () !== 404 ) {
throw $e ;
}
2026-01-14 15:08:00 +00:00
2026-05-19 06:24:06 +00:00
$database -> purgeCachedDocument ( 'projects' , $projectId );
$project = $authorization -> skip ( fn () => $database -> getDocument ( 'projects' , $projectId ));
}
2026-01-14 15:08:00 +00:00
2025-11-28 14:22:59 +00:00
$database = getProjectDB ( $project );
2026-01-14 15:08:00 +00:00
$database -> setAuthorization ( $authorization );
2023-10-24 12:32:22 +00:00
} else {
$project = null ;
2022-07-06 10:53:30 +00:00
}
2022-06-20 07:37:00 +00:00
2026-05-18 05:12:11 +00:00
if ( $project !== null ) {
checkForProjectUsage ( $project );
}
2021-08-26 16:02:38 +00:00
/*
* Abuse Check
*
* Abuse limits are sending 32 times per minute and connection .
*/
2025-12-03 19:37:41 +00:00
$timeLimit = getTimelimit ( 'url:{url},connection:{connection}' , 32 , 60 );
2021-12-01 13:19:41 +00:00
2021-08-26 16:02:38 +00:00
$timeLimit
-> setParam ( '{connection}' , $connection )
-> setParam ( '{container}' , $containerId );
$abuse = new Abuse ( $timeLimit );
2024-04-01 11:02:47 +00:00
if ( $abuse -> check () && System :: getEnv ( '_APP_OPTIONS_ABUSE' , 'enabled' ) === 'enabled' ) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_TOO_MANY_MESSAGES , 'Too many messages.' );
2021-08-26 16:02:38 +00:00
}
2026-03-03 13:18:37 +00:00
// Record realtime inbound bytes for this project
2026-05-04 10:29:25 +00:00
// not making this a part of the dispatcher as we need to get the inbound bytes as well even if we dont enter the dispatcher
2026-03-03 13:18:37 +00:00
if ( $project !== null && ! $project -> isEmpty ()) {
2026-03-05 12:40:09 +00:00
triggerStats ([
METRIC_REALTIME_INBOUND => $rawSize ,
], $project -> getId ());
2026-03-03 13:18:37 +00:00
}
2021-08-26 16:02:38 +00:00
$message = json_decode ( $message , true );
if ( is_null ( $message ) || ( ! array_key_exists ( 'type' , $message ) && ! array_key_exists ( 'data' , $message ))) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_MESSAGE_FORMAT_INVALID , 'Message format is not valid.' );
2021-08-26 16:02:38 +00:00
}
2026-04-22 11:43:35 +00:00
$messageType = $message [ 'type' ] ? ? 'invalid' ;
2026-04-22 12:14:18 +00:00
if ( ! \is_scalar ( $messageType )) {
2026-04-22 11:31:08 +00:00
throw new Exception ( Exception :: REALTIME_MESSAGE_FORMAT_INVALID , 'Message type is not valid.' );
}
2026-05-04 10:29:25 +00:00
// Child of the global container: per-message values like $connection and $project
// live on this scope so concurrent message coroutines don't clobber each other,
2026-05-13 08:34:41 +00:00
// while globally-registered services (pools, ...) remain reachable via the parent.
2026-05-04 10:29:25 +00:00
$messageContainer = new Container ( $container );
2026-05-07 06:18:45 +00:00
$messageContainer -> set ( 'connectionId' , fn () => $connection );
2026-05-04 05:48:22 +00:00
$messageContainer -> set ( 'server' , fn () => $server );
$messageContainer -> set ( 'realtime' , fn () => $realtime );
$messageContainer -> set ( 'register' , fn () => $register );
$messageContainer -> set ( 'response' , fn () => $response );
$messageContainer -> set ( 'presenceState' , fn () => $presenceState );
$messageContainer -> set ( 'database' , fn () => $database );
$messageContainer -> set ( 'authorization' , fn () => $authorization );
$messageContainer -> set ( 'project' , fn () => $project );
$messageContainer -> set ( 'projectId' , fn () => $projectId );
2026-05-14 06:18:41 +00:00
$messageContainer -> set ( 'queueForEvents' , fn () => getQueueForEvents ());
$messageContainer -> set ( 'queueForRealtime' , fn () => getQueueForRealtime ());
2026-03-05 12:40:09 +00:00
2026-05-04 11:07:35 +00:00
$responsePayload = $messageDispatcher -> dispatch ( $messageContainer , $message );
if ( $responsePayload !== null ) {
$responseJson = json_encode ( $responsePayload );
if ( $responseJson === false ) {
throw new \RuntimeException (
'Failed to encode realtime response payload: ' . json_last_error_msg ()
);
}
$server -> send ([ $connection ], $responseJson );
$bytes = \strlen ( $responseJson );
$outboundBytes += $bytes ;
if ( $project !== null && ! $project -> isEmpty ()) {
triggerStats ([ METRIC_REALTIME_OUTBOUND => $bytes ], $project -> getId ());
}
}
2026-04-10 05:34:44 +00:00
2026-04-22 11:14:20 +00:00
$success = true ;
2023-10-24 12:32:22 +00:00
} catch ( Throwable $th ) {
2026-05-18 16:19:13 +00:00
Span :: error ( $th );
// Convert known Utopia DB exceptions to AppwriteException so isPublishable()
// suppresses expected client errors (permission denied, query timeout) from Sentry.
if ( $th instanceof AuthorizationException ) {
$th = new AppwriteException ( AppwriteException :: USER_UNAUTHORIZED , previous : $th );
} elseif ( $th instanceof TimeoutException ) {
$th = new AppwriteException ( AppwriteException :: DATABASE_TIMEOUT , previous : $th );
}
2026-02-26 15:40:32 +00:00
logError ( $th , 'realtimeMessage' , project : $project , authorization : $authorization );
2025-10-04 13:08:35 +00:00
$code = $th -> getCode ();
2025-10-06 04:27:54 +00:00
if ( ! is_int ( $code )) {
$code = 500 ;
}
2026-04-22 11:14:20 +00:00
$responseCode = $code ;
2025-10-06 04:27:54 +00:00
2025-10-04 13:08:35 +00:00
$message = $th -> getMessage ();
2025-10-10 12:56:00 +00:00
// sanitize 0 && 5xx errors
2026-04-09 05:22:31 +00:00
if (( $code === 0 || $code >= 500 ) && System :: getEnv ( '_APP_ENV' , 'production' ) !== 'development' ) {
2025-10-04 13:08:35 +00:00
$message = 'Error: Server Error' ;
}
2021-08-26 16:02:38 +00:00
$response = [
2021-08-27 08:20:44 +00:00
'type' => 'error' ,
'data' => [
2025-10-04 13:08:35 +00:00
'code' => $code ,
'message' => $message
2021-08-27 08:20:44 +00:00
]
2021-08-26 16:02:38 +00:00
];
2026-04-22 11:14:20 +00:00
$responsePayloadJson = json_encode ( $response );
$server -> send ([ $connection ], $responsePayloadJson );
$outboundBytes += \strlen ( $responsePayloadJson );
2021-08-26 16:02:38 +00:00
if ( $th -> getCode () === 1008 ) {
$server -> close ( $connection , $th -> getCode ());
}
2026-04-22 11:14:20 +00:00
} finally {
Span :: add ( 'realtime.success' , $success );
2026-05-13 09:29:15 +00:00
Span :: add ( 'realtime.response_code' , $responseCode );
Span :: add ( 'realtime.outbound_bytes' , $outboundBytes );
2026-05-13 10:03:28 +00:00
Span :: add ( 'project.id' , $project ? -> getId () ? ? $projectId );
Span :: add ( 'user.id' , $realtime -> connections [ $connection ][ 'userId' ] ? ? null );
2026-05-13 09:29:15 +00:00
Span :: add ( 'realtime.message_type' , $messageType );
2026-04-22 11:14:20 +00:00
Span :: current () ? -> finish ();
2021-08-26 16:02:38 +00:00
}
2021-06-24 12:22:32 +00:00
});
2026-05-14 06:18:41 +00:00
$server -> onClose ( function ( int $connection ) use ( $realtime , $stats , $register , $container , $presenceState ) {
2026-04-22 10:12:17 +00:00
$projectId = null ;
$userId = null ;
2026-04-22 11:22:54 +00:00
$subscriptionsBeforeClose = 0 ;
$success = false ;
Span :: init ( 'realtime.close' );
2026-05-13 09:29:15 +00:00
Span :: add ( 'realtime.connection.id' , $connection );
2026-04-22 10:12:17 +00:00
if ( array_key_exists ( $connection , $realtime -> connections )) {
$projectId = $realtime -> connections [ $connection ][ 'projectId' ] ? ? null ;
$userId = $realtime -> connections [ $connection ][ 'userId' ] ? ? null ;
}
2026-03-17 19:51:47 +00:00
try {
if ( array_key_exists ( $connection , $realtime -> connections )) {
$stats -> decr ( $realtime -> connections [ $connection ][ 'projectId' ], 'connectionsTotal' );
$register -> get ( 'telemetry.connectionCounter' ) -> add ( - 1 );
2026-04-20 06:22:43 +00:00
$register -> get ( 'telemetry.workerClientCounter' ) -> add ( - 1 , $register -> get ( 'telemetry.workerAttributes' ));
$subscriptionsBeforeClose = \count ( $realtime -> getSubscriptionMetadata ( $connection ));
if ( $subscriptionsBeforeClose > 0 ) {
$register -> get ( 'telemetry.workerSubscriptionCounter' ) -> add ( - $subscriptionsBeforeClose , $register -> get ( 'telemetry.workerAttributes' ));
}
2026-03-03 13:18:37 +00:00
2026-03-17 19:51:47 +00:00
$projectId = $realtime -> connections [ $connection ][ 'projectId' ];
2026-05-08 06:39:12 +00:00
/** @var array<string, Document> $presencesById */
$presencesById = $realtime -> connections [ $connection ][ 'presences' ] ? ? [];
2026-04-15 08:08:34 +00:00
if (
2026-05-08 06:39:12 +00:00
! empty ( $presencesById )
2026-04-15 08:08:34 +00:00
&& $projectId !== 'console'
) {
2026-05-14 06:18:41 +00:00
go ( function () use ( $presencesById , $projectId , $userId , $container , $presenceState ) : void {
2026-05-08 06:39:12 +00:00
// Fresh span: the parent realtime.close span finishes before this coroutine
Span :: init ( 'realtime.close.presenceCleanup' );
Span :: add ( 'realtime.projectId' , $projectId );
Span :: add ( 'realtime.presenceCount' , \count ( $presencesById ));
2026-04-15 08:08:34 +00:00
2026-05-08 06:39:12 +00:00
try {
2026-05-13 13:22:14 +00:00
$dbForPlatform = getConsoleDB ();
$project = $dbForPlatform -> getAuthorization () -> skip ( fn () => $dbForPlatform -> getDocument ( 'projects' , $projectId ));
2026-05-08 06:39:12 +00:00
if ( $project -> isEmpty ()) {
return ;
}
2026-04-15 08:08:34 +00:00
2026-05-08 06:39:12 +00:00
$presenceIds = \array_keys ( $presencesById );
$presences = \array_values ( $presencesById );
2026-04-15 08:08:34 +00:00
$dbForProject = getProjectDB ( $project );
2026-05-08 06:39:12 +00:00
2026-05-13 13:22:14 +00:00
$user = new User ([]);
if ( ! empty ( $userId )) {
try {
$fetched = $dbForProject -> getAuthorization () -> skip (
fn () => $dbForProject -> getDocument ( 'users' , $userId )
);
if ( ! $fetched -> isEmpty ()) {
2026-05-13 13:43:59 +00:00
$user = new User ( $fetched -> getArrayCopy ());
2026-05-13 13:22:14 +00:00
}
} catch ( Throwable ) {
// Fall back to empty User if lookup fails.
}
}
2026-05-14 06:18:41 +00:00
/** @var UsagePublisher $publisherForUsage */
$publisherForUsage = $container -> get ( 'publisherForUsage' );
2026-05-18 06:02:37 +00:00
/** @var array<string, true> $deletedIds */
2026-05-18 05:32:21 +00:00
$deletedIds = [];
2026-04-17 06:34:55 +00:00
try {
2026-05-18 06:18:52 +00:00
$deletionCount = $dbForProject -> getAuthorization () -> skip (
function () use ( $dbForProject , $presenceIds , & $deletedIds ) : int {
return $dbForProject -> deleteDocuments (
'presenceLogs' ,
[ Query :: equal ( '$id' , $presenceIds )],
onNext : function ( Document $deleted ) use ( & $deletedIds ) : void {
$deletedIds [ $deleted -> getId ()] = true ;
},
);
}
);
2026-05-14 06:18:41 +00:00
$presenceState -> triggerUsage ( $publisherForUsage , $project , - $deletionCount );
2026-05-11 11:17:36 +00:00
} catch ( Throwable $th ) {
Span :: error ( $th );
logError ( $th , 'realtimeOnClosePresenceDeletion' , tags : [
'projectId' => $projectId ,
'presences' => \count ( $presences )
]);
2026-04-17 06:34:55 +00:00
}
2026-04-16 07:05:50 +00:00
2026-05-14 06:18:41 +00:00
$queueForEvents = getQueueForEvents ();
$queueForRealtime = getQueueForRealtime ();
2026-04-16 07:05:50 +00:00
foreach ( $presences as $presence ) {
2026-05-18 05:32:21 +00:00
if ( ! isset ( $deletedIds [ $presence -> getId ()])) {
continue ;
}
2026-04-16 07:05:50 +00:00
try {
2026-05-14 06:18:41 +00:00
$presenceState -> triggerEvent (
$queueForEvents ,
$queueForRealtime ,
$project ,
$user ,
'presences.[presenceId].delete' ,
$presence ,
);
2026-04-16 07:05:50 +00:00
} catch ( Throwable ) {
// Swallow errors to avoid breaking disconnect cleanup
}
}
2026-05-08 06:39:12 +00:00
} catch ( Throwable $th ) {
Span :: error ( $th );
logError ( $th , 'realtimeOnClosePresenceCleanup' , tags : [
'projectId' => $projectId ,
]);
} finally {
Span :: current () ? -> finish ();
2026-04-15 08:08:34 +00:00
}
2026-05-08 06:39:12 +00:00
});
2026-04-15 08:08:34 +00:00
}
2026-03-03 13:18:37 +00:00
2026-03-17 19:51:47 +00:00
triggerStats ([
METRIC_REALTIME_CONNECTIONS => - 1 ,
], $projectId );
}
2026-04-22 11:22:54 +00:00
$success = true ;
2026-03-17 19:51:47 +00:00
} catch ( \Throwable $th ) {
// Log only; do not rethrow. If we let this bubble, Swoole dumps full coroutine
// backtraces and unsubscribe() below would never run (connection cleanup would fail).
Console :: error ( 'Realtime onClose error: ' . $th -> getMessage ());
2026-04-22 11:22:54 +00:00
Span :: error ( $th );
} finally {
2026-04-22 11:43:35 +00:00
try {
$realtime -> unsubscribe ( $connection );
} catch ( \Throwable $th ) {
Console :: error ( 'Realtime onClose unsubscribe error: ' . $th -> getMessage ());
Span :: error ( $th );
}
2026-04-22 11:22:54 +00:00
Span :: add ( 'realtime.success' , $success );
2026-04-22 11:27:51 +00:00
if ( ! empty ( $projectId )) {
2026-05-13 10:03:28 +00:00
Span :: add ( 'project.id' , $projectId );
2026-04-22 11:27:51 +00:00
}
if ( ! empty ( $userId )) {
2026-05-13 10:03:28 +00:00
Span :: add ( 'user.id' , $userId );
2026-04-22 11:27:51 +00:00
}
2026-05-13 09:29:15 +00:00
Span :: add ( 'realtime.subscriptions_before_close' , $subscriptionsBeforeClose );
2026-04-22 11:37:53 +00:00
Span :: current () ? -> finish ();
2021-06-24 12:22:32 +00:00
}
2021-06-28 14:34:28 +00:00
2021-06-24 12:22:32 +00:00
Console :: info ( 'Connection close: ' . $connection );
});
2021-06-28 10:18:00 +00:00
$server -> start ();