From ea42b823d8563c2c0638eaedce3694c3b3ac57db Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 30 Apr 2025 22:47:41 +1200 Subject: [PATCH 1/9] Revert name merge issue --- src/Appwrite/Messaging/Adapter/Realtime.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 18b0d94e7d..be263aa655 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -36,12 +36,12 @@ class Realtime extends MessagingAdapter */ public array $subscriptions = []; - private PubSubPool $redis; + private PubSubPool $pubSubPool; public function __construct() { global $register; - $this->redis = new PubSubPool($register->get('pools')->get('pubsub')); + $this->pubSubPool = new PubSubPool($register->get('pools')->get('pubsub')); } /** @@ -148,7 +148,7 @@ class Realtime extends MessagingAdapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - $this->redis->publish('realtime', json_encode([ + $this->pubSubPool->publish('realtime', json_encode([ 'project' => $projectId, 'roles' => $roles, 'permissionsChanged' => $permissionsChanged, From 406630cb61eaa54b314e183a99f44c932199847a Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 16:40:47 +1200 Subject: [PATCH 2/9] Update deps --- composer.lock | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/composer.lock b/composer.lock index cf15b7657e..5a3d58a047 100644 --- a/composer.lock +++ b/composer.lock @@ -1109,16 +1109,16 @@ }, { "name": "open-telemetry/api", - "version": "1.2.3", + "version": "1.3.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/api.git", - "reference": "199d7ddda88f5f5619fa73463f1a5a7149ccd1f1" + "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/199d7ddda88f5f5619fa73463f1a5a7149ccd1f1", - "reference": "199d7ddda88f5f5619fa73463f1a5a7149ccd1f1", + "url": "https://api.github.com/repos/opentelemetry-php/api/zipball/4e3bb38e069876fb73c2ce85c89583bf2b28cd86", + "reference": "4e3bb38e069876fb73c2ce85c89583bf2b28cd86", "shasum": "" }, "require": { @@ -1175,7 +1175,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-03-05T21:42:54+00:00" + "time": "2025-05-07T12:32:21+00:00" }, { "name": "open-telemetry/context", @@ -1238,16 +1238,16 @@ }, { "name": "open-telemetry/exporter-otlp", - "version": "1.2.1", + "version": "1.3.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "b7580440b7481a98da97aceabeb46e1b276c8747" + "reference": "19adf03d2b0f91f9e9b1c7f93db6c755c737cf6c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/b7580440b7481a98da97aceabeb46e1b276c8747", - "reference": "b7580440b7481a98da97aceabeb46e1b276c8747", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/19adf03d2b0f91f9e9b1c7f93db6c755c737cf6c", + "reference": "19adf03d2b0f91f9e9b1c7f93db6c755c737cf6c", "shasum": "" }, "require": { @@ -1298,7 +1298,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-03-06T23:21:56+00:00" + "time": "2025-05-12T00:36:35+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", @@ -1365,16 +1365,16 @@ }, { "name": "open-telemetry/sdk", - "version": "1.3.0", + "version": "1.4.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "05d9ceb6773b5bddcf485af6d4a6f543bbeb980b" + "reference": "939d3a28395c249a763676458140dad44b3a8011" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/05d9ceb6773b5bddcf485af6d4a6f543bbeb980b", - "reference": "05d9ceb6773b5bddcf485af6d4a6f543bbeb980b", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/939d3a28395c249a763676458140dad44b3a8011", + "reference": "939d3a28395c249a763676458140dad44b3a8011", "shasum": "" }, "require": { @@ -1451,7 +1451,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-05-01T23:20:43+00:00" + "time": "2025-05-07T12:32:21+00:00" }, { "name": "open-telemetry/sem-conv", From 3e914a4f1b698485176bbcd171acbc8de9a0ed64 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 30 Apr 2025 22:47:41 +1200 Subject: [PATCH 3/9] Revert name merge issue --- src/Appwrite/Messaging/Adapter/Realtime.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 1963bdedd6..96793b2683 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -36,12 +36,12 @@ class Realtime extends Adapter */ public array $subscriptions = []; - private Pool $pubsubPool; + private PubSubPool $pubSubPool; public function __construct() { global $register; - $this->pubsubPool = $register->get('pools')->get('pubsub'); + $this->pubSubPool = new PubSubPool($register->get('pools')->get('pubsub')); } /** @@ -147,7 +147,7 @@ class Realtime extends Adapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - $message = [ + $this->pubSubPool->publish('realtime', json_encode([ 'project' => $projectId, 'roles' => $roles, 'permissionsChanged' => $permissionsChanged, From 16b2449787954c2211f431323137d252356c3f3e Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 18:14:07 +1200 Subject: [PATCH 4/9] Revert "Merge pull request #9703 from appwrite/revert-9659-feat-pool-adapter" This reverts commit bf9deb09f588b59399b7207fc93311f3ec2b8fe3, reversing changes made to d312fe22ff264558e240565cac4fcb2f6a24a401. # Conflicts: # app/cli.php # app/init/registers.php # composer.lock # src/Appwrite/Messaging/Adapter/Realtime.php # src/Appwrite/Platform/Tasks/ScheduleBase.php # src/Appwrite/Platform/Tasks/ScheduleExecutions.php # src/Appwrite/Platform/Tasks/ScheduleFunctions.php # src/Appwrite/Platform/Tasks/ScheduleMessages.php --- app/cli.php | 42 ++---- app/controllers/api/health.php | 137 +++++++++--------- app/controllers/api/projects.php | 3 +- app/http.php | 19 +-- app/init/registers.php | 4 +- app/init/resources.php | 52 +++---- app/realtime.php | 110 ++++++++------ app/worker.php | 69 +++------ composer.json | 2 +- composer.lock | 31 ++-- src/Appwrite/Event/Event.php | 7 - src/Appwrite/Messaging/Adapter/Realtime.php | 25 ++-- src/Appwrite/Platform/Tasks/Doctor.php | 50 ++++--- src/Appwrite/Platform/Tasks/ScheduleBase.php | 6 +- .../Platform/Tasks/ScheduleExecutions.php | 12 +- .../Platform/Tasks/ScheduleFunctions.php | 13 +- .../Platform/Tasks/ScheduleMessages.php | 7 +- src/Appwrite/Platform/Workers/Deletes.php | 5 +- src/Appwrite/Platform/Workers/StatsUsage.php | 12 +- .../Platform/Workers/StatsUsageDump.php | 15 +- src/Appwrite/PubSub/Adapter/Pool.php | 46 ++++++ tests/resources/docker/docker-compose.yml | 32 +--- 22 files changed, 317 insertions(+), 382 deletions(-) create mode 100644 src/Appwrite/PubSub/Adapter/Pool.php diff --git a/app/cli.php b/app/cli.php index 9ade97e90c..61d8a90100 100644 --- a/app/cli.php +++ b/app/cli.php @@ -10,12 +10,13 @@ use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Executor\Executor; -use Swoole\Timer; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\CLI; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; @@ -23,11 +24,12 @@ use Utopia\DSN\DSN; use Utopia\Logger\Log; use Utopia\Platform\Service; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; - +use Swoole\Timer; use function Swoole\Coroutine\run; // Overwriting runtimes to be architecture agnostic for CLI @@ -45,10 +47,7 @@ CLI::setResource('cache', function ($pools) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource(); + $adapters[] = new CachePool($pools->get($value)); } return new Cache(new Sharding($adapters)); @@ -68,12 +67,8 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) { $attempts++; try { // Prepare database connection - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource(); - - $dbForPlatform = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get('console')); + $dbForPlatform = new Database($adapter, $cache); $dbForPlatform ->setNamespace('_console') @@ -91,7 +86,6 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) { $ready = true; } catch (\Throwable $err) { Console::warning($err->getMessage()); - $pools->get('console')->reclaim(); sleep($sleep); } } while ($attempts < $maxAttempts && !$ready); @@ -141,12 +135,8 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); @@ -172,21 +162,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) { $database = null; + return function (?Document $project = null) use ($pools, $cache, $database) { if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { $database->setTenant($project->getInternalId()); return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -210,7 +194,7 @@ CLI::setResource('queueForStatsResources', function (Publisher $publisher) { return new StatsResources($publisher); }, ['publisher']); CLI::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); CLI::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 3602ab8ff9..5fe2de5549 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -3,13 +3,16 @@ use Appwrite\ClamAV\Network; use Appwrite\Event\Event; use Appwrite\Extend\Exception; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Document; use Utopia\Domains\Validator\PublicDomain; use Utopia\Pools\Group; @@ -34,8 +37,8 @@ App::get('/v1/health') namespace: 'health', group: 'health', name: 'get', - auth: [AuthType::KEY], description: '/docs/references/health/get.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -70,11 +73,11 @@ App::get('/v1/health/db') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getDB', description: '/docs/references/health/get-db.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -86,8 +89,8 @@ App::get('/v1/health/db') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'Console.DB' => Config::getParam('pools-console'), @@ -97,7 +100,7 @@ App::get('/v1/health/db') foreach ($configs as $key => $config) { foreach ($config as $database) { try { - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new DatabasePool($pools->get($database)); $checkStart = \microtime(true); @@ -108,16 +111,16 @@ App::get('/v1/health/db') 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $failure[] = $database; + $failures[] = $database; } - } catch (\Throwable $th) { - $failure[] = $database; + } catch (\Throwable) { + $failures[] = $database; } } } - if (!empty($failure)) { - throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failure)); + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failures)); } $response->dynamic(new Document([ @@ -131,11 +134,11 @@ App::get('/v1/health/cache') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getCache', description: '/docs/references/health/get-cache.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -147,44 +150,39 @@ App::get('/v1/health/cache') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'Cache' => Config::getParam('pools-cache'), ]; foreach ($configs as $key => $config) { - foreach ($config as $database) { + foreach ($config as $cache) { try { - /** @var \Utopia\Cache\Adapter $adapter */ - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new CachePool($pools->get($cache)); $checkStart = \microtime(true); if ($adapter->ping()) { $output[] = new Document([ - 'name' => $key . " ($database)", + 'name' => $key . " ($cache)", 'status' => 'pass', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + $failures[] = $cache; } - } catch (\Throwable $th) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + } catch (\Throwable) { + $failures[] = $cache; } } } + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Cache failure on: ' . implode(", ", $failures)); + } + $response->dynamic(new Document([ 'statuses' => $output, 'total' => count($output), @@ -196,11 +194,11 @@ App::get('/v1/health/pubsub') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getPubSub', description: '/docs/references/health/get-pubsub.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -212,44 +210,39 @@ App::get('/v1/health/pubsub') ->inject('response') ->inject('pools') ->action(function (Response $response, Group $pools) { - $output = []; + $failures = []; $configs = [ 'PubSub' => Config::getParam('pools-pubsub'), ]; foreach ($configs as $key => $config) { - foreach ($config as $database) { + foreach ($config as $pubsub) { try { - /** @var \Appwrite\PubSub\Adapter $adapter */ - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new PubSubPool($pools->get($pubsub)); $checkStart = \microtime(true); if ($adapter->ping()) { $output[] = new Document([ - 'name' => $key . " ($database)", + 'name' => $key . " ($pubsub)", 'status' => 'pass', 'ping' => \round((\microtime(true) - $checkStart) / 1000) ]); } else { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + $failures[] = $pubsub; } - } catch (\Throwable $th) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); + } catch (\Throwable) { + $failures[] = $pubsub; } } } + if (!empty($failures)) { + throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Pubsub failure on: ' . implode(", ", $failures)); + } + $response->dynamic(new Document([ 'statuses' => $output, 'total' => count($output), @@ -261,11 +254,11 @@ App::get('/v1/health/time') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getTime', description: '/docs/references/health/get-time.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -325,11 +318,11 @@ App::get('/v1/health/queue/webhooks') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueWebhooks', description: '/docs/references/health/get-queue-webhooks.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -351,18 +344,18 @@ App::get('/v1/health/queue/webhooks') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/logs') ->desc('Get logs queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueLogs', description: '/docs/references/health/get-queue-logs.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -384,18 +377,18 @@ App::get('/v1/health/queue/logs') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/certificate') ->desc('Get the SSL certificate for a domain') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getCertificate', description: '/docs/references/health/get-certificate.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -441,18 +434,18 @@ App::get('/v1/health/certificate') 'validTo' => $certificatePayload['validTo_time_t'], 'signatureTypeSN' => $certificatePayload['signatureTypeSN'], ]), Response::MODEL_HEALTH_CERTIFICATE); - }, ['response']); + }); App::get('/v1/health/queue/certificates') ->desc('Get certificates queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueCertificates', description: '/docs/references/health/get-queue-certificates.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -474,18 +467,18 @@ App::get('/v1/health/queue/certificates') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/builds') ->desc('Get builds queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueBuilds', description: '/docs/references/health/get-queue-builds.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -507,18 +500,18 @@ App::get('/v1/health/queue/builds') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/databases') ->desc('Get databases queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueDatabases', description: '/docs/references/health/get-queue-databases.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -541,18 +534,18 @@ App::get('/v1/health/queue/databases') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/deletes') ->desc('Get deletes queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueDeletes', description: '/docs/references/health/get-queue-deletes.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -574,18 +567,18 @@ App::get('/v1/health/queue/deletes') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/mails') ->desc('Get mails queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMails', description: '/docs/references/health/get-queue-mails.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -607,18 +600,18 @@ App::get('/v1/health/queue/mails') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/messaging') ->desc('Get messaging queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMessaging', description: '/docs/references/health/get-queue-messaging.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -640,18 +633,18 @@ App::get('/v1/health/queue/messaging') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/migrations') ->desc('Get migrations queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueMigrations', description: '/docs/references/health/get-queue-migrations.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -673,18 +666,18 @@ App::get('/v1/health/queue/migrations') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/functions') ->desc('Get functions queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueFunctions', description: '/docs/references/health/get-queue-functions.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -706,18 +699,18 @@ App::get('/v1/health/queue/functions') } $response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE); - }, ['response']); + }); App::get('/v1/health/queue/stats-resources') ->desc('Get stats resources queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueStatsResources', description: '/docs/references/health/get-queue-stats-resources.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -746,11 +739,11 @@ App::get('/v1/health/queue/stats-usage') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getQueueUsage', description: '/docs/references/health/get-queue-stats-usage.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -779,11 +772,11 @@ App::get('/v1/health/storage/local') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'storage', name: 'getStorageLocal', description: '/docs/references/health/get-storage-local.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -829,11 +822,11 @@ App::get('/v1/health/storage') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'storage', name: 'getStorage', description: '/docs/references/health/get-storage.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -877,11 +870,11 @@ App::get('/v1/health/anti-virus') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'health', name: 'getAntivirus', description: '/docs/references/health/get-storage-anti-virus.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, @@ -923,11 +916,11 @@ App::get('/v1/health/queue/failed/:name') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( - auth: [AuthType::KEY], namespace: 'health', group: 'queue', name: 'getFailedJobs', description: '/docs/references/health/get-failed-queue-jobs.md', + auth: [AuthType::KEY], responses: [ new SDKResponse( code: Response::STATUS_CODE_OK, diff --git a/app/controllers/api/projects.php b/app/controllers/api/projects.php index 839a51a764..c4f0b6a9df 100644 --- a/app/controllers/api/projects.php +++ b/app/controllers/api/projects.php @@ -24,6 +24,7 @@ use Utopia\App; use Utopia\Audit\Audit; use Utopia\Cache\Cache; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -223,7 +224,7 @@ App::post('/v1/projects') $sharedTables = $sharedTablesV1 || $sharedTablesV2; if (!$sharedTablesV2) { - $adapter = $pools->get($dsn->getHost())->pop()->getResource(); + $adapter = new DatabasePool($pools->get($dsn->getHost())); $dbForProject = new Database($adapter, $cache); if ($sharedTables) { diff --git a/app/http.php b/app/http.php index 963f550b8b..4d78837a8a 100644 --- a/app/http.php +++ b/app/http.php @@ -14,9 +14,11 @@ use Utopia\App; use Utopia\Audit\Audit; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; +use Utopia\Database\Exception\Duplicate as DuplicateException; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; @@ -167,7 +169,7 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co $sleep = 1; $attempts = 0; - do { + while (true) { try { $attempts++; $resource = $app->getResource($resourceKey); @@ -176,13 +178,12 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co break; // exit loop on success } catch (\Exception $e) { Console::warning(" └── Database not ready. Retrying connection ({$attempts})..."); - $pools->reclaim(); if ($attempts >= $max) { throw new \Exception(' └── Failed to connect to database: ' . $e->getMessage()); } sleep($sleep); } - } while ($attempts < $max); + } Console::success("[Setup] - $dbName database init started..."); @@ -318,11 +319,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg $cache = $app->getResource('cache'); foreach ($sharedTablesV2 as $hostname) { - $adapter = $pools - ->get($hostname) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($hostname)); $dbForProject = (new Database($adapter, $cache)) ->setDatabase('appwrite') ->setSharedTables(true) @@ -332,7 +329,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg try { Console::success('[Setup] - Creating project database: ' . $hostname . '...'); $dbForProject->create(); - } catch (Duplicate) { + } catch (DuplicateException) { Console::success('[Setup] - Skip: metadata table already exists'); } @@ -358,7 +355,6 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg } } - $pools->reclaim(); Console::success('[Setup] - Server database init completed...'); }); @@ -473,6 +469,7 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool Console::error('[Error] Message: ' . $th->getMessage()); Console::error('[Error] File: ' . $th->getFile()); Console::error('[Error] Line: ' . $th->getLine()); + Console::error('[Error] Trace: ' . $th->getTraceAsString()); $swooleResponse->setStatusCode(500); @@ -490,8 +487,6 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool ]; $swooleResponse->end(\json_encode($output)); - } finally { - $pools->reclaim(); } }); diff --git a/app/init/registers.php b/app/init/registers.php index 1adaaf35ce..415730f936 100644 --- a/app/init/registers.php +++ b/app/init/registers.php @@ -216,13 +216,13 @@ $register->set('pools', function () { 'mysql', 'mariadb' => function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) { return new PDOProxy(function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) { - return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, array( + return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, [ \PDO::ATTR_TIMEOUT => 3, // Seconds \PDO::ATTR_PERSISTENT => false, \PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC, \PDO::ATTR_EMULATE_PREPARES => true, \PDO::ATTR_STRINGIFY_FETCHES => true - )); + ]); }); }, 'redis' => function () use ($dsnHost, $dsnPort, $dsnPass) { diff --git a/app/init/resources.php b/app/init/resources.php index c719a47344..f48efbe177 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -24,10 +24,12 @@ use Appwrite\Utopia\Request; use Executor\Executor; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Helpers\ID; @@ -37,6 +39,7 @@ use Utopia\DSN\DSN; use Utopia\Locale\Locale; use Utopia\Logger\Log; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; use Utopia\Storage\Device; use Utopia\Storage\Device\AWS; @@ -72,10 +75,10 @@ App::setResource('localeCodes', function () { // Queues App::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); App::setResource('consumer', function (Group $pools) { - return $pools->get('consumer')->pop()->getResource(); + return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); App::setResource('queueForMessaging', function (Publisher $publisher) { return new Messaging($publisher); @@ -329,12 +332,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $database ->setMetadata('host', \gethostname()) @@ -360,12 +359,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform }, ['pools', 'dbForPlatform', 'cache', 'project']); App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get('console')); + $database = new Database($adapter, $cache); $database ->setNamespace('_console') @@ -378,7 +373,7 @@ App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { }, ['pools', 'cache']); App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache) { - $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools + $databases = []; return function (Document $project) use ($pools, $dbForPlatform, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { @@ -420,12 +415,8 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; $configure($database); @@ -435,21 +426,15 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform App::setResource('getLogsDB', function (Group $pools, Cache $cache) { $database = null; - return function (?Document $project = null) use ($pools, $cache, $database) { + + return function (?Document $project = null) use ($pools, $cache, &$database) { if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') { $database->setTenant($project->getInternalId()); return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -473,10 +458,7 @@ App::setResource('cache', function (Group $pools, Telemetry $telemetry) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource(); + $adapters[] = new CachePool($pools->get($value)); } $cache = new Cache(new Sharding($adapters)); diff --git a/app/realtime.php b/app/realtime.php index 86f9c85fdd..7e6fc0e311 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -5,6 +5,7 @@ use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Swoole\Http\Request as SwooleRequest; @@ -15,10 +16,12 @@ use Swoole\Timer; use Utopia\Abuse\Abuse; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -28,13 +31,15 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\DSN\DSN; use Utopia\Logger\Log; +use Utopia\Pools\Group; +use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; use Utopia\WebSocket\Adapter; use Utopia\WebSocket\Server; /** - * @var \Utopia\Registry\Registry $register + * @var Registry $register */ require_once __DIR__ . '/init.php'; @@ -46,17 +51,17 @@ if (!function_exists('getConsoleDB')) { { global $register; - /** @var \Utopia\Pools\Group $pools */ + static $database = null; + + if ($database !== null) { + return $database; + } + + /** @var Group $pools */ $pools = $register->get('pools'); - $dbAdapter = $pools - ->get('console') - ->pop() - ->getResource() - ; - - $database = new Database($dbAdapter, getCache()); - + $adapter = new DatabasePool($pools->get('console')); + $database = new Database($adapter, getCache()); $database ->setNamespace('_console') ->setMetadata('host', \gethostname()) @@ -72,7 +77,13 @@ if (!function_exists('getProjectDB')) { { global $register; - /** @var \Utopia\Pools\Group $pools */ + static $databases = []; + + if (isset($databases[$project->getInternalId()])) { + return $databases[$project->getInternalId()]; + } + + /** @var Group $pools */ $pools = $register->get('pools'); if ($project->isEmpty() || $project->getId() === 'console') { @@ -86,11 +97,7 @@ if (!function_exists('getProjectDB')) { $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $adapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($dsn->getHost())); $database = new Database($adapter, getCache()); $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); @@ -111,7 +118,7 @@ if (!function_exists('getProjectDB')) { ->setMetadata('host', \gethostname()) ->setMetadata('project', $project->getId()); - return $database; + return $databases[$project->getInternalId()] = $database; } } @@ -121,20 +128,22 @@ if (!function_exists('getCache')) { { global $register; - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + static $cache = null; + + if ($cache !== null) { + return $cache; + } + + $pools = $register->get('pools'); /** @var Group $pools */ $list = Config::getParam('pools-cache', []); $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource() - ; + $adapters[] = new CachePool($pools->get($value)); } - return new Cache(new Sharding($adapters)); + return $cache = new Cache(new Sharding($adapters)); } } @@ -142,6 +151,12 @@ if (!function_exists('getCache')) { if (!function_exists('getRedis')) { function getRedis(): \Redis { + static $redis = null; + + if ($redis !== null) { + return $redis; + } + $host = System::getEnv('_APP_REDIS_HOST', 'localhost'); $port = System::getEnv('_APP_REDIS_PORT', 6379); $pass = System::getEnv('_APP_REDIS_PASS', ''); @@ -160,21 +175,39 @@ if (!function_exists('getRedis')) { if (!function_exists('getTimelimit')) { function getTimelimit(): TimeLimitRedis { - return new TimeLimitRedis("", 0, 1, getRedis()); + static $timelimit = null; + + if ($timelimit !== null) { + return $timelimit; + } + + return $timelimit = new TimeLimitRedis("", 0, 1, getRedis()); } } if (!function_exists('getRealtime')) { function getRealtime(): Realtime { - return new Realtime(); + static $realtime = null; + + if ($realtime !== null) { + return $realtime; + } + + return $realtime = new Realtime(); } } if (!function_exists('getTelemetry')) { function getTelemetry(int $workerId): Utopia\Telemetry\Adapter { - return new NoTelemetry(); + static $telemetry = null; + + if ($telemetry !== null) { + return $telemetry; + } + + return $telemetry = new NoTelemetry(); } } @@ -273,7 +306,6 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume sleep(DATABASE_RECONNECT_SLEEP); } } while (true); - $register->get('pools')->reclaim(); }); /** @@ -299,9 +331,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument)); } catch (Throwable $th) { - call_user_func($logError, $th, "updateWorkerDocument"); - } finally { - $register->get('pools')->reclaim(); + $logError($th, "updateWorkerDocument"); } }); } @@ -370,8 +400,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, 'data' => $event['data'] ])); } - - $register->get('pools')->reclaim(); } } /** @@ -407,8 +435,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } $start = time(); - /** @var \Appwrite\PubSub\Adapter $pubsub */ - $pubsub = $register->get('pools')->get('pubsub')->pop()->getResource(); + $pubsub = new PubSubPool($register->get('pools')->get('pubsub')); + if ($pubsub->ping(true)) { $attempts = 0; Console::success('Pub/sub connection established (worker: ' . $workerId . ')'); @@ -436,8 +464,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $realtime->unsubscribe($connection); $realtime->subscribe($projectId, $connection, $roles, $channels); - - $register->get('pools')->reclaim(); } } @@ -463,14 +489,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } }); } catch (Throwable $th) { - call_user_func($logError, $th, "pubSubConnection"); + $logError($th, "pubSubConnection"); Console::error('Pub/sub error: ' . $th->getMessage()); $attempts++; sleep(DATABASE_RECONNECT_SLEEP); continue; - } finally { - $register->get('pools')->reclaim(); } } @@ -572,7 +596,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $stats->incr($project->getId(), 'connections'); $stats->incr($project->getId(), 'connectionsTotal'); } catch (Throwable $th) { - call_user_func($logError, $th, "initServer"); + $logError($th, "initServer"); // Handle SQL error code is 'HY000' $code = $th->getCode(); @@ -596,8 +620,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, Console::error('[Error] Code: ' . $response['data']['code']); Console::error('[Error] Message: ' . $response['data']['message']); } - } finally { - $register->get('pools')->reclaim(); } }); @@ -696,8 +718,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re if ($th->getCode() === 1008) { $server->close($connection, $th->getCode()); } - } finally { - $register->get('pools')->reclaim(); } }); diff --git a/app/worker.php b/app/worker.php index 232e0b3684..1ae2108a62 100644 --- a/app/worker.php +++ b/app/worker.php @@ -20,10 +20,12 @@ use Appwrite\Platform\Appwrite; use Executor\Executor; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; @@ -33,6 +35,7 @@ use Utopia\Logger\Log; use Utopia\Logger\Logger; use Utopia\Platform\Service; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Server; @@ -40,21 +43,17 @@ use Utopia\Registry\Registry; use Utopia\System\System; Authorization::disable(); -Runtime::enableCoroutine(SWOOLE_HOOK_ALL); +Runtime::enableCoroutine(); Server::setResource('register', fn () => $register); Server::setResource('dbForPlatform', function (Cache $cache, Registry $register) { $pools = $register->get('pools'); - $database = $pools - ->get('console') - ->pop() - ->getResource(); + $adapter = new DatabasePool($pools->get('console')); + $dbForPlatform = new Database($adapter, $cache); + $dbForPlatform->setNamespace('_console'); - $adapter = new Database($database, $cache); - $adapter->setNamespace('_console'); - - return $adapter; + return $dbForPlatform; }, ['cache', 'register']); Server::setResource('project', function (Message $message, Database $dbForPlatform) { @@ -82,20 +81,9 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register, $dsn = new DSN('mysql://' . $project->getAttribute('database')); } - $adapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - + $adapter = new DatabasePool($pools->get($dsn->getHost())); $database = new Database($adapter, $cache); - try { - $dsn = new DSN($project->getAttribute('database')); - } catch (\InvalidArgumentException) { - // TODO: Temporary until all projects are using shared tables - $dsn = new DSN('mysql://' . $project->getAttribute('database')); - } - $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); if (\in_array($dsn->getHost(), $sharedTables)) { @@ -150,12 +138,8 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf return $database; } - $dbAdapter = $pools - ->get($dsn->getHost()) - ->pop() - ->getResource(); - - $database = new Database($dbAdapter, $cache); + $adapter = new DatabasePool($pools->get($dsn->getHost())); + $database = new Database($adapter, $cache); $databases[$dsn->getHost()] = $database; @@ -187,15 +171,8 @@ Server::setResource('getLogsDB', function (Group $pools, Cache $cache) { return $database; } - $dbAdapter = $pools - ->get('logs') - ->pop() - ->getResource(); - - $database = new Database( - $dbAdapter, - $cache - ); + $adapter = new DatabasePool($pools->get('logs')); + $database = new Database($adapter, $cache); $database ->setSharedTables(true) @@ -233,11 +210,7 @@ Server::setResource('cache', function (Registry $register) { $adapters = []; foreach ($list as $value) { - $adapters[] = $pools - ->get($value) - ->pop() - ->getResource() - ; + $adapters[] = new CachePool($pools->get($value)); } return new Cache(new Sharding($adapters)); @@ -267,11 +240,11 @@ Server::setResource('timelimit', function (\Redis $redis) { Server::setResource('log', fn () => new Log()); Server::setResource('publisher', function (Group $pools) { - return $pools->get('publisher')->pop()->getResource(); + return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); Server::setResource('consumer', function (Group $pools) { - return $pools->get('consumer')->pop()->getResource(); + return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); Server::setResource('queueForStatsUsage', function (Publisher $publisher) { @@ -448,13 +421,6 @@ try { $worker = $platform->getWorker(); -$worker - ->shutdown() - ->inject('pools') - ->action(function (Group $pools) { - $pools->reclaim(); - }); - $worker ->error() ->inject('error') @@ -462,8 +428,7 @@ $worker ->inject('log') ->inject('pools') ->inject('project') - ->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($queueName) { - $pools->reclaim(); + ->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($worker, $queueName) { $version = System::getEnv('_APP_VERSION', 'UNKNOWN'); if ($logger) { diff --git a/composer.json b/composer.json index 9b2cf7a1ab..914e664d7f 100644 --- a/composer.json +++ b/composer.json @@ -65,7 +65,7 @@ "utopia-php/platform": "0.7.*", "utopia-php/pools": "0.8.*", "utopia-php/preloader": "0.2.*", - "utopia-php/queue": "0.9.*", + "utopia-php/queue": "0.10.*", "utopia-php/registry": "0.5.*", "utopia-php/storage": "0.18.*", "utopia-php/swoole": "0.8.*", diff --git a/composer.lock b/composer.lock index e6bcd919e2..d04d67366e 100644 --- a/composer.lock +++ b/composer.lock @@ -4059,16 +4059,16 @@ }, { "name": "utopia-php/platform", - "version": "0.7.4", + "version": "0.7.5", "source": { "type": "git", "url": "https://github.com/utopia-php/platform.git", - "reference": "a5b93d8177702ec458c3af9137663133c012b71b" + "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/platform/zipball/a5b93d8177702ec458c3af9137663133c012b71b", - "reference": "a5b93d8177702ec458c3af9137663133c012b71b", + "url": "https://api.github.com/repos/utopia-php/platform/zipball/8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", + "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", "shasum": "" }, "require": { @@ -4077,11 +4077,11 @@ "php": ">=8.0", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", - "utopia-php/queue": "0.9.*" + "utopia-php/queue": "0.10.*" }, "require-dev": { - "laravel/pint": "1.2.*", - "phpunit/phpunit": "^9.3" + "laravel/pint": "1.*", + "phpunit/phpunit": "9.*" }, "type": "library", "autoload": { @@ -4103,9 +4103,9 @@ ], "support": { "issues": "https://github.com/utopia-php/platform/issues", - "source": "https://github.com/utopia-php/platform/tree/0.7.4" + "source": "https://github.com/utopia-php/platform/tree/0.7.5" }, - "time": "2025-03-13T13:00:12+00:00" + "time": "2025-04-17T12:20:16+00:00" }, { "name": "utopia-php/pools", @@ -4214,16 +4214,16 @@ }, { "name": "utopia-php/queue", - "version": "0.9.1", + "version": "0.10.0", "source": { "type": "git", "url": "https://github.com/utopia-php/queue.git", - "reference": "32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32" + "reference": "0eccc559168ea72241c39a4c482d868314666be1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/queue/zipball/32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32", - "reference": "32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/0eccc559168ea72241c39a4c482d868314666be1", + "reference": "0eccc559168ea72241c39a4c482d868314666be1", "shasum": "" }, "require": { @@ -4232,6 +4232,7 @@ "utopia-php/cli": "0.15.*", "utopia-php/fetch": "0.4.*", "utopia-php/framework": "0.33.*", + "utopia-php/pools": "0.8.*", "utopia-php/telemetry": "0.1.*" }, "require-dev": { @@ -4273,9 +4274,9 @@ ], "support": { "issues": "https://github.com/utopia-php/queue/issues", - "source": "https://github.com/utopia-php/queue/tree/0.9.1" + "source": "https://github.com/utopia-php/queue/tree/0.10.0" }, - "time": "2025-03-28T19:49:36+00:00" + "time": "2025-04-17T12:15:52+00:00" }, { "name": "utopia-php/registry", diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index d699a45417..2c735ef2d4 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -286,13 +286,6 @@ class Event return $this; } - public function setParamSensitive(string $key): self - { - $this->sensitive[$key] = true; - - return $this; - } - /** * Get param of event. * diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 96793b2683..be263aa655 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -2,14 +2,14 @@ namespace Appwrite\Messaging\Adapter; -use Appwrite\Messaging\Adapter; +use Appwrite\Messaging\Adapter as MessagingAdapter; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Role; -use Utopia\Pools\Pool; -class Realtime extends Adapter +class Realtime extends MessagingAdapter { /** * Connection Tree @@ -132,11 +132,12 @@ class Realtime extends Adapter * Sends an event to the Realtime Server * @param string $projectId * @param array $payload - * @param string $event + * @param array $events * @param array $channels * @param array $roles * @param array $options * @return void + * @throws \Exception */ public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void { @@ -158,9 +159,7 @@ class Realtime extends Adapter 'timestamp' => DateTime::formatTz(DateTime::now()), 'payload' => $payload ] - ]; - - $this->pubsubPool->use(fn (\Appwrite\PubSub\Adapter $pubsub) => $pubsub->publish('realtime', json_encode($message))); + ])); } /** @@ -175,8 +174,9 @@ class Realtime extends Adapter * - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions * * @param array $event + * @return int[]|string[] */ - public function getSubscribers(array $event) + public function getSubscribers(array $event): array { $receivers = []; @@ -230,7 +230,7 @@ class Realtime extends Adapter foreach ($channels as $key => $value) { switch (true) { - case strpos($key, 'account.') === 0: + case \str_starts_with($key, 'account.'): unset($channels[$key]); break; @@ -272,6 +272,7 @@ class Realtime extends Adapter $channels[] = 'account.' . $parts[1]; $roles = [Role::user(ID::custom($parts[1]))->toString()]; break; + case 'migrations': case 'rules': $channels[] = 'console'; $channels[] = 'projects.' . $project->getId(); @@ -352,12 +353,6 @@ class Realtime extends Adapter $roles = [Role::team($project->getAttribute('teamId'))->toString()]; } - break; - case 'migrations': - $channels[] = 'console'; - $channels[] = 'projects.' . $project->getId(); - $projectId = 'console'; - $roles = [Role::team($project->getAttribute('teamId'))->toString()]; break; } diff --git a/src/Appwrite/Platform/Tasks/Doctor.php b/src/Appwrite/Platform/Tasks/Doctor.php index c43afea527..5263133eba 100644 --- a/src/Appwrite/Platform/Tasks/Doctor.php +++ b/src/Appwrite/Platform/Tasks/Doctor.php @@ -3,14 +3,19 @@ namespace Appwrite\Platform\Tasks; use Appwrite\ClamAV\Network; -use Appwrite\PubSub\Adapter; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; +use PHPMailer\PHPMailer\PHPMailer; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\CLI\Console; use Utopia\Config\Config; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Domains\Domain; use Utopia\DSN\DSN; use Utopia\Logger\Logger; use Utopia\Platform\Action; +use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Registry\Registry; use Utopia\Storage\Device\Local; use Utopia\Storage\Storage; @@ -76,9 +81,9 @@ class Doctor extends Action Console::log('🟢 Abuse protection is enabled'); } - $authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT', null); - $authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS', null); - $authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS', null); + $authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT'); + $authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS'); + $authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS'); if ( empty($authWhitelistRoot) @@ -114,19 +119,16 @@ class Doctor extends Action } else { Console::log('🟢 Logging adapter is enabled (' . $providerName . ')'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::log('🔴 Logging adapter is misconfigured'); } \usleep(200 * 1000); // Sleep for 0.2 seconds - try { - Console::log("\n" . '[Connectivity]'); - } catch (\Throwable $th) { - //throw $th; - } + Console::log("\n" . '[Connectivity]'); - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ + $pools = $register->get('pools'); $configs = [ 'Console.DB' => Config::getParam('pools-console'), @@ -136,20 +138,22 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $database) { try { - $adapter = $pools->get($database)->pop()->getResource(); + $adapter = new DatabasePool($pools->get($database)); if ($adapter->ping()) { Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected'); } else { Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}.({$database})", 47, '.') . 'disconnected'); } } } - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ + /** @var Group $pools */ + $pools = $register->get('pools'); + $configs = [ 'Cache' => Config::getParam('pools-cache'), 'Queue' => Config::getParam('pools-queue'), @@ -159,15 +163,18 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $pool) { try { - /** @var Adapter $adapter */ - $adapter = $pools->get($pool)->pop()->getResource(); + $adapter = match($key) { + 'Cache' => new CachePool($pools->get($pool)), + 'Queue' => new BrokerPool($pools->get($pool)), + 'PubSub' => new PubSubPool($pools->get($pool)), + }; if ($adapter->ping()) { Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected'); } else { Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); } } @@ -185,13 +192,14 @@ class Doctor extends Action } else { Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected'); } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected'); } } try { - $mail = $register->get('smtp'); /* @var $mail \PHPMailer\PHPMailer\PHPMailer */ + /* @var PHPMailer $mail */ + $mail = $register->get('smtp'); $mail->addAddress('demo@example.com', 'Example.com'); $mail->Subject = 'Test SMTP Connection'; @@ -200,7 +208,7 @@ class Doctor extends Action $mail->send(); Console::success('🟢 ' . str_pad("SMTP", 50, '.') . 'connected'); - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('🔴 ' . str_pad("SMTP", 47, '.') . 'disconnected'); } @@ -274,7 +282,7 @@ class Doctor extends Action Console::error('Failed to check for a newer version' . "\n"); } } - } catch (\Throwable $th) { + } catch (\Throwable) { Console::error('Failed to check for a newer version' . "\n"); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index afd7d9d22a..3e9907f877 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -13,6 +13,7 @@ use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Gauge; @@ -25,6 +26,8 @@ abstract class ScheduleBase extends Action protected array $schedules = []; + protected BrokerPool $publisher; + private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; private ?Gauge $scheduleTelemetryCount = null; @@ -71,6 +74,7 @@ abstract class ScheduleBase extends Action Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); + $this->publisher = new BrokerPool($pools->get('publisher')); $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); $this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count'); @@ -122,8 +126,6 @@ abstract class ScheduleBase extends Action $schedule->getAttribute('resourceId') ); - $pools->reclaim(); - return [ '$internalId' => $schedule->getInternalId(), '$id' => $schedule->getId(), diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 89d1609a33..331928d265 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -3,7 +3,6 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; -use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; @@ -29,9 +28,6 @@ class ScheduleExecutions extends ScheduleBase protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); - $queueForFunctions = new Func($connection); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -59,8 +55,10 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) { - Co::sleep($delay); + \go(function () use ($schedule, $delay, $data, $pools) { + \Co::sleep($delay); + + $queueForFunctions = new Func($this->publisher); $queueForFunctions->setType('schedule') // Set functionId instead of function as we don't have $dbForProject @@ -85,7 +83,5 @@ class ScheduleExecutions extends ScheduleBase unset($this->schedules[$schedule['$internalId']]); } - - $queue->reclaim(); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 6788748f3d..649fdb333a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -75,12 +75,9 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate]; } - foreach ($delayedExecutions as $delay => $schedules) { - \go(function () use ($delay, $schedules, $pools, $dbForPlatform) { - \sleep($delay); // in seconds - - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { + \Co::sleep($delay); // in seconds foreach ($schedules as $delayConfig) { $scheduleKey = $delayConfig['key']; @@ -93,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($connection); + $queueForFunctions = new Func($this->publisher); $queueForFunctions ->setType('schedule') @@ -105,8 +102,6 @@ class ScheduleFunctions extends ScheduleBase $this->recordEnqueueDelay($delayConfig['nextDate']); } - - $queue->reclaim(); }); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index a15df6ed5b..4f7dfab6a0 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -40,10 +40,8 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) { - $queue = $pools->get('publisher')->pop(); - $connection = $queue->getResource(); - $queueForMessaging = new Messaging($connection); + \go(function () use ($schedule, $pools, $dbForPlatform) { + $queueForMessaging = new Messaging($this->publisher); $this->updateProjectAccess($schedule['project'], $dbForPlatform); @@ -58,7 +56,6 @@ class ScheduleMessages extends ScheduleBase $schedule['$id'], ); - $queue->reclaim(); $this->recordEnqueueDelay($scheduledAt); unset($this->schedules[$schedule['$internalId']]); }); diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index a61db63de6..427772a6e0 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -61,10 +61,7 @@ class Deletes extends Action ->inject('executionRetention') ->inject('auditRetention') ->inject('log') - ->callback( - fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, Executor $executor, string $executionRetention, string $auditRetention, Log $log) => - $this->action($message, $project, $dbForPlatform, $getProjectDB, $getLogsDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executor, $executionRetention, $auditRetention, $log) - ); + ->callback([$this, 'action']); } /** diff --git a/src/Appwrite/Platform/Workers/StatsUsage.php b/src/Appwrite/Platform/Workers/StatsUsage.php index 66f285fcf5..60fab5d2ea 100644 --- a/src/Appwrite/Platform/Workers/StatsUsage.php +++ b/src/Appwrite/Platform/Workers/StatsUsage.php @@ -325,7 +325,7 @@ class StatsUsage extends Action break; } } catch (Throwable $e) { - console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); + Console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}"); } } @@ -344,7 +344,7 @@ class StatsUsage extends Action continue; } - console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); try { foreach ($stats['keys'] ?? [] as $key => $value) { @@ -381,7 +381,7 @@ class StatsUsage extends Action } } } catch (Exception $e) { - console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); } } @@ -405,7 +405,7 @@ class StatsUsage extends Action } - protected function prepareForLogsDB(Document $project, Document $stat) + protected function prepareForLogsDB(Document $project, Document $stat): void { if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') { return; @@ -430,8 +430,7 @@ class StatsUsage extends Action return; } - $dbForLogs = call_user_func($this->getLogsDB); - $dbForLogs + $dbForLogs = ($this->getLogsDB)() ->setTenant(null) ->setTenantPerDocument(true); @@ -446,6 +445,5 @@ class StatsUsage extends Action } catch (Throwable $th) { Console::error($th->getMessage()); } - $this->register->get('pools')->get('logs')->reclaim(); } } diff --git a/src/Appwrite/Platform/Workers/StatsUsageDump.php b/src/Appwrite/Platform/Workers/StatsUsageDump.php index b9d486e0d8..77ec3f13e6 100644 --- a/src/Appwrite/Platform/Workers/StatsUsageDump.php +++ b/src/Appwrite/Platform/Workers/StatsUsageDump.php @@ -70,9 +70,9 @@ class StatsUsageDump extends Action ]; /** - * @var callable + * @var callable(Document): Database */ - protected mixed $getLogsDB; + protected $getLogsDB; protected array $periods = [ '1h' => 'Y-m-d H:00', @@ -126,10 +126,10 @@ class StatsUsageDump extends Action continue; } - console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); + Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys); try { - /** @var \Utopia\Database\Database $dbForProject */ + /** @var Database $dbForProject */ $dbForProject = $getProjectDB($project); foreach ($stats['keys'] ?? [] as $key => $value) { if ($value == 0) { @@ -169,7 +169,7 @@ class StatsUsageDump extends Action } } } catch (\Exception $e) { - console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); + Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage()); } } } @@ -190,8 +190,7 @@ class StatsUsageDump extends Action } } - /** @var \Utopia\Database\Database $dbForLogs*/ - $dbForLogs = call_user_func($this->getLogsDB, $project); + $dbForLogs = ($this->getLogsDB)($project); try { $dbForLogs->createOrUpdateDocumentsWithIncrease( @@ -203,7 +202,5 @@ class StatsUsageDump extends Action } catch (\Throwable $th) { Console::error($th->getMessage()); } - - $this->register->get('pools')->get('logs')->reclaim(); } } diff --git a/src/Appwrite/PubSub/Adapter/Pool.php b/src/Appwrite/PubSub/Adapter/Pool.php new file mode 100644 index 0000000000..a498118dae --- /dev/null +++ b/src/Appwrite/PubSub/Adapter/Pool.php @@ -0,0 +1,46 @@ +delegate(__FUNCTION__, \func_get_args()); + } + + public function subscribe($channels, $callback): void + { + $this->delegate(__FUNCTION__, \func_get_args()); + } + + public function publish($channel, $message): void + { + $this->delegate(__FUNCTION__, \func_get_args()); + } + + /** + * Forward method calls to the internal adapter instance via the pool. + * + * Required because __call() can't be used to implement abstract methods. + * + * @param string $method + * @param array $args + * @return mixed + * @throws DatabaseException + */ + public function delegate(string $method, array $args): mixed + { + return $this->pool->use(function (Adapter $adapter) use ($method, $args) { + return $adapter->{$method}(...$args); + }); + } +} diff --git a/tests/resources/docker/docker-compose.yml b/tests/resources/docker/docker-compose.yml index e549ac27a5..779d63d6ed 100644 --- a/tests/resources/docker/docker-compose.yml +++ b/tests/resources/docker/docker-compose.yml @@ -35,7 +35,7 @@ services: - VERSION=dev restart: unless-stopped ports: - - 9501:80 + - "9501:80" networks: - appwrite labels: @@ -52,15 +52,12 @@ services: - ./phpunit.xml:/usr/src/code/phpunit.xml - ./tests:/usr/src/code/tests - ./app:/usr/src/code/app - # - ./vendor:/usr/src/code/vendor - ./docs:/usr/src/code/docs - ./public:/usr/src/code/public - ./src:/usr/src/code/src - - ./debug:/tmp depends_on: - mariadb - redis - # - clamav environment: - _APP_COMPRESSION_MIN_SIZE_BYTES - _APP_ENV @@ -355,33 +352,6 @@ services: volumes: - appwrite-redis:/data:rw - # clamav: - # image: appwrite/clamav:1.2.0 - # container_name: appwrite-clamav - # restart: unless-stopped - # networks: - # - appwrite - # volumes: - # - appwrite-uploads:/storage/uploads - - - # redis-commander: - # image: rediscommander/redis-commander:latest - # restart: unless-stopped - # networks: - # - appwrite - # environment: - # - REDIS_HOSTS=redis - # ports: - # - "8081:8081" - - # webgrind: - # image: 'jokkedk/webgrind:latest' - # volumes: - # - './debug:/tmp' - # ports: - # - '3001:80' - networks: gateway: appwrite: From af3388a51f9226455f0924b596047eb97f84c5d6 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 18:16:54 +1200 Subject: [PATCH 5/9] Format --- app/cli.php | 3 ++- composer.lock | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/app/cli.php b/app/cli.php index 61d8a90100..663423b57a 100644 --- a/app/cli.php +++ b/app/cli.php @@ -10,6 +10,7 @@ use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; use Executor\Executor; +use Swoole\Timer; use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; @@ -29,7 +30,7 @@ use Utopia\Queue\Publisher; use Utopia\Registry\Registry; use Utopia\System\System; use Utopia\Telemetry\Adapter\None as NoTelemetry; -use Swoole\Timer; + use function Swoole\Coroutine\run; // Overwriting runtimes to be architecture agnostic for CLI diff --git a/composer.lock b/composer.lock index d04d67366e..945948739a 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1eeb5a0f3560aefd8f71bd0955e30360", + "content-hash": "24b02f9535e6a202fde735137f0ec900", "packages": [ { "name": "adhocore/jwt", From 09152a08e1756620fcc18072e7ff96b54876ec76 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 18:27:51 +1200 Subject: [PATCH 6/9] Fix merge --- src/Appwrite/Platform/Tasks/ScheduleFunctions.php | 6 +++--- src/Appwrite/Platform/Workers/Deletes.php | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 649fdb333a..19e068107a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -75,9 +75,9 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate]; } - foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { - \Co::sleep($delay); // in seconds + foreach ($delayedExecutions as $delay => $schedules) { + \go(function () use ($delay, $schedules, $pools, $dbForPlatform) { + \sleep($delay); // in seconds foreach ($schedules as $delayConfig) { $scheduleKey = $delayConfig['key']; diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 427772a6e0..d47ba9cb15 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -61,7 +61,7 @@ class Deletes extends Action ->inject('executionRetention') ->inject('auditRetention') ->inject('log') - ->callback([$this, 'action']); + ->callback($this->action(...)); } /** From 33fbab7e3d0128afd17a9bdec1f50da4d1377f90 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 21:28:59 +1200 Subject: [PATCH 7/9] Fix missing capture --- src/Appwrite/Platform/Tasks/ScheduleMessages.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 4f7dfab6a0..5e65f7a8a6 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $pools, $dbForPlatform) { + \go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) { $queueForMessaging = new Messaging($this->publisher); $this->updateProjectAccess($schedule['project'], $dbForPlatform); From 55236524e336b8108b94e6e8c0d332dce6db9735 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 14 May 2025 21:42:39 +1200 Subject: [PATCH 8/9] Fix merge --- src/Appwrite/Platform/Tasks/ScheduleExecutions.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 331928d265..99c84e829a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; +use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; @@ -28,6 +29,7 @@ class ScheduleExecutions extends ScheduleBase protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { + $queueForFunctions = new Func($this->publisher); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -55,10 +57,8 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($schedule, $delay, $data, $pools) { - \Co::sleep($delay); - - $queueForFunctions = new Func($this->publisher); + \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) { + Co::sleep($delay); $queueForFunctions->setType('schedule') // Set functionId instead of function as we don't have $dbForProject From 149dfb965153d8f51126017e1f3a2456fb53241c Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 20 May 2025 20:07:11 +1200 Subject: [PATCH 9/9] Update lock --- composer.lock | 50 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/composer.lock b/composer.lock index c52ecba812..432676ca3e 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "24b02f9535e6a202fde735137f0ec900", + "content-hash": "63feb1a7cf4cfa2cc7fa0870236e61ea", "packages": [ { "name": "adhocore/jwt", @@ -3499,16 +3499,16 @@ }, { "name": "utopia-php/database", - "version": "0.69.2", + "version": "0.69.5", "source": { "type": "git", "url": "https://github.com/utopia-php/database.git", - "reference": "60591ab073bb80bb9843338754b679bb8169e4ed" + "reference": "4abe53609dfc23b2ea82884d12b149df6a8af2f5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/database/zipball/60591ab073bb80bb9843338754b679bb8169e4ed", - "reference": "60591ab073bb80bb9843338754b679bb8169e4ed", + "url": "https://api.github.com/repos/utopia-php/database/zipball/4abe53609dfc23b2ea82884d12b149df6a8af2f5", + "reference": "4abe53609dfc23b2ea82884d12b149df6a8af2f5", "shasum": "" }, "require": { @@ -3549,9 +3549,9 @@ ], "support": { "issues": "https://github.com/utopia-php/database/issues", - "source": "https://github.com/utopia-php/database/tree/0.69.2" + "source": "https://github.com/utopia-php/database/tree/0.69.5" }, - "time": "2025-05-14T07:51:44+00:00" + "time": "2025-05-17T08:01:51+00:00" }, { "name": "utopia-php/domains", @@ -3701,16 +3701,16 @@ }, { "name": "utopia-php/framework", - "version": "0.33.19", + "version": "0.33.20", "source": { "type": "git", "url": "https://github.com/utopia-php/http.git", - "reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0" + "reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/http/zipball/64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0", - "reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0", + "url": "https://api.github.com/repos/utopia-php/http/zipball/e1c7ab4e0b5b0a9a70256b1e00912e101e76a131", + "reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131", "shasum": "" }, "require": { @@ -3742,9 +3742,9 @@ ], "support": { "issues": "https://github.com/utopia-php/http/issues", - "source": "https://github.com/utopia-php/http/tree/0.33.19" + "source": "https://github.com/utopia-php/http/tree/0.33.20" }, - "time": "2025-03-06T11:37:49+00:00" + "time": "2025-05-18T23:51:21+00:00" }, { "name": "utopia-php/image", @@ -4059,16 +4059,16 @@ }, { "name": "utopia-php/platform", - "version": "0.7.5", + "version": "0.7.6", "source": { "type": "git", "url": "https://github.com/utopia-php/platform.git", - "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf" + "reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/platform/zipball/8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", - "reference": "8febd7b6e0c0f2cbd2f4289447bcca97496e4aaf", + "url": "https://api.github.com/repos/utopia-php/platform/zipball/6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b", + "reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b", "shasum": "" }, "require": { @@ -4103,9 +4103,9 @@ ], "support": { "issues": "https://github.com/utopia-php/platform/issues", - "source": "https://github.com/utopia-php/platform/tree/0.7.5" + "source": "https://github.com/utopia-php/platform/tree/0.7.6" }, - "time": "2025-04-17T12:20:16+00:00" + "time": "2025-05-18T20:31:24+00:00" }, { "name": "utopia-php/pools", @@ -4771,16 +4771,16 @@ "packages-dev": [ { "name": "appwrite/sdk-generator", - "version": "0.40.16", + "version": "0.40.17", "source": { "type": "git", "url": "https://github.com/appwrite/sdk-generator.git", - "reference": "f1f506da74033f0cb5a11e3dffcfd1ee8daf237d" + "reference": "7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/f1f506da74033f0cb5a11e3dffcfd1ee8daf237d", - "reference": "f1f506da74033f0cb5a11e3dffcfd1ee8daf237d", + "url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de", + "reference": "7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de", "shasum": "" }, "require": { @@ -4816,9 +4816,9 @@ "description": "Appwrite PHP library for generating API SDKs for multiple programming languages and platforms", "support": { "issues": "https://github.com/appwrite/sdk-generator/issues", - "source": "https://github.com/appwrite/sdk-generator/tree/0.40.16" + "source": "https://github.com/appwrite/sdk-generator/tree/0.40.17" }, - "time": "2025-05-09T12:06:09+00:00" + "time": "2025-05-16T15:10:54+00:00" }, { "name": "doctrine/annotations",