diff --git a/app/cli.php b/app/cli.php index f8a2d6ef87..0b76afc2e1 100644 --- a/app/cli.php +++ b/app/cli.php @@ -163,6 +163,10 @@ CLI::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); +CLI::setResource('poolForQueue', function (Group $pools) { + return $pools->get('queue'); +}, ['pools']); + CLI::setResource('queueForFunctions', function (Connection $queue) { return new Func($queue); }, ['queue']); diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index a1b85c341f..a7015405ab 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -11,6 +11,7 @@ use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Platform\Action; use Utopia\Pools\Group; +use Utopia\Pools\Pool; use Utopia\System\System; use function Swoole\Coroutine\run; @@ -25,7 +26,7 @@ abstract class ScheduleBase extends Action abstract public static function getName(): string; abstract public static function getSupportedResource(): string; abstract public static function getCollectionId(): string; - abstract protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void; + abstract protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void; public function __construct() { @@ -33,10 +34,10 @@ abstract class ScheduleBase extends Action $this ->desc("Execute {$type}s scheduled in Appwrite") - ->inject('pools') + ->inject('queuePool') ->inject('dbForConsole') ->inject('getProjectDB') - ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + ->callback(fn (Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB) => $this->action($poolForQueue, $dbForConsole, $getProjectDB)); } /** @@ -44,7 +45,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void + public function action(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -113,17 +114,17 @@ abstract class ScheduleBase extends Action $latestDocument = \end($results); } - $pools->reclaim(); + $poolForQueue->reclaim(); Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds"); Console::success("Starting timers at " . DateTime::now()); - run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) { + run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue, $getProjectDB) { /** * The timer synchronize $schedules copy with database collection. */ - Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue) { $time = DateTime::now(); $timerStart = \microtime(true); @@ -172,17 +173,17 @@ abstract class ScheduleBase extends Action $lastSyncUpdate = $time; $timerEnd = \microtime(true); - $pools->reclaim(); + $poolForQueue->reclaim(); Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); }); Timer::tick( static::ENQUEUE_TIMER * 1000, - fn () => $this->enqueueResources($pools, $dbForConsole, $getProjectDB) + fn () => $this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB) ); - $this->enqueueResources($pools, $dbForConsole, $getProjectDB); + $this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB); }); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 73a2814397..3996c54fed 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -6,6 +6,7 @@ use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\Pools\Pool; class ScheduleExecutions extends ScheduleBase { @@ -27,9 +28,9 @@ class ScheduleExecutions extends ScheduleBase return 'executions'; } - protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void + protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void { - $queue = $pools->get('queue')->pop(); + $queue = $poolForQueue->pop(); $connection = $queue->getResource(); $queueForFunctions = new Func($connection); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 4d57902330..ff8d99c8e5 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -31,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase return 'functions'; } - protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void + protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void { $timerStart = \microtime(true); $time = DateTime::now(); @@ -70,10 +70,10 @@ class ScheduleFunctions extends ScheduleBase } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools) { + \go(function () use ($delay, $scheduleKeys, $poolForQueue) { \sleep($delay); // in seconds - $queue = $pools->get('queue')->pop(); + $queue = $poolForQueue->pop(); $connection = $queue->getResource(); foreach ($scheduleKeys as $scheduleKey) { diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index b9d8e2a282..79c2c44ec9 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\Pools\Pool; class ScheduleMessages extends ScheduleBase { @@ -26,7 +27,7 @@ class ScheduleMessages extends ScheduleBase return 'messages'; } - protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void + protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void { foreach ($this->schedules as $schedule) { if (!$schedule['active']) { @@ -40,8 +41,8 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $pools, $dbForConsole) { - $queue = $pools->get('queue')->pop(); + \go(function () use ($schedule, $poolForQueue, $dbForConsole) { + $queue = $poolForQueue->pop(); $connection = $queue->getResource(); $queueForMessaging = new Messaging($connection);