diff --git a/app/http.php b/app/http.php index 5b32d8f134..05e989b330 100644 --- a/app/http.php +++ b/app/http.php @@ -323,7 +323,28 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo $swooleResponse->end(\json_encode($output)); } finally { - $pools->reclaim(); + $connectionForConsole = $app->getResource('connectionForConsole'); + $connectionForProject = $app->getResource('connectionForProject'); + $connectionForQueue = $app->getResource('connectionForQueue'); + $connectionsForCache = $app->getResource('connectionsForCache'); + + if (!is_null($connectionForConsole)) { + $connectionForConsole->reclaim(); + } + + if (!is_null($connectionForProject)) { + $connectionForProject->reclaim(); + } + + if (!is_null($connectionForQueue)) { + $connectionForQueue->reclaim(); + } + + if (!empty($connectionsForCache)) { + foreach ($connectionsForCache as $connection) { + $connection->reclaim(); + } + } } }); diff --git a/app/init.php b/app/init.php index e79811c42d..5844cafa5c 100644 --- a/app/init.php +++ b/app/init.php @@ -884,7 +884,13 @@ App::setResource('localeCodes', function () { // Queues App::setResource('queue', function (Group $pools) { - return $pools->get('queue')->pop()->getResource(); + $connection = $pools->get('queue')->pop(); + + App::setResource('connectionForQueue', function () use ($connection) { + return $connection; + }); + + return $connection->getResource(); }, ['pools']); App::setResource('queueForMessaging', function (Connection $queue) { return new Phone($queue); @@ -1126,15 +1132,33 @@ App::setResource('console', function () { ]); }, []); +App::setResource('connectionForProject', function () { + return null; +}); +App::setResource('connectionForConsole', function () { + return null; +}); +App::setResource('connectionForQueue', function () { + return null; +}); +App::setResource('connectionsForCache', function () { + return []; +}); + App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } - $dbAdapter = $pools + $connection = $pools ->get($project->getAttribute('database')) - ->pop() - ->getResource(); + ->pop(); + + App::setResource('connectionForProject', function () use ($connection) { + return $connection; + }); + + $dbAdapter = $connection->getResource(); $database = new Database($dbAdapter, $cache); @@ -1148,11 +1172,15 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, }, ['pools', 'dbForConsole', 'cache', 'project']); App::setResource('dbForConsole', function (Group $pools, Cache $cache) { - $dbAdapter = $pools + $connection = $pools ->get('console') - ->pop() - ->getResource() - ; + ->pop(); + + App::setResource('connectionForConsole', function () use ($connection) { + return $connection; + }); + + $dbAdapter = $connection->getResource(); $database = new Database($dbAdapter, $cache); @@ -1187,15 +1215,18 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, return $database; } - $dbAdapter = $pools + $connection = $pools ->get($databaseName) - ->pop() - ->getResource(); + ->pop(); + + $dbAdapter = $connection->getResource(); + + App::setResource('connectionForProject', function () use ($connection) { + return $connection; + }, []); $database = new Database($dbAdapter, $cache); - $databases[$databaseName] = $database; - $database ->setNamespace('_' . $project->getInternalId()) ->setMetadata('host', \gethostname()) @@ -1211,15 +1242,21 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, App::setResource('cache', function (Group $pools) { $list = Config::getParam('pools-cache', []); $adapters = []; + $connections = []; foreach ($list as $value) { - $adapters[] = $pools + $connection = $pools ->get($value) - ->pop() - ->getResource() - ; + ->pop(); + + $connections[] = $connection; + $adapters[] = $connection->getResource(); } + App::setResource('connectionsForCache', function () use ($connections) { + return $connections; + }, []); + return new Cache(new Sharding($adapters)); }, ['pools']); diff --git a/app/realtime.php b/app/realtime.php index 9c6ae850e4..8a0b2f2c38 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -26,92 +26,151 @@ use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\Config\Config; use Utopia\Database\Database; +use Utopia\Pools\Group; +use Utopia\Registry\Registry; use Utopia\WebSocket\Server; use Utopia\WebSocket\Adapter; /** - * @var \Utopia\Registry\Registry $register + * @var Registry $register */ require_once __DIR__ . '/init.php'; -Runtime::enableCoroutine(SWOOLE_HOOK_ALL); +Runtime::enableCoroutine(); // Allows overriding -if (!function_exists("getConsoleDB")) { - function getConsoleDB(): Database +if (!function_exists('getConsoleDB')) { + /** + * @return array{Database, callable} + * @throws Exception|\Exception + */ + function getConsoleDB(): array { global $register; - /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ $pools = $register->get('pools'); - $dbAdapter = $pools + $dbConnection = $pools ->get('console') - ->pop() - ->getResource() - ; + ->pop(); - $database = new Database($dbAdapter, getCache()); + $dbAdapter = $dbConnection->getResource(); + + [$cache, $reclaimCache] = getCache(); + + $database = new Database($dbAdapter, $cache); $database ->setNamespace('_console') ->setMetadata('host', \gethostname()) ->setMetadata('project', '_console'); - return $database; + return [$database, function () use ($dbConnection, $reclaimCache) { + $dbConnection->reclaim(); + $reclaimCache(); + }]; } } // Allows overriding -if (!function_exists("getProjectDB")) { - function getProjectDB(Document $project): Database +if (!function_exists('getProjectDB')) { + /** + * @param Document $project + * @return array{Database, callable} + * @throws Exception + */ + function getProjectDB(Document $project): array { global $register; - /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ $pools = $register->get('pools'); if ($project->isEmpty() || $project->getId() === 'console') { return getConsoleDB(); } - $dbAdapter = $pools + $dbConnection = $pools ->get($project->getAttribute('database')) - ->pop() - ->getResource() - ; + ->pop(); - $database = new Database($dbAdapter, getCache()); + $dbAdapter = $dbConnection->getResource(); + + [$cache, $reclaimCache] = getCache(); + + $database = new Database($dbAdapter, $cache); $database ->setNamespace('_' . $project->getInternalId()) ->setMetadata('host', \gethostname()) ->setMetadata('project', $project->getId()); - return $database; + return [$database, function () use ($dbConnection, $reclaimCache) { + $dbConnection->reclaim(); + $reclaimCache(); + }]; } } // Allows overriding -if (!function_exists("getCache")) { - function getCache(): Cache +if (!function_exists('getCache')) { + /** + * @return array{Cache, callable} + * @throws Exception|\Exception + */ + function getCache(): array { global $register; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ + $pools = $register->get('pools'); $list = Config::getParam('pools-cache', []); + + $connections = []; $adapters = []; foreach ($list as $value) { - $adapters[] = $pools + $connection = $pools ->get($value) - ->pop() - ->getResource() - ; + ->pop(); + + $connections[] = $connection; + $adapters[] = $connection->getResource(); } - return new Cache(new Sharding($adapters)); + $cache = new Cache(new Sharding($adapters)); + + return [$cache, function () use ($connections) { + foreach ($connections as $connection) { + $connection->reclaim(); + } + }]; + } +} + +if (!function_exists('getPubSub')) { + /** + * @return array{Redis, callable} + * @throws Exception|\Exception + */ + function getPubSub(): array + { + global $register; + + /** @var Group $pools */ + $pools = $register->get('pools'); + + $connection = $pools + ->get('pubsub') + ->pop(); + + $redis = $connection->getResource(); + + return [$redis, function () use ($connection) { + $connection->reclaim(); + }]; } } @@ -133,13 +192,17 @@ $statsDocument = null; $workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); $adapter = new Adapter\Swoole(port: App::getEnv('PORT', 80)); + $adapter ->setPackageMaxLength(64000) // Default maximum Package Size (64kb) ->setWorkerNumber($workerNumber); $server = new Server($adapter); -$logError = function (Throwable $error, string $action) use ($register) { +function logError(Throwable $error, string $action): void +{ + global $register; + $logger = $register->get('logger'); if ($logger && !$error instanceof Exception) { @@ -173,11 +236,13 @@ $logError = function (Throwable $error, string $action) use ($register) { Console::error('[Error] Message: ' . $error->getMessage()); Console::error('[Error] File: ' . $error->getFile()); Console::error('[Error] Line: ' . $error->getLine()); -}; +} -$server->error($logError); +$server->error(function (Throwable $th, string $method) { + logError($th, $method); +}); -$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument, $logError) { +$server->onStart(function () use ($stats, $register, $containerId, &$statsDocument) { sleep(5); // wait for the initial database schema to be ready Console::success('Server started successfully'); @@ -186,11 +251,17 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume */ go(function () use ($register, $containerId, &$statsDocument) { $attempts = 0; - $database = getConsoleDB(); do { try { + /** + * @var Database $database + * @var callable $reclaim + */ + [$database, $reclaim] = getConsoleDB(); + $attempts++; + $document = new Document([ '$id' => ID::unique(), '$collection' => ID::custom('realtime'), @@ -200,102 +271,131 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume 'value' => '{}' ]); - $statsDocument = Authorization::skip(fn () => $database->createDocument('realtime', $document)); + $statsDocument = Authorization::skip(function () use ($database, $document) { + return $database->createDocument('realtime', $document); + }); + break; } catch (Throwable) { Console::warning("Collection not ready. Retrying connection ({$attempts})..."); sleep(DATABASE_RECONNECT_SLEEP); } } while (true); - $register->get('pools')->reclaim(); + + if (isset($reclaim)) { + $reclaim(); + } }); /** * Save current connections to the Database every 5 seconds. */ - // Timer::tick(5000, function () use ($register, $stats, &$statsDocument, $logError) { - // $payload = []; - // foreach ($stats as $projectId => $value) { - // $payload[$projectId] = $stats->get($projectId, 'connectionsTotal'); - // } - // if (empty($payload) || empty($statsDocument)) { - // return; - // } + Timer::tick(5000, function () use ($register, $stats, &$statsDocument) { + $payload = []; - // try { - // $database = getConsoleDB(); + foreach ($stats as $projectId => $value) { + $payload[$projectId] = $stats->get($projectId, 'connectionsTotal'); + } - // $statsDocument - // ->setAttribute('timestamp', DateTime::now()) - // ->setAttribute('value', json_encode($payload)); + if (empty($payload) || empty($statsDocument)) { + return; + } - // Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument)); - // } catch (Throwable $th) { - // call_user_func($logError, $th, "updateWorkerDocument"); - // } finally { - // $register->get('pools')->reclaim(); - // } - // }); + try { + /** + * @var Database $database + * @var callable $reclaim + */ + [$database, $reclaim] = getConsoleDB(); + + $statsDocument + ->setAttribute('timestamp', DateTime::now()) + ->setAttribute('value', json_encode($payload)); + + Authorization::skip(function () use ($database, $statsDocument) { + $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument); + }); + } catch (Throwable $th) { + logError($th, 'updateWorkerDocument'); + } finally { + if (isset($reclaim)) { + $reclaim(); + } + } + }); }); -$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, $logError) { +$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) { Console::success('Worker ' . $workerId . ' started successfully'); $attempts = 0; $start = time(); - Timer::tick(5000, function () use ($server, $register, $realtime, $stats, $logError) { + Timer::tick(5000, function () use ($server, $register, $realtime, $stats) { /** * Sending current connections to project channels on the console project every 5 seconds. */ if ($realtime->hasSubscriber('console', Role::users()->toString(), 'project')) { - $database = getConsoleDB(); + try { + /** + * @var Database $database + * @var callable $reclaim + */ + [$database, $reclaim] = getConsoleDB(); - $payload = []; + $payload = []; - $list = Authorization::skip(fn () => $database->find('realtime', [ - Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)), - ])); + $list = Authorization::skip(function () use ($database) { + return $database->find('realtime', [ + Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)), + ]); + }); - /** - * Aggregate stats across containers. - */ - foreach ($list as $document) { - foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { - if (array_key_exists($projectId, $payload)) { - $payload[$projectId] += $value; - } else { - $payload[$projectId] = $value; + /** + * Aggregate stats across containers. + */ + foreach ($list as $document) { + foreach (json_decode($document->getAttribute('value')) as $projectId => $value) { + if (array_key_exists($projectId, $payload)) { + $payload[$projectId] += $value; + } else { + $payload[$projectId] = $value; + } } } - } - foreach ($stats as $projectId => $value) { - if (!array_key_exists($projectId, $payload)) { - continue; - } + foreach ($stats as $projectId => $value) { + if (!array_key_exists($projectId, $payload)) { + continue; + } - $event = [ - 'project' => 'console', - 'roles' => ['team:' . $stats->get($projectId, 'teamId')], - 'data' => [ - 'events' => ['stats.connections'], - 'channels' => ['project'], - 'timestamp' => DateTime::formatTz(DateTime::now()), - 'payload' => [ - $projectId => $payload[$projectId] + $event = [ + 'project' => 'console', + 'roles' => ['team:' . $stats->get($projectId, 'teamId')], + 'data' => [ + 'events' => ['stats.connections'], + 'channels' => ['project'], + 'timestamp' => DateTime::formatTz(DateTime::now()), + 'payload' => [ + $projectId => $payload[$projectId] + ] ] - ] - ]; + ]; - $server->send($realtime->getSubscribers($event), json_encode([ - 'type' => 'event', - 'data' => $event['data'] - ])); + $server->send($realtime->getSubscribers($event), json_encode([ + 'type' => 'event', + 'data' => $event['data'] + ])); + } + } catch (Throwable $th) { + logError($th, 'sendStats'); + } finally { + if (isset($reclaim)) { + $reclaim(); + } } - - $register->get('pools')->reclaim(); } + /** * Sending test message for SDK E2E tests every 5 seconds. */ @@ -327,9 +427,15 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Attempting restart in 5 seconds (attempt #' . $attempts . ')'); sleep(5); // 5 sec delay between connection attempts } + $start = time(); - $redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ + /** + * @var Redis $redis + * @var callable $reclaimForRedis + */ + [$redis, $reclaimForRedis] = getPubSub(); + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -348,17 +454,36 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, if ($realtime->hasSubscriber($projectId, 'user:' . $userId)) { $connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId])); - $consoleDatabase = getConsoleDB(); - $project = Authorization::skip(fn() => $consoleDatabase->getDocument('projects', $projectId)); - $database = getProjectDB($project); - $user = $database->getDocument('users', $userId); + /** + * @var Database $dbForConsole + * @var Database $dbForProject + * @var callable $reclaimForConsole + * @var callable $reclaimForProject + */ + [$dbForConsole, $reclaimForConsole] = getConsoleDB(); + + $project = Authorization::skip(function () use ($dbForConsole, $projectId) { + return $dbForConsole->getDocument('projects', $projectId); + }); + + [$dbForProject, $reclaimForProject] = getProjectDB($project); + + $user = $dbForProject->getDocument('users', $userId); $roles = Auth::getRoles($user); $realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']); - $register->get('pools')->reclaim(); + /** + * If we successfully reclaim, clear the callbacks + * so the finally block doesn't try to reclaim again. + */ + $reclaimForConsole(); + $reclaimForConsole = null; + + $reclaimForProject(); + $reclaimForProject = null; } } @@ -383,30 +508,43 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } }); } catch (Throwable $th) { - call_user_func($logError, $th, "pubSubConnection"); - + logError($th, 'pubSubConnection'); Console::error('Pub/sub error: ' . $th->getMessage()); $attempts++; sleep(DATABASE_RECONNECT_SLEEP); continue; } finally { - $register->get('pools')->reclaim(); + if (isset($reclaimForRedis)) { + $reclaimForRedis(); + } + if (isset($reclaimForConsole)) { + $reclaimForConsole(); + } + if (isset($reclaimForProject)) { + $reclaimForProject(); + } } } Console::error('Failed to restart pub/sub...'); }); -$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime, $logError) { +$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime) { $app = new App('UTC'); $request = new Request($request); $response = new Response(new SwooleResponse()); Console::info("Connection open (user: {$connection})"); - App::setResource('pools', fn() => $register->get('pools')); - App::setResource('request', fn() => $request); - App::setResource('response', fn() => $response); + App::setResource('pools', function () use ($register) { + return $register->get('pools'); + }); + App::setResource('request', function () use ($request) { + return $request; + }); + App::setResource('response', function () use ($response) { + return $response; + }); try { /** @var Document $project */ @@ -419,9 +557,13 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing or unknown project ID'); } - $dbForProject = getProjectDB($project); - $console = $app->getResource('console'); /** @var Document $console */ - $user = $app->getResource('user'); /** @var Document $user */ + [$dbForProject, $reclaimForProject] = getProjectDB($project); + + /** @var Document $console */ + $console = $app->getResource('console'); + + /** @var Document $user */ + $user = $app->getResource('user'); /* * Abuse Check @@ -481,7 +623,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $stats->incr($project->getId(), 'connections'); $stats->incr($project->getId(), 'connectionsTotal'); } catch (Throwable $th) { - call_user_func($logError, $th, "initServer"); + logError($th, 'initServer'); $response = [ 'type' => 'error', @@ -500,7 +642,9 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::error('[Error] Message: ' . $response['data']['message']); } } finally { - $register->get('pools')->reclaim(); + if (isset($reclaimForProject)) { + $reclaimForProject(); + } } }); @@ -508,11 +652,14 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re try { $response = new Response(new SwooleResponse()); $projectId = $realtime->connections[$connection]['projectId']; - $database = getConsoleDB(); + [$database, $reclaimForConsole] = getConsoleDB(); if ($projectId !== 'console') { - $project = Authorization::skip(fn() => $database->getDocument('projects', $projectId)); - $database = getProjectDB($project); + $project = Authorization::skip(function () use ($database, $projectId) { + return $database->getDocument('projects', $projectId); + }); + + [$database, $reclaimForProject] = getProjectDB($project); } else { $project = null; } @@ -598,7 +745,12 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re $server->close($connection, $th->getCode()); } } finally { - $register->get('pools')->reclaim(); + if (isset($reclaimForConsole)) { + $reclaimForConsole(); + } + if (isset($reclaimForProject)) { + $reclaimForProject(); + } } }); @@ -606,6 +758,7 @@ $server->onClose(function (int $connection) use ($realtime, $stats) { if (array_key_exists($connection, $realtime->connections)) { $stats->decr($realtime->connections[$connection]['projectId'], 'connectionsTotal'); } + $realtime->unsubscribe($connection); Console::info('Connection close: ' . $connection);