diff --git a/app/realtime.php b/app/realtime.php index e7d8faf2c0..2e2024216a 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -32,19 +32,21 @@ use Utopia\System\System; use Utopia\WebSocket\Adapter; use Utopia\WebSocket\Server; +global $global; + /** - * @var \Utopia\Registry\Registry $register + * @var \Utopia\Registry\Registry $global */ -require_once __DIR__ . '/init.php'; +require_once __DIR__ . '/init2.php'; Runtime::enableCoroutine(SWOOLE_HOOK_ALL); function getConsoleDB(Authorization $auth): Database { - global $register; + global $global; /** @var \Utopia\Pools\Group $pools */ - $pools = $register->get('pools'); + $pools = $global->get('pools'); $dbAdapter = $pools ->get('console') @@ -65,10 +67,10 @@ function getConsoleDB(Authorization $auth): Database function getProjectDB(Document $project, Authorization $auth): Database { - global $register; + global $global; /** @var \Utopia\Pools\Group $pools */ - $pools = $register->get('pools'); + $pools = $global->get('pools'); if ($project->isEmpty() || $project->getId() === 'console') { return getConsoleDB($auth); @@ -93,9 +95,9 @@ function getProjectDB(Document $project, Authorization $auth): Database function getCache(): Cache { - global $register; + global $global; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + $pools = $global->get('pools'); /** @var \Utopia\Pools\Group $pools */ $list = Config::getParam('pools-cache', []); $adapters = []; @@ -135,8 +137,8 @@ $adapter $server = new Server($adapter); -$logError = function (Throwable $error, string $action) use ($register) { - $logger = $register->get('logger'); +$logError = function (Throwable $error, string $action) use ($global) { + $logger = $global->get('logger'); if ($logger && !$error instanceof Exception) { $version = System::getEnv('_APP_VERSION', 'UNKNOWN'); @@ -173,7 +175,7 @@ $logError = function (Throwable $error, string $action) use ($register) { $server->error($logError); -$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) { +$server->onStart(function () use ($stats, $global, $containerId, &$statsDocument, $logError) { $auth = new Authorization(); sleep(5); // wait for the initial database schema to be ready @@ -182,7 +184,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume /** * Create document for this worker to share stats across Containers. */ - go(function () use ($register, $containerId, &$statsDocument, $auth) { + go(function () use ($global, $containerId, &$statsDocument, $auth) { $attempts = 0; $database = getConsoleDB($auth); @@ -206,13 +208,13 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume sleep(DATABASE_RECONNECT_SLEEP); } } while (true); - $register->get('pools')->reclaim(); + $global->get('pools')->reclaim(); }); /** * Save current connections to the Database every 5 seconds. */ - Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError, $auth) { + Timer::tick(5000, function () use ($global, $stats, &$statsDocument, $logError, $auth) { $payload = []; foreach ($stats as $projectId => $value) { $payload[$projectId] = $stats->get($projectId, 'connectionsTotal'); @@ -233,12 +235,12 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume } catch (Throwable $th) { call_user_func($logError, $th, "updateWorkerDocument"); } finally { - $register->get('pools')->reclaim(); + $global->get('pools')->reclaim(); } }); }); -$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) { +$server->onWorkerStart(function (int $workerId) use ($server, $global, $stats, $realtime, $logError) { Console::success('Worker ' . $workerId . ' started successfully'); $attempts = 0; @@ -246,7 +248,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $auth = new Authorization(); - Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError, $auth) { + Timer::tick(5000, function () use ($server, $global, $realtime, $stats, $logError, $auth) { /** * Sending current connections to project channels on the console project every 5 seconds. */ @@ -296,7 +298,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, ])); } - $register->get('pools')->reclaim(); + $global->get('pools')->reclaim(); } /** * Sending test message for SDK E2E tests every 5 seconds. @@ -329,9 +331,10 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Attempting restart in 5 seconds (attempt #' . $attempts . ')'); sleep(5); // 5 sec delay between connection attempts } + $start = time(); - $redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ + $redis = $global->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -341,7 +344,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime, $auth) { + $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $global, $realtime, $auth) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { @@ -361,7 +364,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']); - $register->get('pools')->reclaim(); + $global->get('pools')->reclaim(); } } @@ -393,14 +396,15 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, sleep(DATABASE_RECONNECT_SLEEP); continue; } finally { - $register->get('pools')->reclaim(); + //$global->get('pools')->reclaim(); + // TODO eldad add connections reclaim } } Console::error('Failed to restart pub/sub...'); }); -$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) { +$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $global, $stats, &$realtime, $logError) { $auth = new Authorization(); $http = new Http(new FPMServer(), 'UTC'); @@ -409,7 +413,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::info("Connection open (user: {$connection})"); - Http::setResource('pools', fn () => $register->get('pools')); + Http::setResource('pools', fn () => $global->get('pools')); Http::setResource('request', fn () => $request); Http::setResource('response', fn () => $response); @@ -515,11 +519,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::error('[Error] Message: ' . $response['data']['message']); } } finally { - $register->get('pools')->reclaim(); + $global->get('pools')->reclaim(); } }); -$server->onMessage(function (int $connection, string $message) use ($server, $register, $realtime, $containerId) { +$server->onMessage(function (int $connection, string $message) use ($server, $global, $realtime, $containerId) { $auth = new Authorization(); try { @@ -615,7 +619,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re $server->close($connection, $th->getCode()); } } finally { - $register->get('pools')->reclaim(); + $global->get('pools')->reclaim(); } });