diff --git a/app/http.php b/app/http.php index 05e989b330..008a355550 100644 --- a/app/http.php +++ b/app/http.php @@ -2,6 +2,7 @@ require_once __DIR__ . '/../vendor/autoload.php'; +use Appwrite\Utopia\Pools\Connections; use Appwrite\Utopia\Response; use Swoole\Process; use Swoole\Http\Server; @@ -323,27 +324,13 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo $swooleResponse->end(\json_encode($output)); } finally { - $connectionForConsole = $app->getResource('connectionForConsole'); - $connectionForProject = $app->getResource('connectionForProject'); - $connectionForQueue = $app->getResource('connectionForQueue'); - $connectionsForCache = $app->getResource('connectionsForCache'); + /** + * @var Connections $connections + */ + $connections = $app->getResource('connections'); - if (!is_null($connectionForConsole)) { - $connectionForConsole->reclaim(); - } - - if (!is_null($connectionForProject)) { - $connectionForProject->reclaim(); - } - - if (!is_null($connectionForQueue)) { - $connectionForQueue->reclaim(); - } - - if (!empty($connectionsForCache)) { - foreach ($connectionsForCache as $connection) { - $connection->reclaim(); - } + if (!empty($connections)) { + $connections->reclaim(); } } }); diff --git a/app/init.php b/app/init.php index 5844cafa5c..829911f904 100644 --- a/app/init.php +++ b/app/init.php @@ -33,6 +33,7 @@ use Appwrite\Network\Validator\Email; use Appwrite\Network\Validator\Origin; use Appwrite\OpenSSL\OpenSSL; use Appwrite\URL\URL as AppwriteURL; +use Appwrite\Utopia\Pools\Connections; use Utopia\App; use Utopia\Logger\Logger; use Utopia\Cache\Adapter\Redis as RedisCache; @@ -881,17 +882,14 @@ App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en'))) App::setResource('localeCodes', function () { return array_map(fn($locale) => $locale['code'], Config::getParam('locale-codes', [])); }); - // Queues -App::setResource('queue', function (Group $pools) { +App::setResource('queue', function (Group $pools, Connections $connections) { $connection = $pools->get('queue')->pop(); - App::setResource('connectionForQueue', function () use ($connection) { - return $connection; - }); + $connections->add($connection); return $connection->getResource(); -}, ['pools']); +}, ['pools', 'connections']); App::setResource('queueForMessaging', function (Connection $queue) { return new Phone($queue); }, ['queue']); @@ -1132,20 +1130,11 @@ App::setResource('console', function () { ]); }, []); -App::setResource('connectionForProject', function () { - return null; -}); -App::setResource('connectionForConsole', function () { - return null; -}); -App::setResource('connectionForQueue', function () { - return null; -}); -App::setResource('connectionsForCache', function () { - return []; +App::setResource('connections', function () { + return new Connections(); }); -App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) { +App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project, Connections $connections) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } @@ -1154,9 +1143,7 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, ->get($project->getAttribute('database')) ->pop(); - App::setResource('connectionForProject', function () use ($connection) { - return $connection; - }); + $connections->add($connection); $dbAdapter = $connection->getResource(); @@ -1169,16 +1156,14 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; -}, ['pools', 'dbForConsole', 'cache', 'project']); +}, ['pools', 'dbForConsole', 'cache', 'project', 'connections']); -App::setResource('dbForConsole', function (Group $pools, Cache $cache) { +App::setResource('dbForConsole', function (Group $pools, Cache $cache, Connections $connections) { $connection = $pools ->get('console') ->pop(); - App::setResource('connectionForConsole', function () use ($connection) { - return $connection; - }); + $connections->add($connection); $dbAdapter = $connection->getResource(); @@ -1191,12 +1176,12 @@ App::setResource('dbForConsole', function (Group $pools, Cache $cache) { ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; -}, ['pools', 'cache']); +}, ['pools', 'cache', 'connections']); -App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { +App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, Cache $cache, Connections $connections) { $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools - $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { + $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, $connections, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } @@ -1208,8 +1193,6 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $database ->setNamespace('_' . $project->getInternalId()) - ->setMetadata('host', \gethostname()) - ->setMetadata('project', $project->getId()) ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; @@ -1219,46 +1202,38 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, ->get($databaseName) ->pop(); - $dbAdapter = $connection->getResource(); + $connections->add($connection); - App::setResource('connectionForProject', function () use ($connection) { - return $connection; - }, []); + $dbAdapter = $connection->getResource(); $database = new Database($dbAdapter, $cache); $database ->setNamespace('_' . $project->getInternalId()) - ->setMetadata('host', \gethostname()) - ->setMetadata('project', $project->getId()) ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; }; return $getProjectDB; -}, ['pools', 'dbForConsole', 'cache']); +}, ['pools', 'dbForConsole', 'cache', 'connections']); -App::setResource('cache', function (Group $pools) { +App::setResource('cache', function (Group $pools, Connections $connections) { $list = Config::getParam('pools-cache', []); $adapters = []; - $connections = []; foreach ($list as $value) { $connection = $pools ->get($value) ->pop(); - $connections[] = $connection; + $connections->add($connection); + $adapters[] = $connection->getResource(); } - App::setResource('connectionsForCache', function () use ($connections) { - return $connections; - }, []); - return new Cache(new Sharding($adapters)); -}, ['pools']); +}, ['pools', 'connections']); App::setResource('deviceLocal', function () { return new Local(); diff --git a/app/realtime.php b/app/realtime.php index 8a0b2f2c38..ce61f0f2e9 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -38,6 +38,8 @@ require_once __DIR__ . '/init.php'; Runtime::enableCoroutine(); +$redisConnections = []; + // Allows overriding if (!function_exists('getConsoleDB')) { /** @@ -60,11 +62,7 @@ if (!function_exists('getConsoleDB')) { [$cache, $reclaimCache] = getCache(); $database = new Database($dbAdapter, $cache); - - $database - ->setNamespace('_console') - ->setMetadata('host', \gethostname()) - ->setMetadata('project', '_console'); + $database->setNamespace('_console'); return [$database, function () use ($dbConnection, $reclaimCache) { $dbConnection->reclaim(); @@ -100,11 +98,7 @@ if (!function_exists('getProjectDB')) { [$cache, $reclaimCache] = getCache(); $database = new Database($dbAdapter, $cache); - - $database - ->setNamespace('_' . $project->getInternalId()) - ->setMetadata('host', \gethostname()) - ->setMetadata('project', $project->getId()); + $database->setNamespace('_' . $project->getInternalId()); return [$database, function () use ($dbConnection, $reclaimCache) { $dbConnection->reclaim(); @@ -325,7 +319,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume }); }); -$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) { +$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, &$redisConnections) { Console::success('Worker ' . $workerId . ' started successfully'); $attempts = 0; @@ -436,6 +430,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, */ [$redis, $reclaimForRedis] = getPubSub(); + $redisConnections[$workerId] = [$redis, $reclaimForRedis]; + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -514,9 +510,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, sleep(DATABASE_RECONNECT_SLEEP); continue; } finally { - if (isset($reclaimForRedis)) { - $reclaimForRedis(); - } if (isset($reclaimForConsole)) { $reclaimForConsole(); } @@ -529,6 +522,20 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Console::error('Failed to restart pub/sub...'); }); +$server->onWorkerStop(function (int $workerId) use ($redisConnections) { + /** + * @var Redis $redis + * @var callable $reclaim + */ + [$redis, $reclaim] = $redisConnections[$workerId] ?? null; + + $redis?->unsubscribe(['realtime']); + + if ($reclaim) { + $reclaim(); + } +}); + $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime) { $app = new App('UTC'); $request = new Request($request); diff --git a/composer.json b/composer.json index 0e8575e7c1..42eedfaba6 100644 --- a/composer.json +++ b/composer.json @@ -69,7 +69,7 @@ "utopia-php/storage": "0.18.*", "utopia-php/swoole": "0.5.*", "utopia-php/vcs": "0.6.*", - "utopia-php/websocket": "0.1.*", + "utopia-php/websocket": "dev-feat-worker-stop", "matomo/device-detector": "6.1.*", "dragonmantank/cron-expression": "3.3.2", "phpmailer/phpmailer": "6.8.0", diff --git a/composer.lock b/composer.lock index c8ce177133..70fe96499c 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "e90287b00089ad8e8059e6e157f3a6c8", + "content-hash": "ce7a7f429d68836b8667e2b02440fc3f", "packages": [ { "name": "adhocore/jwt", @@ -2273,22 +2273,24 @@ }, { "name": "utopia-php/websocket", - "version": "0.1.0", + "version": "dev-feat-worker-stop", "source": { "type": "git", "url": "https://github.com/utopia-php/websocket.git", - "reference": "51fcb86171400d8aa40d76c54593481fd273dab5" + "reference": "b7adfab69d4c48a60272fa4cb3328f9b400b0ffa" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/websocket/zipball/51fcb86171400d8aa40d76c54593481fd273dab5", - "reference": "51fcb86171400d8aa40d76c54593481fd273dab5", + "url": "https://api.github.com/repos/utopia-php/websocket/zipball/b7adfab69d4c48a60272fa4cb3328f9b400b0ffa", + "reference": "b7adfab69d4c48a60272fa4cb3328f9b400b0ffa", "shasum": "" }, "require": { "php": ">=8.0" }, "require-dev": { + "laravel/pint": "^1.15", + "phpstan/phpstan": "^1.8", "phpunit/phpunit": "^9.5.5", "swoole/ide-helper": "4.6.6", "textalk/websocket": "1.5.2", @@ -2305,16 +2307,6 @@ "license": [ "MIT" ], - "authors": [ - { - "name": "Eldad Fux", - "email": "eldad@appwrite.io" - }, - { - "name": "Torsten Dittmann", - "email": "torsten@appwrite.io" - } - ], "description": "A simple abstraction for WebSocket servers.", "keywords": [ "framework", @@ -2325,9 +2317,9 @@ ], "support": { "issues": "https://github.com/utopia-php/websocket/issues", - "source": "https://github.com/utopia-php/websocket/tree/0.1.0" + "source": "https://github.com/utopia-php/websocket/tree/feat-worker-stop" }, - "time": "2021-12-20T10:50:09+00:00" + "time": "2024-04-09T04:48:45+00:00" }, { "name": "webmozart/assert", @@ -5145,7 +5137,9 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": [], + "stability-flags": { + "utopia-php/websocket": 20 + }, "prefer-stable": false, "prefer-lowest": false, "platform": { diff --git a/src/Appwrite/Utopia/Pools/Connections.php b/src/Appwrite/Utopia/Pools/Connections.php new file mode 100644 index 0000000000..bf9a44bdb8 --- /dev/null +++ b/src/Appwrite/Utopia/Pools/Connections.php @@ -0,0 +1,52 @@ + + */ + protected array $connections = []; + + /** + * @param Connection $connection + * @return self + */ + public function add(Connection $connection): self + { + $this->connections[$connection->getID()] = $connection; + return $this; + } + + /** + * @param string $id + * @return self + */ + public function remove(string $id): self + { + unset($this->connections[$id]); + return $this; + } + + public function count(): int + { + return \count($this->connections); + } + + /** + * @return self + * @throws \Exception + */ + public function reclaim(): self + { + foreach ($this->connections as $id => $connection) { + $connection->reclaim(); + unset($this->connections[$id]); + } + + return $this; + } +} diff --git a/tests/unit/Utopia/Pools/ConnectionsTest.php b/tests/unit/Utopia/Pools/ConnectionsTest.php new file mode 100644 index 0000000000..fb9c8bcc09 --- /dev/null +++ b/tests/unit/Utopia/Pools/ConnectionsTest.php @@ -0,0 +1,40 @@ +add($connection); + $this->assertEquals(1, $connections->count()); + } + + public function testRemove() + { + $connections = new Connections(); + $connection = new Connection('resource'); + $connections->add($connection); + $connections->remove($connection->getID()); + $this->assertEquals(0, $connections->count()); + } + + public function testReclaim() + { + $connections = new Connections(); + $pool = new Pool('test', 1, function () { + return 'resource'; + }); + $connection = $pool->pop(); + $connections->add($connection); + $connections->reclaim(); + $this->assertEquals(1, $pool->count()); + } +}