From 730e7319df25ffb0c1314086eca9df912e4730d6 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 9 Apr 2024 17:03:28 +1200 Subject: [PATCH 1/7] Use connection group to reclaim all use connections for request --- app/http.php | 27 +++------- app/init.php | 65 +++++++---------------- src/Appwrite/Utopia/Queue/Connections.php | 56 +++++++++++++++++++ 3 files changed, 83 insertions(+), 65 deletions(-) create mode 100644 src/Appwrite/Utopia/Queue/Connections.php diff --git a/app/http.php b/app/http.php index 05e989b330..91da4e8cab 100644 --- a/app/http.php +++ b/app/http.php @@ -2,6 +2,7 @@ require_once __DIR__ . '/../vendor/autoload.php'; +use Appwrite\Utopia\Queue\Connections; use Appwrite\Utopia\Response; use Swoole\Process; use Swoole\Http\Server; @@ -323,27 +324,13 @@ $http->on('request', function (SwooleRequest $swooleRequest, SwooleResponse $swo $swooleResponse->end(\json_encode($output)); } finally { - $connectionForConsole = $app->getResource('connectionForConsole'); - $connectionForProject = $app->getResource('connectionForProject'); - $connectionForQueue = $app->getResource('connectionForQueue'); - $connectionsForCache = $app->getResource('connectionsForCache'); + /** + * @var Connections $connections + */ + $connections = $app->getResource('connections'); - if (!is_null($connectionForConsole)) { - $connectionForConsole->reclaim(); - } - - if (!is_null($connectionForProject)) { - $connectionForProject->reclaim(); - } - - if (!is_null($connectionForQueue)) { - $connectionForQueue->reclaim(); - } - - if (!empty($connectionsForCache)) { - foreach ($connectionsForCache as $connection) { - $connection->reclaim(); - } + if (!empty($connections)) { + $connections->reclaim(); } } }); diff --git a/app/init.php b/app/init.php index 5844cafa5c..1830fd1763 100644 --- a/app/init.php +++ b/app/init.php @@ -33,6 +33,7 @@ use Appwrite\Network\Validator\Email; use Appwrite\Network\Validator\Origin; use Appwrite\OpenSSL\OpenSSL; use Appwrite\URL\URL as AppwriteURL; +use Appwrite\Utopia\Queue\Connections; use Utopia\App; use Utopia\Logger\Logger; use Utopia\Cache\Adapter\Redis as RedisCache; @@ -881,17 +882,14 @@ App::setResource('locale', fn() => new Locale(App::getEnv('_APP_LOCALE', 'en'))) App::setResource('localeCodes', function () { return array_map(fn($locale) => $locale['code'], Config::getParam('locale-codes', [])); }); - // Queues -App::setResource('queue', function (Group $pools) { +App::setResource('queue', function (Group $pools, Connections $connections) { $connection = $pools->get('queue')->pop(); - App::setResource('connectionForQueue', function () use ($connection) { - return $connection; - }); + $connections->add($connection); return $connection->getResource(); -}, ['pools']); +}, ['pools', 'connections']); App::setResource('queueForMessaging', function (Connection $queue) { return new Phone($queue); }, ['queue']); @@ -1132,20 +1130,11 @@ App::setResource('console', function () { ]); }, []); -App::setResource('connectionForProject', function () { - return null; -}); -App::setResource('connectionForConsole', function () { - return null; -}); -App::setResource('connectionForQueue', function () { - return null; -}); -App::setResource('connectionsForCache', function () { - return []; +App::setResource('connections', function () { + return new Connections(); }); -App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project) { +App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, Cache $cache, Document $project, Connections $connections) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } @@ -1154,9 +1143,7 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, ->get($project->getAttribute('database')) ->pop(); - App::setResource('connectionForProject', function () use ($connection) { - return $connection; - }); + $connections->add($connection); $dbAdapter = $connection->getResource(); @@ -1169,16 +1156,14 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole, ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; -}, ['pools', 'dbForConsole', 'cache', 'project']); +}, ['pools', 'dbForConsole', 'cache', 'project', 'connections']); -App::setResource('dbForConsole', function (Group $pools, Cache $cache) { +App::setResource('dbForConsole', function (Group $pools, Cache $cache, Connections $connections) { $connection = $pools ->get('console') ->pop(); - App::setResource('connectionForConsole', function () use ($connection) { - return $connection; - }); + $connections->add($connection); $dbAdapter = $connection->getResource(); @@ -1191,12 +1176,12 @@ App::setResource('dbForConsole', function (Group $pools, Cache $cache) { ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; -}, ['pools', 'cache']); +}, ['pools', 'cache', 'connections']); -App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { +App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, Cache $cache, Connections $connections) { $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools - $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { + $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, $connections, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } @@ -1208,8 +1193,6 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $database ->setNamespace('_' . $project->getInternalId()) - ->setMetadata('host', \gethostname()) - ->setMetadata('project', $project->getId()) ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; @@ -1219,44 +1202,36 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, ->get($databaseName) ->pop(); - $dbAdapter = $connection->getResource(); + $connections->add($connection); - App::setResource('connectionForProject', function () use ($connection) { - return $connection; - }, []); + $dbAdapter = $connection->getResource(); $database = new Database($dbAdapter, $cache); $database ->setNamespace('_' . $project->getInternalId()) - ->setMetadata('host', \gethostname()) - ->setMetadata('project', $project->getId()) ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); return $database; }; return $getProjectDB; -}, ['pools', 'dbForConsole', 'cache']); +}, ['pools', 'dbForConsole', 'cache', 'connections']); -App::setResource('cache', function (Group $pools) { +App::setResource('cache', function (Group $pools, Connections $connections) { $list = Config::getParam('pools-cache', []); $adapters = []; - $connections = []; foreach ($list as $value) { $connection = $pools ->get($value) ->pop(); - $connections[] = $connection; + $connections->add($connection); + $adapters[] = $connection->getResource(); } - App::setResource('connectionsForCache', function () use ($connections) { - return $connections; - }, []); - return new Cache(new Sharding($adapters)); }, ['pools']); diff --git a/src/Appwrite/Utopia/Queue/Connections.php b/src/Appwrite/Utopia/Queue/Connections.php new file mode 100644 index 0000000000..e3d5740b97 --- /dev/null +++ b/src/Appwrite/Utopia/Queue/Connections.php @@ -0,0 +1,56 @@ + + */ + protected array $connections = []; + + /** + * @param Connection $connection + * @return self + */ + public function add(Connection $connection): self + { + $this->connections[$connection->getID()] = $connection; + return $this; + } + + /** + * @param string $id + * @return Connection + * @throws \Exception + */ + public function get(string $id): Connection + { + return $this->connections[$id] ?? throw new \Exception("Connection '{$id}' not found"); + } + + /** + * @param string $id + * @return self + */ + public function remove(string $id): self + { + unset($this->connections[$id]); + return $this; + } + + /** + * @return self + * @throws \Exception + */ + public function reclaim(): self + { + foreach ($this->connections as $connection) { + $connection->reclaim(); + } + + return $this; + } +} From fefb60557df8a220bf1efc104e3adc867d3471b7 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 9 Apr 2024 17:04:31 +1200 Subject: [PATCH 2/7] Reclaim only used connections for workers --- app/worker.php | 99 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/app/worker.php b/app/worker.php index 32ce35e033..2982f7af1f 100644 --- a/app/worker.php +++ b/app/worker.php @@ -16,6 +16,7 @@ use Appwrite\Event\Phone; use Appwrite\Event\Usage; use Appwrite\Event\UsageDump; use Appwrite\Platform\Appwrite; +use Appwrite\Utopia\Queue\Connections; use Swoole\Runtime; use Utopia\App; use Utopia\Cache\Adapter\Sharding; @@ -36,25 +37,32 @@ use Utopia\Pools\Group; use Utopia\Queue\Connection; Authorization::disable(); -Runtime::enableCoroutine(SWOOLE_HOOK_ALL); +Runtime::enableCoroutine(); Server::setResource('register', fn () => $register); -Server::setResource('dbForConsole', function (Cache $cache, Registry $register) { +Server::setResource('connections', function () { + return new Connections(); +}); + +Server::setResource('dbForConsole', function (Cache $cache, Registry $register, Connections $connections) { $pools = $register->get('pools'); - $database = $pools + + $connection = $pools ->get('console') - ->pop() - ->getResource() - ; + ->pop(); + + $connections->add($connection); + + $database = $connection->getResource(); $adapter = new Database($database, $cache); $adapter->setNamespace('_console'); return $adapter; -}, ['cache', 'register']); +}, ['cache', 'register', 'connections']); -Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) { +Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole, Connections $connections) { $payload = $message->getPayload() ?? []; $project = new Document($payload['project'] ?? []); @@ -63,16 +71,19 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register, } $pools = $register->get('pools'); - $database = $pools + + $connection = $pools ->get($project->getAttribute('database')) - ->pop() - ->getResource() - ; + ->pop(); + + $database = $connection->getResource(); + + $connections->add($connection); $adapter = new Database($database, $cache); $adapter->setNamespace('_' . $project->getInternalId()); return $adapter; -}, ['cache', 'register', 'message', 'dbForConsole']); +}, ['cache', 'register', 'message', 'dbForConsole', 'connections']); Server::setResource('project', function (Message $message, Database $dbForConsole) { $payload = $message->getPayload() ?? []; @@ -81,14 +92,14 @@ Server::setResource('project', function (Message $message, Database $dbForConsol if ($project->getId() === 'console') { return $project; } + return $dbForConsole->getDocument('projects', $project->getId()); - ; }, ['message', 'dbForConsole']); -Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { +Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, Cache $cache, Connections $connections) { $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools - return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases): Database { + return function (Document $project) use ($pools, $dbForConsole, $cache, $connections, &$databases): Database { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } @@ -101,10 +112,13 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso return $database; } - $dbAdapter = $pools + $dbConnection = $pools ->get($databaseName) - ->pop() - ->getResource(); + ->pop(); + + $dbAdapter = $dbConnection->getResource(); + + $connections->add($dbConnection); $database = new Database($dbAdapter, $cache); @@ -114,7 +128,7 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso return $database; }; -}, ['pools', 'dbForConsole', 'cache']); +}, ['pools', 'dbForConsole', 'cache', 'connections']); Server::setResource('abuseRetention', function () { return DateTime::addSeconds(new \DateTime(), -1 * App::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400)); @@ -128,82 +142,104 @@ Server::setResource('executionRetention', function () { return DateTime::addSeconds(new \DateTime(), -1 * App::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', 1209600)); }); -Server::setResource('cache', function (Registry $register) { +Server::setResource('cache', function (Registry $register, Connections $connections) { $pools = $register->get('pools'); $list = Config::getParam('pools-cache', []); $adapters = []; foreach ($list as $value) { - $adapters[] = $pools + $connection = $pools ->get($value) - ->pop() - ->getResource() - ; + ->pop(); + + $connections->add($connection); + + $adapters[] = $connection->getResource(); } return new Cache(new Sharding($adapters)); -}, ['register']); +}, ['register', 'connections']); + Server::setResource('log', fn() => new Log()); + Server::setResource('queueForUsage', function (Connection $queue) { return new Usage($queue); }, ['queue']); + Server::setResource('queueForUsageDump', function (Connection $queue) { return new UsageDump($queue); }, ['queue']); + Server::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); + Server::setResource('queueForDatabase', function (Connection $queue) { return new EventDatabase($queue); }, ['queue']); + Server::setResource('queueForMessaging', function (Connection $queue) { return new Phone($queue); }, ['queue']); + Server::setResource('queueForMails', function (Connection $queue) { return new Mail($queue); }, ['queue']); + Server::setResource('queueForBuilds', function (Connection $queue) { return new Build($queue); }, ['queue']); + Server::setResource('queueForDeletes', function (Connection $queue) { return new Delete($queue); }, ['queue']); + Server::setResource('queueForEvents', function (Connection $queue) { return new Event($queue); }, ['queue']); + Server::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); }, ['queue']); + Server::setResource('queueForFunctions', function (Connection $queue) { return new Func($queue); }, ['queue']); + Server::setResource('queueForCertificates', function (Connection $queue) { return new Certificate($queue); }, ['queue']); + Server::setResource('queueForMigrations', function (Connection $queue) { return new Migration($queue); }, ['queue']); + Server::setResource('logger', function (Registry $register) { return $register->get('logger'); }, ['register']); + Server::setResource('pools', function (Registry $register) { return $register->get('pools'); }, ['register']); + Server::setResource('getFunctionsDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $projectId); }; }); + Server::setResource('getFilesDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_UPLOADS . '/app-' . $projectId); }; }); + Server::setResource('getBuildsDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_BUILDS . '/app-' . $projectId); }; }); + Server::setResource('getCacheDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_CACHE . '/app-' . $projectId); @@ -254,9 +290,9 @@ $worker = $platform->getWorker(); $worker ->shutdown() - ->inject('pools') - ->action(function (Group $pools) { - $pools->reclaim(); + ->inject('connections') + ->action(function (Connections $connections) { + $connections->reclaim(); }); $worker @@ -265,7 +301,10 @@ $worker ->inject('logger') ->inject('log') ->inject('project') - ->action(function (Throwable $error, ?Logger $logger, Log $log, Document $project) { + ->inject('connections') + ->action(function (Throwable $error, ?Logger $logger, Log $log, Document $project, Connections $connections) { + $connections->reclaim(); + $version = App::getEnv('_APP_VERSION', 'UNKNOWN'); if ($error instanceof PDOException) { From 4bef6125f700cbc4158f51dae663565c729aca15 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 9 Apr 2024 17:05:44 +1200 Subject: [PATCH 3/7] Use on worker stop to properly reclaim realtime redis connections and unsubscribe --- app/realtime.php | 35 +++++++++++++++++++++-------------- composer.json | 2 +- composer.lock | 30 ++++++++++++------------------ 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 8a0b2f2c38..ce61f0f2e9 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -38,6 +38,8 @@ require_once __DIR__ . '/init.php'; Runtime::enableCoroutine(); +$redisConnections = []; + // Allows overriding if (!function_exists('getConsoleDB')) { /** @@ -60,11 +62,7 @@ if (!function_exists('getConsoleDB')) { [$cache, $reclaimCache] = getCache(); $database = new Database($dbAdapter, $cache); - - $database - ->setNamespace('_console') - ->setMetadata('host', \gethostname()) - ->setMetadata('project', '_console'); + $database->setNamespace('_console'); return [$database, function () use ($dbConnection, $reclaimCache) { $dbConnection->reclaim(); @@ -100,11 +98,7 @@ if (!function_exists('getProjectDB')) { [$cache, $reclaimCache] = getCache(); $database = new Database($dbAdapter, $cache); - - $database - ->setNamespace('_' . $project->getInternalId()) - ->setMetadata('host', \gethostname()) - ->setMetadata('project', $project->getId()); + $database->setNamespace('_' . $project->getInternalId()); return [$database, function () use ($dbConnection, $reclaimCache) { $dbConnection->reclaim(); @@ -325,7 +319,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume }); }); -$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime) { +$server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime, &$redisConnections) { Console::success('Worker ' . $workerId . ' started successfully'); $attempts = 0; @@ -436,6 +430,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, */ [$redis, $reclaimForRedis] = getPubSub(); + $redisConnections[$workerId] = [$redis, $reclaimForRedis]; + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { @@ -514,9 +510,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, sleep(DATABASE_RECONNECT_SLEEP); continue; } finally { - if (isset($reclaimForRedis)) { - $reclaimForRedis(); - } if (isset($reclaimForConsole)) { $reclaimForConsole(); } @@ -529,6 +522,20 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, Console::error('Failed to restart pub/sub...'); }); +$server->onWorkerStop(function (int $workerId) use ($redisConnections) { + /** + * @var Redis $redis + * @var callable $reclaim + */ + [$redis, $reclaim] = $redisConnections[$workerId] ?? null; + + $redis?->unsubscribe(['realtime']); + + if ($reclaim) { + $reclaim(); + } +}); + $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $register, $stats, &$realtime) { $app = new App('UTC'); $request = new Request($request); diff --git a/composer.json b/composer.json index 0e8575e7c1..42eedfaba6 100644 --- a/composer.json +++ b/composer.json @@ -69,7 +69,7 @@ "utopia-php/storage": "0.18.*", "utopia-php/swoole": "0.5.*", "utopia-php/vcs": "0.6.*", - "utopia-php/websocket": "0.1.*", + "utopia-php/websocket": "dev-feat-worker-stop", "matomo/device-detector": "6.1.*", "dragonmantank/cron-expression": "3.3.2", "phpmailer/phpmailer": "6.8.0", diff --git a/composer.lock b/composer.lock index c8ce177133..70fe96499c 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "e90287b00089ad8e8059e6e157f3a6c8", + "content-hash": "ce7a7f429d68836b8667e2b02440fc3f", "packages": [ { "name": "adhocore/jwt", @@ -2273,22 +2273,24 @@ }, { "name": "utopia-php/websocket", - "version": "0.1.0", + "version": "dev-feat-worker-stop", "source": { "type": "git", "url": "https://github.com/utopia-php/websocket.git", - "reference": "51fcb86171400d8aa40d76c54593481fd273dab5" + "reference": "b7adfab69d4c48a60272fa4cb3328f9b400b0ffa" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/websocket/zipball/51fcb86171400d8aa40d76c54593481fd273dab5", - "reference": "51fcb86171400d8aa40d76c54593481fd273dab5", + "url": "https://api.github.com/repos/utopia-php/websocket/zipball/b7adfab69d4c48a60272fa4cb3328f9b400b0ffa", + "reference": "b7adfab69d4c48a60272fa4cb3328f9b400b0ffa", "shasum": "" }, "require": { "php": ">=8.0" }, "require-dev": { + "laravel/pint": "^1.15", + "phpstan/phpstan": "^1.8", "phpunit/phpunit": "^9.5.5", "swoole/ide-helper": "4.6.6", "textalk/websocket": "1.5.2", @@ -2305,16 +2307,6 @@ "license": [ "MIT" ], - "authors": [ - { - "name": "Eldad Fux", - "email": "eldad@appwrite.io" - }, - { - "name": "Torsten Dittmann", - "email": "torsten@appwrite.io" - } - ], "description": "A simple abstraction for WebSocket servers.", "keywords": [ "framework", @@ -2325,9 +2317,9 @@ ], "support": { "issues": "https://github.com/utopia-php/websocket/issues", - "source": "https://github.com/utopia-php/websocket/tree/0.1.0" + "source": "https://github.com/utopia-php/websocket/tree/feat-worker-stop" }, - "time": "2021-12-20T10:50:09+00:00" + "time": "2024-04-09T04:48:45+00:00" }, { "name": "webmozart/assert", @@ -5145,7 +5137,9 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": [], + "stability-flags": { + "utopia-php/websocket": 20 + }, "prefer-stable": false, "prefer-lowest": false, "platform": { From 388f450d895d919e8964043562fb6bae249d6b27 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 9 Apr 2024 18:11:08 +1200 Subject: [PATCH 4/7] Fix missing injection --- app/init.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/init.php b/app/init.php index 1830fd1763..86f5ad5b1e 100644 --- a/app/init.php +++ b/app/init.php @@ -1233,7 +1233,7 @@ App::setResource('cache', function (Group $pools, Connections $connections) { } return new Cache(new Sharding($adapters)); -}, ['pools']); +}, ['pools', 'connections']); App::setResource('deviceLocal', function () { return new Local(); From b50ec49ac13b0b13f43dab8a6cb01cd3dddea233 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 9 Apr 2024 20:26:42 +1200 Subject: [PATCH 5/7] Move `Connections` to pools namespace --- app/http.php | 2 +- app/init.php | 2 +- app/worker.php | 2 +- .../Utopia/{Queue => Pools}/Connections.php | 20 ++++++++----------- 4 files changed, 11 insertions(+), 15 deletions(-) rename src/Appwrite/Utopia/{Queue => Pools}/Connections.php (69%) diff --git a/app/http.php b/app/http.php index 91da4e8cab..008a355550 100644 --- a/app/http.php +++ b/app/http.php @@ -2,7 +2,7 @@ require_once __DIR__ . '/../vendor/autoload.php'; -use Appwrite\Utopia\Queue\Connections; +use Appwrite\Utopia\Pools\Connections; use Appwrite\Utopia\Response; use Swoole\Process; use Swoole\Http\Server; diff --git a/app/init.php b/app/init.php index 86f5ad5b1e..829911f904 100644 --- a/app/init.php +++ b/app/init.php @@ -33,7 +33,7 @@ use Appwrite\Network\Validator\Email; use Appwrite\Network\Validator\Origin; use Appwrite\OpenSSL\OpenSSL; use Appwrite\URL\URL as AppwriteURL; -use Appwrite\Utopia\Queue\Connections; +use Appwrite\Utopia\Pools\Connections; use Utopia\App; use Utopia\Logger\Logger; use Utopia\Cache\Adapter\Redis as RedisCache; diff --git a/app/worker.php b/app/worker.php index 2982f7af1f..99f4154c98 100644 --- a/app/worker.php +++ b/app/worker.php @@ -16,7 +16,7 @@ use Appwrite\Event\Phone; use Appwrite\Event\Usage; use Appwrite\Event\UsageDump; use Appwrite\Platform\Appwrite; -use Appwrite\Utopia\Queue\Connections; +use Appwrite\Utopia\Pools\Connections; use Swoole\Runtime; use Utopia\App; use Utopia\Cache\Adapter\Sharding; diff --git a/src/Appwrite/Utopia/Queue/Connections.php b/src/Appwrite/Utopia/Pools/Connections.php similarity index 69% rename from src/Appwrite/Utopia/Queue/Connections.php rename to src/Appwrite/Utopia/Pools/Connections.php index e3d5740b97..bf9a44bdb8 100644 --- a/src/Appwrite/Utopia/Queue/Connections.php +++ b/src/Appwrite/Utopia/Pools/Connections.php @@ -1,6 +1,6 @@ connections[$id] ?? throw new \Exception("Connection '{$id}' not found"); - } - /** * @param string $id * @return self @@ -41,14 +31,20 @@ class Connections return $this; } + public function count(): int + { + return \count($this->connections); + } + /** * @return self * @throws \Exception */ public function reclaim(): self { - foreach ($this->connections as $connection) { + foreach ($this->connections as $id => $connection) { $connection->reclaim(); + unset($this->connections[$id]); } return $this; From 9a3f6e7f717ecfa2778814f730acac70e698887a Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 9 Apr 2024 20:27:30 +1200 Subject: [PATCH 6/7] Add connections test --- tests/unit/Utopia/Pools/ConnectionsTest.php | 40 +++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 tests/unit/Utopia/Pools/ConnectionsTest.php diff --git a/tests/unit/Utopia/Pools/ConnectionsTest.php b/tests/unit/Utopia/Pools/ConnectionsTest.php new file mode 100644 index 0000000000..fb9c8bcc09 --- /dev/null +++ b/tests/unit/Utopia/Pools/ConnectionsTest.php @@ -0,0 +1,40 @@ +add($connection); + $this->assertEquals(1, $connections->count()); + } + + public function testRemove() + { + $connections = new Connections(); + $connection = new Connection('resource'); + $connections->add($connection); + $connections->remove($connection->getID()); + $this->assertEquals(0, $connections->count()); + } + + public function testReclaim() + { + $connections = new Connections(); + $pool = new Pool('test', 1, function () { + return 'resource'; + }); + $connection = $pool->pop(); + $connections->add($connection); + $connections->reclaim(); + $this->assertEquals(1, $pool->count()); + } +} From 3859f037a1885e54d6e08456288c804dc3aaba1c Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 9 Apr 2024 20:50:33 +1200 Subject: [PATCH 7/7] Revert "Reclaim only used connections for workers" This reverts commit fefb60557df8a220bf1efc104e3adc867d3471b7. # Conflicts: # app/worker.php --- app/worker.php | 99 +++++++++++++++----------------------------------- 1 file changed, 30 insertions(+), 69 deletions(-) diff --git a/app/worker.php b/app/worker.php index 99f4154c98..32ce35e033 100644 --- a/app/worker.php +++ b/app/worker.php @@ -16,7 +16,6 @@ use Appwrite\Event\Phone; use Appwrite\Event\Usage; use Appwrite\Event\UsageDump; use Appwrite\Platform\Appwrite; -use Appwrite\Utopia\Pools\Connections; use Swoole\Runtime; use Utopia\App; use Utopia\Cache\Adapter\Sharding; @@ -37,32 +36,25 @@ use Utopia\Pools\Group; use Utopia\Queue\Connection; Authorization::disable(); -Runtime::enableCoroutine(); +Runtime::enableCoroutine(SWOOLE_HOOK_ALL); Server::setResource('register', fn () => $register); -Server::setResource('connections', function () { - return new Connections(); -}); - -Server::setResource('dbForConsole', function (Cache $cache, Registry $register, Connections $connections) { +Server::setResource('dbForConsole', function (Cache $cache, Registry $register) { $pools = $register->get('pools'); - - $connection = $pools + $database = $pools ->get('console') - ->pop(); - - $connections->add($connection); - - $database = $connection->getResource(); + ->pop() + ->getResource() + ; $adapter = new Database($database, $cache); $adapter->setNamespace('_console'); return $adapter; -}, ['cache', 'register', 'connections']); +}, ['cache', 'register']); -Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole, Connections $connections) { +Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) { $payload = $message->getPayload() ?? []; $project = new Document($payload['project'] ?? []); @@ -71,19 +63,16 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register, } $pools = $register->get('pools'); - - $connection = $pools + $database = $pools ->get($project->getAttribute('database')) - ->pop(); - - $database = $connection->getResource(); - - $connections->add($connection); + ->pop() + ->getResource() + ; $adapter = new Database($database, $cache); $adapter->setNamespace('_' . $project->getInternalId()); return $adapter; -}, ['cache', 'register', 'message', 'dbForConsole', 'connections']); +}, ['cache', 'register', 'message', 'dbForConsole']); Server::setResource('project', function (Message $message, Database $dbForConsole) { $payload = $message->getPayload() ?? []; @@ -92,14 +81,14 @@ Server::setResource('project', function (Message $message, Database $dbForConsol if ($project->getId() === 'console') { return $project; } - return $dbForConsole->getDocument('projects', $project->getId()); + ; }, ['message', 'dbForConsole']); -Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, Cache $cache, Connections $connections) { +Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools - return function (Document $project) use ($pools, $dbForConsole, $cache, $connections, &$databases): Database { + return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases): Database { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } @@ -112,13 +101,10 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso return $database; } - $dbConnection = $pools + $dbAdapter = $pools ->get($databaseName) - ->pop(); - - $dbAdapter = $dbConnection->getResource(); - - $connections->add($dbConnection); + ->pop() + ->getResource(); $database = new Database($dbAdapter, $cache); @@ -128,7 +114,7 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso return $database; }; -}, ['pools', 'dbForConsole', 'cache', 'connections']); +}, ['pools', 'dbForConsole', 'cache']); Server::setResource('abuseRetention', function () { return DateTime::addSeconds(new \DateTime(), -1 * App::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400)); @@ -142,104 +128,82 @@ Server::setResource('executionRetention', function () { return DateTime::addSeconds(new \DateTime(), -1 * App::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', 1209600)); }); -Server::setResource('cache', function (Registry $register, Connections $connections) { +Server::setResource('cache', function (Registry $register) { $pools = $register->get('pools'); $list = Config::getParam('pools-cache', []); $adapters = []; foreach ($list as $value) { - $connection = $pools + $adapters[] = $pools ->get($value) - ->pop(); - - $connections->add($connection); - - $adapters[] = $connection->getResource(); + ->pop() + ->getResource() + ; } return new Cache(new Sharding($adapters)); -}, ['register', 'connections']); - +}, ['register']); Server::setResource('log', fn() => new Log()); - Server::setResource('queueForUsage', function (Connection $queue) { return new Usage($queue); }, ['queue']); - Server::setResource('queueForUsageDump', function (Connection $queue) { return new UsageDump($queue); }, ['queue']); - Server::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); - Server::setResource('queueForDatabase', function (Connection $queue) { return new EventDatabase($queue); }, ['queue']); - Server::setResource('queueForMessaging', function (Connection $queue) { return new Phone($queue); }, ['queue']); - Server::setResource('queueForMails', function (Connection $queue) { return new Mail($queue); }, ['queue']); - Server::setResource('queueForBuilds', function (Connection $queue) { return new Build($queue); }, ['queue']); - Server::setResource('queueForDeletes', function (Connection $queue) { return new Delete($queue); }, ['queue']); - Server::setResource('queueForEvents', function (Connection $queue) { return new Event($queue); }, ['queue']); - Server::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); }, ['queue']); - Server::setResource('queueForFunctions', function (Connection $queue) { return new Func($queue); }, ['queue']); - Server::setResource('queueForCertificates', function (Connection $queue) { return new Certificate($queue); }, ['queue']); - Server::setResource('queueForMigrations', function (Connection $queue) { return new Migration($queue); }, ['queue']); - Server::setResource('logger', function (Registry $register) { return $register->get('logger'); }, ['register']); - Server::setResource('pools', function (Registry $register) { return $register->get('pools'); }, ['register']); - Server::setResource('getFunctionsDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $projectId); }; }); - Server::setResource('getFilesDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_UPLOADS . '/app-' . $projectId); }; }); - Server::setResource('getBuildsDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_BUILDS . '/app-' . $projectId); }; }); - Server::setResource('getCacheDevice', function () { return function (string $projectId) { return getDevice(APP_STORAGE_CACHE . '/app-' . $projectId); @@ -290,9 +254,9 @@ $worker = $platform->getWorker(); $worker ->shutdown() - ->inject('connections') - ->action(function (Connections $connections) { - $connections->reclaim(); + ->inject('pools') + ->action(function (Group $pools) { + $pools->reclaim(); }); $worker @@ -301,10 +265,7 @@ $worker ->inject('logger') ->inject('log') ->inject('project') - ->inject('connections') - ->action(function (Throwable $error, ?Logger $logger, Log $log, Document $project, Connections $connections) { - $connections->reclaim(); - + ->action(function (Throwable $error, ?Logger $logger, Log $log, Document $project) { $version = App::getEnv('_APP_VERSION', 'UNKNOWN'); if ($error instanceof PDOException) {