From 02b1c822260162f66c25194d8108d88eb55ff11b Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 29 Apr 2025 21:28:53 +1200 Subject: [PATCH] Remove all remaining pool usages --- src/Appwrite/Platform/Tasks/Doctor.php | 38 +++++++++++-------- .../Platform/Tasks/ScheduleExecutions.php | 33 ++++++++-------- .../Platform/Tasks/ScheduleFunctions.php | 24 ++++++------ .../Platform/Tasks/ScheduleMessages.php | 19 +++++----- 4 files changed, 60 insertions(+), 54 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/Doctor.php b/src/Appwrite/Platform/Tasks/Doctor.php index fba636028b..5263133eba 100644 --- a/src/Appwrite/Platform/Tasks/Doctor.php +++ b/src/Appwrite/Platform/Tasks/Doctor.php @@ -3,17 +3,19 @@ namespace Appwrite\Platform\Tasks; use Appwrite\ClamAV\Network; -use Appwrite\PubSub\Adapter as PubSubAdapter; +use Appwrite\PubSub\Adapter\Pool as PubSubPool; use PHPMailer\PHPMailer\PHPMailer; use Utopia\App; +use Utopia\Cache\Adapter\Pool as CachePool; use Utopia\CLI\Console; use Utopia\Config\Config; -use Utopia\Database\Adapter as DatabaseAdapter; +use Utopia\Database\Adapter\Pool as DatabasePool; use Utopia\Domains\Domain; use Utopia\DSN\DSN; use Utopia\Logger\Logger; use Utopia\Platform\Action; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Registry\Registry; use Utopia\Storage\Device\Local; use Utopia\Storage\Storage; @@ -136,13 +138,13 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $database) { try { - $pools->get($database)->use(function (DatabaseAdapter $adapter) use ($key, $database) { - if ($adapter->ping()) { - Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected'); - } else { - Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected'); - } - }); + $adapter = new DatabasePool($pools->get($database)); + + if ($adapter->ping()) { + Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected'); + } else { + Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected'); + } } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}.({$database})", 47, '.') . 'disconnected'); } @@ -161,13 +163,17 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $pool) { try { - $pools->get($pool)->use(function (PubSubAdapter $adapter) use ($key, $pool) { - if ($adapter->ping()) { - Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected'); - } else { - Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); - } - }); + $adapter = match($key) { + 'Cache' => new CachePool($pools->get($pool)), + 'Queue' => new BrokerPool($pools->get($pool)), + 'PubSub' => new PubSubPool($pools->get($pool)), + }; + + if ($adapter->ping()) { + Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected'); + } else { + Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); + } } catch (\Throwable) { Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected'); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index f1a4c3ed79..ffde4f6df8 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -3,9 +3,9 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; -use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; class ScheduleExecutions extends ScheduleBase @@ -58,24 +58,23 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); \go(function () use ($schedule, $delay, $data, $pools) { - Co::sleep($delay); + \Co::sleep($delay); - $pools->get('publisher')->use(function (Publisher $publisher) use ($schedule, $data) { - $queueForFunctions = new Func($publisher); + $publisher = new BrokerPool($pools->get('publisher')); - $queueForFunctions->setType('schedule') - // Set functionId instead of function as we don't have $dbForProject - // TODO: Refactor to use function instead of functionId - ->setFunctionId($schedule['resource']['functionId']) - ->setExecution($schedule['resource']) - ->setMethod($data['method'] ?? 'POST') - ->setPath($data['path'] ?? '/') - ->setHeaders($data['headers'] ?? []) - ->setBody($data['body'] ?? '') - ->setProject($schedule['project']) - ->setUserId($data['userId'] ?? '') - ->trigger(); - }); + $queueForFunctions = new Func($publisher); + + $queueForFunctions->setType('schedule') + // Set functionId instead of function as we don't have $dbForProject + // TODO: Refactor to use function instead of functionId + ->setFunctionId($schedule['resource']['functionId']) + ->setExecution($schedule['resource']) + ->setMethod($data['method'] ?? 'POST') + ->setPath($data['path'] ?? '/') + ->setHeaders($data['headers'] ?? []) + ->setBody($data['body'] ?? '') + ->setProject($schedule['project']) + ->setUserId($data['userId'] ?? ''); }); $dbForPlatform->deleteDocument( diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 328c9f894c..9555d166b4 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -8,6 +8,7 @@ use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; class ScheduleFunctions extends ScheduleBase @@ -72,8 +73,7 @@ class ScheduleFunctions extends ScheduleBase foreach ($delayedExecutions as $delay => $scheduleKeys) { \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { - \sleep($delay); // in seconds - + \Co::sleep($delay); // in seconds foreach ($scheduleKeys as $scheduleKey) { // Ensure schedule was not deleted @@ -85,17 +85,17 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $pools->get('publisher')->use(function (Publisher $publisher) use ($schedule) { - $queueForFunctions = new Func($publisher); + $publisher = new BrokerPool($pools->get('publisher')); - $queueForFunctions - ->setType('schedule') - ->setFunction($schedule['resource']) - ->setMethod('POST') - ->setPath('/') - ->setProject($schedule['project']) - ->trigger(); - }); + $queueForFunctions = new Func($publisher); + + $queueForFunctions + ->setType('schedule') + ->setFunction($schedule['resource']) + ->setMethod('POST') + ->setPath('/') + ->setProject($schedule['project']) + ->trigger(); } }); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 75b4a34b87..508c442359 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; class ScheduleMessages extends ScheduleBase @@ -42,17 +43,17 @@ class ScheduleMessages extends ScheduleBase } \go(function () use ($schedule, $pools, $dbForPlatform) { - $pools->get('publisher')->use(function (Publisher $publisher) use ($schedule, $dbForPlatform) { - $queueForMessaging = new Messaging($publisher); + $publisher = new BrokerPool($pools->get('publisher')); - $this->updateProjectAccess($schedule['project'], $dbForPlatform); + $queueForMessaging = new Messaging($publisher); - $queueForMessaging - ->setType(MESSAGE_SEND_TYPE_EXTERNAL) - ->setMessageId($schedule['resourceId']) - ->setProject($schedule['project']) - ->trigger(); - }); + $this->updateProjectAccess($schedule['project'], $dbForPlatform); + + $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($schedule['resourceId']) + ->setProject($schedule['project']) + ->trigger(); $dbForPlatform->deleteDocument( 'schedules',