2020-10-16 07:31:09 +00:00
< ? php
2023-10-24 12:32:22 +00:00
use Appwrite\Extend\Exception ;
2024-03-07 23:30:23 +00:00
use Appwrite\Extend\Exception as AppwriteException ;
2021-06-28 14:34:28 +00:00
use Appwrite\Messaging\Adapter\Realtime ;
2021-06-24 12:22:32 +00:00
use Appwrite\Network\Validator\Origin ;
2025-05-14 06:14:07 +00:00
use Appwrite\PubSub\Adapter\Pool as PubSubPool ;
2025-11-04 05:25:20 +00:00
use Appwrite\Utopia\Database\Documents\User ;
2023-10-24 12:32:22 +00:00
use Appwrite\Utopia\Request ;
2021-08-27 09:20:49 +00:00
use Appwrite\Utopia\Response ;
2025-12-03 16:02:22 +00:00
use Swoole\Coroutine ;
2021-06-24 12:22:32 +00:00
use Swoole\Http\Request as SwooleRequest ;
use Swoole\Http\Response as SwooleResponse ;
use Swoole\Runtime ;
use Swoole\Table ;
use Swoole\Timer ;
use Utopia\Abuse\Abuse ;
2024-12-20 14:44:50 +00:00
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis ;
2025-11-04 05:22:18 +00:00
use Utopia\Auth\Hashes\Sha ;
use Utopia\Auth\Proofs\Token ;
use Utopia\Auth\Store ;
2025-05-14 06:14:07 +00:00
use Utopia\Cache\Adapter\Pool as CachePool ;
2024-03-06 17:34:21 +00:00
use Utopia\Cache\Adapter\Sharding ;
use Utopia\Cache\Cache ;
use Utopia\Config\Config ;
2026-02-10 05:04:24 +00:00
use Utopia\Console ;
2025-05-14 06:14:07 +00:00
use Utopia\Database\Adapter\Pool as DatabasePool ;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Database ;
2022-07-14 13:12:44 +00:00
use Utopia\Database\DateTime ;
2021-10-07 15:35:17 +00:00
use Utopia\Database\Document ;
2026-01-16 12:48:24 +00:00
use Utopia\Database\Exception\Query as QueryException ;
2024-03-06 17:34:21 +00:00
use Utopia\Database\Helpers\ID ;
use Utopia\Database\Helpers\Role ;
2021-10-07 15:35:17 +00:00
use Utopia\Database\Query ;
2026-02-19 18:50:48 +00:00
use Utopia\Database\Validator\Authorization ;
2024-05-06 05:33:36 +00:00
use Utopia\DSN\DSN ;
2026-02-10 05:04:24 +00:00
use Utopia\Http\Http ;
2024-03-06 17:34:21 +00:00
use Utopia\Logger\Log ;
2025-05-14 06:14:07 +00:00
use Utopia\Pools\Group ;
use Utopia\Registry\Registry ;
2024-04-01 11:08:46 +00:00
use Utopia\System\System ;
2024-11-12 09:45:00 +00:00
use Utopia\Telemetry\Adapter\None as NoTelemetry ;
2021-06-24 12:22:32 +00:00
use Utopia\WebSocket\Adapter ;
2024-03-06 17:34:21 +00:00
use Utopia\WebSocket\Server ;
2021-02-24 17:12:38 +00:00
2023-10-24 12:32:22 +00:00
/**
 * @var Registry $register
 */
require_once __DIR__ . '/init.php';

// Turn on Swoole's runtime coroutine hooks with the SWOOLE_HOOK_ALL flag set.
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

// Report uncaught exceptions as a single console line (message, file and line)
// instead of relying on Swoole's full backtrace dump.
set_exception_handler(static function (\Throwable $throwable): void {
    $summary = sprintf(
        'Realtime uncaught exception: %s in %s:%d',
        $throwable->getMessage(),
        $throwable->getFile(),
        $throwable->getLine()
    );

    Console::error($summary);
});
2024-03-20 12:47:20 +00:00
// Allows overriding
if (!function_exists('getConsoleDB')) {
    /**
     * Return the platform ("console") database handle.
     *
     * The handle is memoised under the 'dbForPlatform' key of the value returned by
     * Coroutine::getContext(), so later calls that see the same context reuse it.
     * On a miss, a Database is built on the 'console' pool from the registry, pointed at
     * APP_DATABASE with the '_console' namespace, and tagged with host/project metadata.
     *
     * @return Database
     */
    function getConsoleDB(): Database
    {
        global $register;

        $context = Coroutine::getContext();

        $cached = $context['dbForPlatform'] ?? null;
        if ($cached !== null) {
            return $cached;
        }

        /** @var Group $pools */
        $pools = $register->get('pools');

        // The pool adapter is created before getCache() is invoked (arguments evaluate left to right).
        $platformDb = new Database(new DatabasePool($pools->get('console')), getCache());
        $platformDb
            ->setDatabase(APP_DATABASE)
            ->setNamespace('_console')
            ->setMetadata('host', \gethostname())
            ->setMetadata('project', '_console');

        // Documents of the 'users' collection are hydrated as User instances.
        $platformDb->setDocumentType('users', User::class);

        $context['dbForPlatform'] = $platformDb;

        return $platformDb;
    }
}
2024-03-20 12:47:20 +00:00
// Allows overriding
if (!function_exists('getProjectDB')) {
    /**
     * Return the database handle for a project, memoised per project sequence.
     *
     * Empty projects and the 'console' project are served by getConsoleDB(). For every other
     * project the handle is cached under $ctx['dbForProject'][<sequence>] in the value returned by
     * Coroutine::getContext(). On a miss, the project's 'database' attribute is parsed as a DSN
     * (falling back to a 'mysql://' prefix when it is not a valid DSN) and its host selects the pool.
     *
     * Hosts listed in the comma-separated _APP_DATABASE_SHARED_TABLES variable use shared tables with
     * the project sequence as tenant; all other hosts use a dedicated '_<sequence>' namespace.
     *
     * @param Document $project Project document; reads getId(), getSequence() and the 'database' attribute.
     * @return Database
     */
    function getProjectDB(Document $project): Database
    {
        global $register;

        // Decide the console/empty case first: it never touches the per-project cache, so there is
        // no reason to use the sequence of an empty project as a cache key or to fetch the pools.
        if ($project->isEmpty() || $project->getId() === 'console') {
            return getConsoleDB();
        }

        $ctx = Coroutine::getContext();
        $sequence = $project->getSequence();

        if (!isset($ctx['dbForProject'])) {
            $ctx['dbForProject'] = [];
        }

        if (isset($ctx['dbForProject'][$sequence])) {
            return $ctx['dbForProject'][$sequence];
        }

        /** @var Group $pools */
        $pools = $register->get('pools');

        try {
            $dsn = new DSN($project->getAttribute('database'));
        } catch (\InvalidArgumentException) {
            // TODO: Temporary until all projects are using shared tables
            $dsn = new DSN('mysql://' . $project->getAttribute('database'));
        }

        $adapter = new DatabasePool($pools->get($dsn->getHost()));
        $database = new Database($adapter, getCache());

        $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));

        // Strict comparison: host names are strings and must match exactly, never by numeric coercion.
        if (\in_array($dsn->getHost(), $sharedTables, true)) {
            $database
                ->setSharedTables(true)
                ->setTenant($sequence)
                ->setNamespace($dsn->getParam('namespace'));
        } else {
            $database
                ->setSharedTables(false)
                ->setTenant(null)
                ->setNamespace('_' . $sequence);
        }

        $database
            ->setDatabase(APP_DATABASE)
            ->setMetadata('host', \gethostname())
            ->setMetadata('project', $project->getId());

        // Documents of the 'users' collection are hydrated as User instances.
        $database->setDocumentType('users', User::class);

        return $ctx['dbForProject'][$sequence] = $database;
    }
}
2024-03-20 13:14:23 +00:00
// Allows overriding
if (!function_exists('getCache')) {
    /**
     * Return the Cache instance, memoised under the 'cache' key of the value returned by
     * Coroutine::getContext().
     *
     * On a miss, one pool-backed cache adapter is created for every entry of the 'pools-cache'
     * config parameter and the adapters are combined through a Sharding adapter.
     *
     * @return Cache
     */
    function getCache(): Cache
    {
        global $register;

        $context = Coroutine::getContext();
        if (isset($context['cache'])) {
            return $context['cache'];
        }

        /** @var Group $pools */
        $pools = $register->get('pools');

        $shards = [];
        foreach (Config::getParam('pools-cache', []) as $poolName) {
            $shards[] = new CachePool($pools->get($poolName));
        }

        $cache = new Cache(new Sharding($shards));
        $context['cache'] = $cache;

        return $cache;
    }
}
2024-12-20 14:44:50 +00:00
// Allows overriding
if (!function_exists('getRedis')) {
    /**
     * Return the Redis client, memoised under the 'redis' key of the value returned by
     * Coroutine::getContext().
     *
     * Connection settings are read from _APP_REDIS_HOST (default 'localhost'),
     * _APP_REDIS_PORT (default 6379) and _APP_REDIS_PASS (default '').
     *
     * @return \Redis
     */
    function getRedis(): \Redis
    {
        $context = Coroutine::getContext();
        if (isset($context['redis'])) {
            return $context['redis'];
        }

        $host = System::getEnv('_APP_REDIS_HOST', 'localhost');
        $port = (int) System::getEnv('_APP_REDIS_PORT', 6379);
        $password = System::getEnv('_APP_REDIS_PASS', '');

        $client = new \Redis();

        // Warnings from pconnect() are suppressed and its return value is not checked here.
        @$client->pconnect($host, $port);

        // Authenticate only when a non-empty password is configured.
        if ($password) {
            $client->auth($password);
        }

        $client->setOption(\Redis::OPT_READ_TIMEOUT, -1);

        $context['redis'] = $client;

        return $client;
    }
}
if (!function_exists('getTimelimit')) {
    /**
     * Return a Redis-backed time-limit adapter for the given rule.
     *
     * Instances are memoised in the value returned by Coroutine::getContext(), one slot per distinct
     * ($limit, $seconds, $key) combination. Previously a single 'timelimit' slot was shared by every
     * call, so a second call with different arguments silently received the limiter that had been
     * built for the first rule.
     *
     * NOTE(review): the default value of $key is reproduced exactly as it appears in this file
     * (a single space); it may originally have been an empty string - confirm.
     *
     * @param string $key     Key template passed to the adapter.
     * @param int    $limit   Limit passed to the adapter.
     * @param int    $seconds Window length, in seconds, passed to the adapter.
     * @return TimeLimitRedis
     */
    function getTimelimit(string $key = " ", int $limit = 0, int $seconds = 1): TimeLimitRedis
    {
        $ctx = Coroutine::getContext();

        // The two integers come first so the free-form key cannot make two different rules collide.
        $slot = \sprintf('timelimit:%d:%d:%s', $limit, $seconds, $key);

        if (isset($ctx[$slot])) {
            return $ctx[$slot];
        }

        return $ctx[$slot] = new TimeLimitRedis($key, $limit, $seconds, getRedis());
    }
}
2024-09-09 08:52:37 +00:00
if (!function_exists('getRealtime')) {
    /**
     * Return the Realtime messaging adapter, creating it on first use.
     *
     * The instance is stored under the 'realtime' key of the value returned by
     * Coroutine::getContext().
     *
     * @return Realtime
     */
    function getRealtime(): Realtime
    {
        $context = Coroutine::getContext();

        $instance = $context['realtime'] ?? null;
        if ($instance === null) {
            $instance = new Realtime();
            $context['realtime'] = $instance;
        }

        return $instance;
    }
}
2024-11-12 09:45:00 +00:00
if (!function_exists('getTelemetry')) {
    /**
     * Return the telemetry adapter, creating a no-op (NoTelemetry) adapter on first use.
     *
     * The instance is stored under the 'telemetry' key of the value returned by
     * Coroutine::getContext().
     *
     * @param int $workerId Not read by this implementation.
     * @return Utopia\Telemetry\Adapter
     */
    function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
    {
        $context = Coroutine::getContext();

        if (!isset($context['telemetry'])) {
            $context['telemetry'] = new NoTelemetry();
        }

        return $context['telemetry'];
    }
}
2026-03-05 12:40:09 +00:00
if (!function_exists('triggerStats')) {
    /**
     * Usage-metrics hook. This default implementation does nothing.
     *
     * It is only declared when no function named triggerStats exists yet, so a definition
     * loaded earlier takes precedence.
     *
     * @param array  $event     Metrics to record (ignored here).
     * @param string $projectId Project the metrics belong to (ignored here).
     * @return void
     */
    function triggerStats(array $event, string $projectId): void
    {
        // Intentionally empty.
    }
}
2024-09-09 08:52:37 +00:00
// Realtime adapter captured by the server callbacks registered below.
$realtime = getRealtime();

/**
 * Table for statistics across all workers.
 *
 * Rows hold a project ID, a team ID and three integer counters
 * (connections, connectionsTotal, messages).
 */
$stats = new Table(4096, 1);
$stats->column('projectId', Table::TYPE_STRING, 64);
$stats->column('teamId', Table::TYPE_STRING, 64);
$stats->column('connections', Table::TYPE_INT);
$stats->column('connectionsTotal', Table::TYPE_INT);
$stats->column('messages', Table::TYPE_INT);
$stats->create();

// Identifier generated once for this process.
$containerId = uniqid();

// Starts as null.
$statsDocument = null;

// Worker count = CPU count (_APP_CPU_NUM, default swoole_cpu_num()) x workers per core (_APP_WORKER_PER_CORE, default 6).
$workerNumber = (int) System::getEnv('_APP_CPU_NUM', swoole_cpu_num()) * (int) System::getEnv('_APP_WORKER_PER_CORE', 6);

$adapter = new Adapter\Swoole(port: System::getEnv('PORT', 80));
$adapter
    ->setPackageMaxLength(64000) // Default maximum Package Size (64kb)
    ->setWorkerNumber($workerNumber);

$server = new Server($adapter);
2021-06-28 14:34:28 +00:00
2026-02-26 14:47:34 +00:00
// Allows overriding
if (!function_exists('logError')) {
    /**
     * Push an error to the 'realtimeLogger' service (when one is registered) and print a
     * four-line summary to the console.
     *
     * Errors that are instances of the imported Exception class (Appwrite\Extend\Exception) are
     * never pushed to the logger; they are only printed.
     *
     * @param Throwable          $error         Error to report.
     * @param string             $action        Action name stored on the log entry.
     * @param array              $tags          Extra tags; falsy values are recorded as 'n/a'.
     * @param Document|null      $project       Project whose ID is tagged, if any.
     * @param Document|null      $user          User whose ID is tagged, if any.
     * @param Authorization|null $authorization Source of the 'roles' extra, if any.
     * @return void
     */
    function logError(Throwable $error, string $action, array $tags = [], ?Document $project = null, ?Document $user = null, ?Authorization $authorization = null): void
    {
        global $register;

        $logger = $register->get('realtimeLogger');
        $errorClass = get_class($error);

        if ($logger && !($error instanceof Exception)) {
            $log = new Log();
            // NOTE(review): the padding inside this literal is reproduced from the file as-is; confirm it is intended.
            $log->setNamespace(" realtime ");
            $log->setServer(System::getEnv('_APP_LOGGING_SERVICE_IDENTIFIER', \gethostname()));
            $log->setVersion(System::getEnv('_APP_VERSION', 'UNKNOWN'));
            $log->setType(Log::TYPE_ERROR);
            $log->setMessage($error->getMessage());

            // Fixed tags first, then caller-supplied ones, in the same order as before.
            $fixedTags = [
                'code' => $error->getCode(),
                'verboseType' => $errorClass,
                'projectId' => $project?->getId() ?: 'n/a',
                'userId' => $user?->getId() ?: 'n/a',
            ];
            foreach ($fixedTags as $name => $tagValue) {
                $log->addTag($name, $tagValue);
            }
            foreach ($tags as $name => $tagValue) {
                $log->addTag($name, $tagValue ?: 'n/a');
            }

            $extras = [
                'file' => $error->getFile(),
                'line' => $error->getLine(),
                'trace' => $error->getTraceAsString(),
                'detailedTrace' => $error->getTrace(),
                'roles' => $authorization?->getRoles() ?? [],
            ];
            foreach ($extras as $name => $extraValue) {
                $log->addExtra($name, $extraValue);
            }

            $log->setAction($action);

            if (System::getEnv('_APP_ENV', 'development') === 'production') {
                $log->setEnvironment(Log::ENVIRONMENT_PRODUCTION);
            } else {
                $log->setEnvironment(Log::ENVIRONMENT_STAGING);
            }

            // A failure to push the log is reported on the console and otherwise ignored.
            try {
                $status = $logger->addLog($log);
                Console::info('Error log pushed with status code: ' . $status);
            } catch (Throwable $pushError) {
                Console::error('Error pushing log: ' . $pushError->getMessage());
            }
        }

        Console::error('[Error] Type: ' . $errorClass);
        Console::error('[Error] Message: ' . $error->getMessage());
        Console::error('[Error] File: ' . $error->getFile());
        Console::error('[Error] Line: ' . $error->getLine());
    }
}
2021-11-23 14:24:25 +00:00
2026-02-26 14:47:34 +00:00
// Hand logError() to the server's error() method as a closure (first-class callable syntax).
$server -> error ( logError ( ... ));
2021-06-28 14:34:28 +00:00
2026-02-26 14:47:34 +00:00
/**
 * Server start callback: registers this process's stats document and, on self-hosted
 * editions, schedules a 5-second timer that persists the per-project connection totals.
 */
$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument) {
    sleep(5); // wait for the initial database schema to be ready
    Console::success('Server started successfully');

    /**
     * Create document for this worker to share stats across Containers.
     * Retries indefinitely, sleeping DATABASE_RECONNECT_SLEEP seconds after each failure.
     */
    go(function () use ($containerId, &$statsDocument) {
        $database = getConsoleDB();
        $attempts = 0;

        while (true) {
            $attempts++;

            try {
                $document = new Document([
                    '$id' => ID::unique(),
                    '$collection' => ID::custom('realtime'),
                    '$permissions' => [],
                    'container' => $containerId,
                    'timestamp' => DateTime::now(),
                    'value' => '{}'
                ]);

                $statsDocument = $database->getAuthorization()->skip(
                    fn () => $database->createDocument('realtime', $document)
                );

                return;
            } catch (Throwable) {
                // NOTE(review): the padding inside this literal is reproduced from the file as-is; confirm it is intended.
                Console::warning(" Collection not ready. Retrying connection ( { $attempts } )... ");
                sleep(DATABASE_RECONNECT_SLEEP);
            }
        }
    });

    // TODO: Remove this if check once it doesn't cause issues for cloud
    if (System::getEnv('_APP_EDITION', 'self-hosted') !== 'self-hosted') {
        return;
    }

    /**
     * Save current connections to the Database every 5 seconds.
     */
    Timer::tick(5000, function () use ($stats, &$statsDocument) {
        $totals = [];
        foreach ($stats as $projectId => $row) {
            $totals[$projectId] = $stats->get($projectId, 'connectionsTotal');
        }

        // Nothing to persist yet, or the stats document has not been created.
        if (empty($totals) || empty($statsDocument)) {
            return;
        }

        try {
            $database = getConsoleDB();

            $statsDocument
                ->setAttribute('timestamp', DateTime::now())
                ->setAttribute('value', json_encode($totals));

            // Only the two changed attributes are sent with the update.
            $database->getAuthorization()->skip(fn () => $database->updateDocument(
                'realtime',
                $statsDocument->getId(),
                new Document([
                    'timestamp' => $statsDocument->getAttribute('timestamp'),
                    'value' => $statsDocument->getAttribute('value')
                ])
            ));
        } catch (Throwable $failure) {
            // NOTE(review): the padding inside this literal is reproduced from the file as-is; confirm it is intended.
            logError($failure, " updateWorkerDocument ");
        }
    });
});
2026-02-26 14:47:34 +00:00
/**
 * Worker start callback.
 *
 * - Registers the telemetry adapter and its counters on the registry.
 * - Schedules a 5-second timer that broadcasts console stats (self-hosted only) and the
 *   SDK E2E test event.
 * - Subscribes to the 'realtime' pub/sub channel and forwards every event to the matching
 *   connections, retrying the subscription while fewer than 300 consecutive attempts have failed.
 */
$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) {
    Console::success('Worker ' . $workerId . ' started successfully');

    $telemetry = getTelemetry($workerId);
    $register->set('telemetry', fn () => $telemetry);
    $register->set('telemetry.connectionCounter', fn () => $telemetry->createUpDownCounter('realtime.server.open_connections'));
    $register->set('telemetry.connectionCreatedCounter', fn () => $telemetry->createCounter('realtime.server.connection.created'));
    $register->set('telemetry.messageSentCounter', fn () => $telemetry->createCounter('realtime.server.message.sent'));

    $attempts = 0;
    $start = time();

    /**
     * Group receivers (connection ID => matched subscriptions, keyed by subscription ID) by the
     * exact set of subscription IDs they matched, so each distinct set is encoded and sent once.
     *
     * @param array $receivers
     * @return array List-like map of ['ids' => connection IDs, 'subscriptions' => subscription IDs].
     */
    $groupReceivers = static function (array $receivers): array {
        $groups = [];
        foreach ($receivers as $id => $matched) {
            $subscriptionIds = array_keys($matched);
            $key = implode(',', $subscriptionIds);
            $groups[$key]['ids'][] = $id;
            $groups[$key]['subscriptions'] = $subscriptionIds;
        }

        return $groups;
    };

    Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $groupReceivers) {
        /**
         * Sending current connections to project channels on the console project every 5 seconds.
         */
        // TODO: Remove this if check once it doesn't cause issues for cloud
        if (System::getEnv('_APP_EDITION', 'self-hosted') === 'self-hosted') {
            if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) {
                $database = getConsoleDB();
                $payload = [];

                $list = $database->getAuthorization()->skip(fn () => $database->find('realtime', [
                    Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
                ]));

                /**
                 * Aggregate stats across containers.
                 */
                foreach ($list as $document) {
                    $containerStats = json_decode($document->getAttribute('value'), true);

                    // Skip documents whose stored value does not decode to a JSON object.
                    if (!\is_array($containerStats)) {
                        continue;
                    }

                    foreach ($containerStats as $projectId => $value) {
                        if (array_key_exists($projectId, $payload)) {
                            $payload[$projectId] += $value;
                        } else {
                            $payload[$projectId] = $value;
                        }
                    }
                }

                foreach ($stats as $projectId => $value) {
                    if (!array_key_exists($projectId, $payload)) {
                        continue;
                    }

                    $event = [
                        'project' => 'console',
                        'roles' => ['team:' . $stats->get($projectId, 'teamId')],
                        'data' => [
                            'events' => ['stats.connections'],
                            'channels' => ['project'],
                            'timestamp' => DateTime::formatTz(DateTime::now()),
                            'payload' => [
                                $projectId => $payload[$projectId]
                            ]
                        ]
                    ];

                    // Send to connection IDs (the keys of the subscriber map), grouped the same
                    // way as the other two send sites in this callback.
                    foreach ($groupReceivers($realtime->getSubscribers($event)) as $group) {
                        $data = $event['data'];
                        $data['subscriptions'] = $group['subscriptions'];

                        $server->send($group['ids'], json_encode([
                            'type' => 'event',
                            'data' => $data
                        ]));
                    }
                }
            }
        }

        /**
         * Sending test message for SDK E2E tests every 5 seconds.
         */
        if ($realtime->hasSubscriber('console', Role::guests()->toString(), 'tests')) {
            $payload = ['response' => 'WS:/v1/realtime:passed'];

            $event = [
                'project' => 'console',
                'roles' => [Role::guests()->toString()],
                'data' => [
                    'events' => ['test.event'],
                    'channels' => ['tests'],
                    'timestamp' => DateTime::formatTz(DateTime::now()),
                    'payload' => $payload
                ]
            ];

            foreach ($groupReceivers($realtime->getSubscribers($event)) as $group) {
                $data = $event['data'];
                $data['subscriptions'] = $group['subscriptions'];

                $server->send($group['ids'], json_encode([
                    'type' => 'event',
                    'data' => $data
                ]));
            }
        }
    });

    while ($attempts < 300) {
        try {
            if ($attempts > 0) {
                // NOTE(review): the literal below spans two source lines and is reproduced from the
                // file as-is, which is why its second line starts at column 0.
                Console::error('Pub/sub connection lost (lasted ' . (time() - $start) . ' seconds, worker: ' . $workerId . ' ) .
Attempting restart in 5 seconds ( attempt #' . $attempts . ')');
                sleep(5); // 5 sec delay between connection attempts
            }
            $start = time();

            $pubsub = new PubSubPool($register->get('pools')->get('pubsub'));
            if ($pubsub->ping(true)) {
                $attempts = 0;
                Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
            } else {
                Console::error('Pub/sub failed (worker: ' . $workerId . ')');
            }

            $pubsub->subscribe(['realtime'], function (mixed $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime, $groupReceivers) {
                $event = json_decode($payload, true);

                // A payload that is not a JSON object cannot be routed; drop it instead of letting
                // the failure escape the callback and tear down the subscription.
                if (!\is_array($event)) {
                    Console::warning('Ignoring malformed pub/sub payload (worker: ' . $workerId . ')');
                    return;
                }

                if (!empty($event['permissionsChanged']) && isset($event['userId'])) {
                    $projectId = $event['project'];
                    $userId = $event['userId'];

                    if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) {
                        // NOTE(review): a single connection is selected here (first key of the first
                        // entry under the user's role) - confirm whether users with several open
                        // connections are expected to be refreshed as well.
                        $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
                        $consoleDatabase = getConsoleDB();
                        $project = $consoleDatabase->getAuthorization()->skip(fn () => $consoleDatabase->getDocument('projects', $projectId));
                        $database = getProjectDB($project);

                        /** @var User $user */
                        $user = $database->getDocument('users', $userId);

                        $roles = $user->getRoles($database->getAuthorization());
                        $authorization = $realtime->connections[$connection]['authorization'] ?? null;

                        // Capture the subscriptions before unsubscribing so they can be re-created with the new roles.
                        $meta = $realtime->getSubscriptionMetadata($connection);

                        $realtime->unsubscribe($connection);

                        foreach ($meta as $subscriptionId => $subscription) {
                            $queries = Query::parseQueries($subscription['queries'] ?? []);
                            $realtime->subscribe(
                                $projectId,
                                $connection,
                                $subscriptionId,
                                $roles,
                                $subscription['channels'] ?? [],
                                $queries
                            );
                        }

                        // Restore authorization after subscribe
                        if ($authorization !== null) {
                            $realtime->connections[$connection]['authorization'] = $authorization;
                        }
                    }
                }

                $receivers = $realtime->getSubscribers($event);

                if (Http::isDevelopment() && !empty($receivers)) {
                    // NOTE(review): the padding inside these literals is reproduced from the file as-is; confirm it is intended.
                    Console::log(" [Debug][Worker { $workerId } ] Receivers: " . count($receivers));
                    Console::log(" [Debug][Worker { $workerId } ] Connection IDs: " . json_encode(array_keys($receivers)));
                    Console::log(" [Debug][Worker { $workerId } ] Matched: " . json_encode(array_values($receivers)));
                    Console::log(" [Debug][Worker { $workerId } ] Event: " . $payload);
                }

                // Group connections by matched subscription IDs for batch sending
                $total = 0;
                $outboundBytes = 0;

                foreach ($groupReceivers($receivers) as $group) {
                    $data = $event['data'];
                    $data['subscriptions'] = $group['subscriptions'];

                    $payloadJson = json_encode([
                        'type' => 'event',
                        'data' => $data
                    ]);

                    $server->send($group['ids'], $payloadJson);

                    $count = count($group['ids']);
                    $total += $count;
                    $outboundBytes += strlen($payloadJson) * $count;
                }

                if ($total > 0) {
                    $register->get('telemetry.messageSentCounter')->add($total);

                    // Read the project ID once, null-safely, before any per-project accounting.
                    $projectId = $event['project'] ?? null;
                    if (!empty($projectId)) {
                        $stats->incr($projectId, 'messages', $total);

                        $metrics = [
                            METRIC_REALTIME_CONNECTIONS_MESSAGES_SENT => $total,
                        ];

                        if ($outboundBytes > 0) {
                            $metrics[METRIC_REALTIME_OUTBOUND] = $outboundBytes;
                        }

                        triggerStats($metrics, $projectId);
                    }
                }
            });
        } catch (Throwable $th) {
            // NOTE(review): the padding inside this literal is reproduced from the file as-is; confirm it is intended.
            logError($th, " pubSubConnection ");

            Console::error('Pub/sub error: ' . $th->getMessage());
            $attempts++;
            sleep(DATABASE_RECONNECT_SLEEP);
            continue;
        }
    }

    Console::error('Failed to restart pub/sub...');
});
2026-02-26 14:47:34 +00:00
$server -> onOpen ( function ( int $connection , SwooleRequest $request ) use ( $server , $register , $stats , & $realtime ) {
2026-02-04 05:30:22 +00:00
$app = new Http ( 'UTC' );
2021-06-24 12:22:32 +00:00
$request = new Request ( $request );
2021-08-27 09:20:49 +00:00
$response = new Response ( new SwooleResponse ());
2021-06-24 12:22:32 +00:00
2021-06-29 16:22:10 +00:00
Console :: info ( " Connection open (user: { $connection } ) " );
2021-06-24 12:22:32 +00:00
2026-02-04 05:30:22 +00:00
Http :: setResource ( 'pools' , fn () => $register -> get ( 'pools' ));
Http :: setResource ( 'request' , fn () => $request );
Http :: setResource ( 'response' , fn () => $response );
2021-06-24 12:22:32 +00:00
2026-02-26 14:47:34 +00:00
$project = null ;
$logUser = null ;
$authorization = null ;
2021-06-24 12:22:32 +00:00
try {
2023-10-24 12:32:22 +00:00
/** @var Document $project */
2021-06-24 12:22:32 +00:00
$project = $app -> getResource ( 'project' );
2026-01-14 15:08:00 +00:00
$authorization = $app -> getResource ( 'authorization' );
2021-06-24 12:22:32 +00:00
/*
* Project Check
*/
if ( empty ( $project -> getId ())) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_POLICY_VIOLATION , 'Missing or unknown project ID' );
2021-06-24 12:22:32 +00:00
}
2026-03-16 02:15:26 +00:00
$timelimit = $app -> getResource ( 'timelimit' );
$user = $app -> getResource ( 'user' ); /** @var User $user */
$logUser = $user ;
2024-03-04 22:12:54 +00:00
if (
array_key_exists ( 'realtime' , $project -> getAttribute ( 'apis' , []))
&& ! $project -> getAttribute ( 'apis' , [])[ 'realtime' ]
2026-03-16 06:26:07 +00:00
&& ! ( $user -> isPrivileged ( $authorization -> getRoles ()) || $user -> isApp ( $authorization -> getRoles ()))
2024-03-04 22:12:54 +00:00
) {
throw new AppwriteException ( AppwriteException :: GENERAL_API_DISABLED );
}
2026-02-26 10:37:26 +00:00
$projectRegion = $project -> getAttribute ( 'region' , '' );
2026-02-26 11:00:09 +00:00
$currentRegion = System :: getEnv ( '_APP_REGION' , 'default' );
2026-02-26 10:37:26 +00:00
if ( ! empty ( $projectRegion ) && $projectRegion !== $currentRegion ) {
2026-02-26 11:55:15 +00:00
throw new AppwriteException ( AppwriteException :: GENERAL_ACCESS_FORBIDDEN , 'Project is not accessible in this region. Please make sure you are using the correct endpoint' );
2026-02-26 10:37:26 +00:00
}
2021-06-24 12:22:32 +00:00
/*
* Abuse Check
*
* Abuse limits are connecting 128 times per minute and ip address .
*/
2024-12-20 14:44:50 +00:00
$timelimit = $timelimit ( 'url:{url},ip:{ip}' , 128 , 60 );
$timelimit
2021-06-24 12:22:32 +00:00
-> setParam ( '{ip}' , $request -> getIP ())
-> setParam ( '{url}' , $request -> getURI ());
2024-12-20 14:44:50 +00:00
$abuse = new Abuse ( $timelimit );
2021-06-24 12:22:32 +00:00
2024-04-01 11:02:47 +00:00
if ( System :: getEnv ( '_APP_OPTIONS_ABUSE' , 'enabled' ) === 'enabled' && $abuse -> check ()) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_TOO_MANY_MESSAGES , 'Too many requests' );
2021-06-24 12:22:32 +00:00
}
2026-03-06 10:07:30 +00:00
$rawSize = $request -> getSize ();
2026-03-05 12:40:09 +00:00
2026-03-06 10:07:30 +00:00
triggerStats ([
METRIC_REALTIME_INBOUND => $rawSize ,
], $project -> getId ());
2026-03-05 12:40:09 +00:00
2021-06-24 12:22:32 +00:00
/*
* Validate Client Domain - Check to avoid CSRF attack .
* Adding Appwrite API domains to allow XDOMAIN communication .
* Skip this check for non - web platforms which are not required to send an origin header .
*/
$origin = $request -> getOrigin ();
2025-12-07 20:29:45 +00:00
$originValidator = $app -> getResource ( 'originValidator' );
2021-06-24 12:22:32 +00:00
2025-06-30 14:18:05 +00:00
if ( ! empty ( $origin ) && ! $originValidator -> isValid ( $origin ) && $project -> getId () !== 'console' ) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_POLICY_VIOLATION , $originValidator -> getDescription ());
2021-06-24 12:22:32 +00:00
}
2026-01-14 15:08:00 +00:00
$roles = $user -> getRoles ( $authorization );
2021-06-24 12:22:32 +00:00
2021-07-13 15:18:02 +00:00
$channels = Realtime :: convertChannels ( $request -> getQuery ( 'channels' , []), $user -> getId ());
2021-06-24 12:22:32 +00:00
/**
* Channels Check
*/
if ( empty ( $channels )) {
2023-10-24 12:32:22 +00:00
throw new Exception ( Exception :: REALTIME_POLICY_VIOLATION , 'Missing channels' );
2021-06-24 12:22:32 +00:00
}
2026-02-05 05:47:54 +00:00
$names = array_keys ( $channels );
2026-02-03 06:13:23 +00:00
2026-02-02 16:20:23 +00:00
try {
2026-02-05 05:47:54 +00:00
$subscriptions = Realtime :: constructSubscriptions (
$names ,
2026-02-03 06:13:23 +00:00
fn ( $channel ) => $request -> getQuery ( $channel , null )
2026-02-02 16:20:23 +00:00
);
} catch ( QueryException $e ) {
throw new Exception ( Exception :: REALTIME_POLICY_VIOLATION , $e -> getMessage ());
}
2026-02-02 14:15:21 +00:00
2026-02-05 05:47:54 +00:00
$mapping = [];
foreach ( $subscriptions as $index => $subscription ) {
2026-02-02 16:20:23 +00:00
$subscriptionId = ID :: unique ();
$realtime -> subscribe (
$project -> getId (),
$connection ,
$subscriptionId ,
$roles ,
$subscription [ 'channels' ],
2026-02-05 05:47:54 +00:00
$subscription [ 'queries' ]
2026-02-02 16:20:23 +00:00
);
2026-02-05 05:47:54 +00:00
$mapping [ $index ] = $subscriptionId ;
2026-02-02 14:15:21 +00:00
}
2025-12-03 16:27:56 +00:00
2026-01-14 15:08:00 +00:00
$realtime -> connections [ $connection ][ 'authorization' ] = $authorization ;
2022-07-25 12:37:29 +00:00
$user = empty ( $user -> getId ()) ? null : $response -> output ( $user , Response :: MODEL_ACCOUNT );
2021-08-27 09:20:49 +00:00
2026-03-05 12:40:09 +00:00
$connectedPayloadJson = json_encode ([
2021-08-27 09:20:49 +00:00
'type' => 'connected' ,
'data' => [
2026-02-05 05:47:54 +00:00
'channels' => $names ,
'subscriptions' => $mapping ,
2021-08-27 09:20:49 +00:00
'user' => $user
]
2026-03-05 12:40:09 +00:00
]);
$server -> send ([ $connection ], $connectedPayloadJson );
2021-06-24 12:22:32 +00:00
2024-11-12 09:45:00 +00:00
$register -> get ( 'telemetry.connectionCounter' ) -> add ( 1 );
$register -> get ( 'telemetry.connectionCreatedCounter' ) -> add ( 1 );
2021-08-19 10:14:19 +00:00
$stats -> set ( $project -> getId (), [
2022-08-15 11:24:31 +00:00
'projectId' => $project -> getId (),
2021-08-19 10:14:19 +00:00
'teamId' => $project -> getAttribute ( 'teamId' )
]);
2021-06-24 12:22:32 +00:00
$stats -> incr ( $project -> getId (), 'connections' );
$stats -> incr ( $project -> getId (), 'connectionsTotal' );
2026-03-03 13:18:37 +00:00
2026-03-05 12:40:09 +00:00
$connectedOutboundBytes = \strlen ( $connectedPayloadJson );
triggerStats ([ METRIC_REALTIME_CONNECTIONS => 1 , METRIC_REALTIME_OUTBOUND => $connectedOutboundBytes ], $project -> getId ());
2026-03-03 13:18:37 +00:00
2023-10-24 12:32:22 +00:00
} catch ( Throwable $th ) {
2026-02-26 15:40:32 +00:00
logError ( $th , 'realtime' , project : $project , user : $logUser , authorization : $authorization );
2021-11-23 14:24:25 +00:00
2024-05-08 22:48:25 +00:00
// Handle SQL error code is 'HY000'
$code = $th -> getCode ();
2026-02-26 10:37:26 +00:00
if ( ! \is_int ( $code )) {
2024-05-08 22:48:25 +00:00
$code = 500 ;
}
2025-10-04 13:08:35 +00:00
$message = $th -> getMessage ();
2025-10-10 12:56:00 +00:00
// sanitize 0 && 5xx errors
2026-02-09 07:37:11 +00:00
$realtimeViolation = $th instanceof AppwriteException && $th -> getType () === AppwriteException :: REALTIME_POLICY_VIOLATION ;
if (( $code === 0 || $code >= 500 ) && ! $realtimeViolation && ! Http :: isDevelopment ()) {
2025-10-04 13:08:35 +00:00
$message = 'Error: Server Error' ;
}
2021-06-24 12:22:32 +00:00
$response = [
2021-08-27 08:20:44 +00:00
'type' => 'error' ,
'data' => [
2024-05-08 22:48:25 +00:00
'code' => $code ,
2025-10-04 13:08:35 +00:00
'message' => $message
2021-08-27 08:20:44 +00:00
]
2021-06-24 12:22:32 +00:00
];
2021-08-19 08:03:52 +00:00
2021-06-24 12:22:32 +00:00
$server -> send ([ $connection ], json_encode ( $response ));
2024-05-08 22:48:25 +00:00
$server -> close ( $connection , $code );
2021-07-13 10:20:26 +00:00
2026-02-04 05:30:22 +00:00
if ( Http :: isDevelopment ()) {
2021-08-30 14:42:31 +00:00
Console :: error ( '[Error] Connection Error' );
Console :: error ( '[Error] Code: ' . $response [ 'data' ][ 'code' ]);
Console :: error ( '[Error] Message: ' . $response [ 'data' ][ 'message' ]);
2021-08-19 08:03:52 +00:00
}
2021-06-24 12:22:32 +00:00
}
});
2026-02-26 14:47:34 +00:00
/**
 * Handles a message sent by an already-connected realtime client.
 *
 * Supported message types:
 *  - `ping`: replies with a `pong` frame. Works without project context.
 *  - `authentication`: verifies the session carried in `data.session`, re-subscribes the
 *    connection's existing subscriptions with the authenticated user's roles and replies
 *    with the account payload.
 *
 * Any failure is logged and reported to the client as an `error` frame; the connection
 * is closed only when the error code is 1008.
 *
 * @param int    $connection Connection identifier supplied by the server.
 * @param string $message    Raw message received from the client (expected to be a JSON object).
 */
$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) {
    $project = null;
    $authorization = null;

    try {
        // Measure the payload before it is decoded so inbound usage reflects the raw bytes.
        $rawSize = \strlen($message);

        $response = new Response(new SwooleResponse());
        $projectId = $realtime->connections[$connection]['projectId'] ?? null;

        // Get authorization from connection (stored during onOpen)
        $authorization = $realtime->connections[$connection]['authorization'] ?? null;
        if ($authorization === null) {
            $authorization = new Authorization('');
        }

        $database = getConsoleDB();
        $database->setAuthorization($authorization);

        if (!empty($projectId) && $projectId !== 'console') {
            $project = $authorization->skip(fn () => $database->getDocument('projects', $projectId));
            $database = getProjectDB($project);
            $database->setAuthorization($authorization);
        } else {
            $project = null;
        }

        /*
         * Abuse Check
         *
         * Abuse limits are sending 32 times per minute and connection.
         */
        $timeLimit = getTimelimit('url:{url},connection:{connection}', 32, 60);

        $timeLimit
            ->setParam('{connection}', $connection)
            ->setParam('{container}', $containerId);

        $abuse = new Abuse($timeLimit);

        // Read the option first (same order as the onOpen handler) so the limiter
        // is not consulted at all when abuse protection is disabled.
        if (System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') === 'enabled' && $abuse->check()) {
            throw new Exception(Exception::REALTIME_TOO_MANY_MESSAGES, 'Too many messages.');
        }

        // Record realtime inbound bytes for this project
        if ($project !== null && !$project->isEmpty()) {
            triggerStats([
                METRIC_REALTIME_INBOUND => $rawSize,
            ], $project->getId());
        }

        $message = json_decode($message, true);

        // json_decode() may yield a scalar for valid JSON such as `"text"` or `42`;
        // reject anything that is not an object/array before inspecting its keys.
        if (
            !\is_array($message)
            || (!\array_key_exists('type', $message) && !\array_key_exists('data', $message))
        ) {
            throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message format is not valid.');
        }

        // Only string types can match a handler; anything else falls through to `default`.
        $type = \is_string($message['type'] ?? null) ? $message['type'] : '';

        // Ping does not require project context; other messages do (e.g. after unsubscribe during auth)
        if (empty($projectId) && $type !== 'ping') {
            throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing project context. Reconnect to the project first.');
        }

        switch ($type) {
            case 'ping':
                $pongPayloadJson = json_encode([
                    'type' => 'pong'
                ]);
                $server->send([$connection], $pongPayloadJson);

                if ($project !== null && !$project->isEmpty()) {
                    $pongOutboundBytes = \strlen($pongPayloadJson);
                    if ($pongOutboundBytes > 0) {
                        triggerStats([
                            METRIC_REALTIME_OUTBOUND => $pongOutboundBytes,
                        ], $project->getId());
                    }
                }
                break;

            case 'authentication':
                // `data` may be absent or a scalar; both are invalid payloads.
                $data = $message['data'] ?? null;
                if (!\is_array($data) || !\array_key_exists('session', $data)) {
                    throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Payload is not valid.');
                }

                $store = new Store();
                $store->decode($data['session']);

                /** @var User $user */
                $user = $database->getDocument('users', $store->getProperty('id', ''));

                /**
                 * TODO:
                 * Moving forward, we should try to use our dependency injection container
                 * to inject the proof for token.
                 * This way we will have one source of truth for the proof for token.
                 */
                $proofForToken = new Token();
                $proofForToken->setHash(new Sha());

                if (
                    empty($user->getId()) // Check a document has been found in the DB
                    || !$user->sessionVerify($store->getProperty('secret', ''), $proofForToken) // Validate user has valid login token
                ) {
                    // cookie not valid
                    throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Session is not valid.');
                }

                $roles = $user->getRoles($database->getAuthorization());

                // Capture the connection state before unsubscribe() so it can be
                // re-applied once the subscriptions are re-created with the new roles.
                $authorization = $realtime->connections[$connection]['authorization'] ?? null;
                $projectId = $realtime->connections[$connection]['projectId'] ?? null;
                $meta = $realtime->getSubscriptionMetadata($connection);

                $realtime->unsubscribe($connection);

                if (!empty($projectId)) {
                    // Re-create every previous subscription under its original ID.
                    foreach ($meta as $subscriptionId => $subscription) {
                        $queries = Query::parseQueries($subscription['queries'] ?? []);
                        $realtime->subscribe(
                            $projectId,
                            $connection,
                            $subscriptionId,
                            $roles,
                            $subscription['channels'] ?? [],
                            $queries
                        );
                    }
                }

                if ($authorization !== null) {
                    $realtime->connections[$connection]['authorization'] = $authorization;
                }

                $user = $response->output($user, Response::MODEL_ACCOUNT);

                $authResponsePayloadJson = json_encode([
                    'type' => 'response',
                    'data' => [
                        'to' => 'authentication',
                        'success' => true,
                        'user' => $user
                    ]
                ]);
                $server->send([$connection], $authResponsePayloadJson);

                if ($project !== null && !$project->isEmpty()) {
                    $authOutboundBytes = \strlen($authResponsePayloadJson);
                    if ($authOutboundBytes > 0) {
                        triggerStats([
                            METRIC_REALTIME_OUTBOUND => $authOutboundBytes,
                        ], $project->getId());
                    }
                }

                break;

            default:
                throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message type is not valid.');
        }
    } catch (Throwable $th) {
        logError($th, 'realtimeMessage', project: $project, authorization: $authorization);

        // Non-integer codes (e.g. string SQLSTATE values) are reported as 500.
        $code = $th->getCode();
        if (!is_int($code)) {
            $code = 500;
        }

        $message = $th->getMessage();

        // sanitize 0 && 5xx errors
        if (($code === 0 || $code >= 500) && !Http::isDevelopment()) {
            $message = 'Error: Server Error';
        }

        $response = [
            'type' => 'error',
            'data' => [
                'code' => $code,
                'message' => $message
            ]
        ];

        $server->send([$connection], json_encode($response));

        // Only errors carrying code 1008 terminate the connection; others keep it open.
        if ($th->getCode() === 1008) {
            $server->close($connection, $th->getCode());
        }
    }
});
2024-11-12 09:45:00 +00:00
/**
 * Runs when a realtime connection is closed.
 *
 * For connections that are still tracked, the per-project `connectionsTotal` counter and
 * the telemetry connection counter are decremented and a -1 connections metric is emitted.
 * The connection is always unsubscribed afterwards, even if the bookkeeping fails.
 *
 * @param int $connection Connection identifier supplied by the server.
 */
$server->onClose(function (int $connection) use ($realtime, $stats, $register) {
    try {
        $isTracked = array_key_exists($connection, $realtime->connections);

        if ($isTracked) {
            $entry = $realtime->connections[$connection];

            $stats->decr($entry['projectId'], 'connectionsTotal');

            $connectionCounter = $register->get('telemetry.connectionCounter');
            $connectionCounter->add(-1);

            $metrics = [
                METRIC_REALTIME_CONNECTIONS => -1,
            ];
            triggerStats($metrics, $entry['projectId']);
        }
    } catch (\Throwable $error) {
        // Log only; do not rethrow. If we let this bubble, Swoole dumps full coroutine
        // backtraces and unsubscribe() below would never run (connection cleanup would fail).
        $reason = $error->getMessage();
        Console::error('Realtime onClose error: ' . $reason);
    }

    // Cleanup must happen regardless of whether the stats bookkeeping succeeded.
    $realtime->unsubscribe($connection);

    Console::info('Connection close: ' . $connection);
});
2021-06-28 10:18:00 +00:00
// Start the server now that the onOpen, onMessage and onClose handlers above are registered.
$server -> start ();