diff --git a/app/cli.php b/app/cli.php index 0dec8bad04..9b85804349 100644 --- a/app/cli.php +++ b/app/cli.php @@ -156,15 +156,19 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, CLI::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); + CLI::setResource('queueForFunctions', function (Connection $queue) { return new Func($queue); }, ['queue']); + CLI::setResource('queueForDeletes', function (Connection $queue) { return new Delete($queue); }, ['queue']); + CLI::setResource('queueForCertificates', function (Connection $queue) { return new Certificate($queue); }, ['queue']); + CLI::setResource('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { $logger = $register->get('logger'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index e013220aa4..29ca74ee8e 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -11,6 +11,7 @@ use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Platform\Action; use Utopia\Pools\Group; +use Utopia\Queue\Connection; use Utopia\System\System; use function Swoole\Coroutine\run; @@ -26,7 +27,7 @@ abstract class ScheduleBase extends Action abstract public static function getSupportedResource(): string; abstract protected function enqueueResources( - Group $pools, + Connection $queue, Database $dbForConsole ); @@ -36,10 +37,10 @@ abstract class ScheduleBase extends Action $this ->desc("Execute {$type}s scheduled in Appwrite") - ->inject('pools') + ->inject('queue') ->inject('dbForConsole') ->inject('getProjectDB') - ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + ->callback(fn (Connection $queue, Database $dbForConsole, callable $getProjectDB) => $this->action($queue, $dbForConsole, $getProjectDB)); } /** @@ -47,7 +48,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void + public function action(Connection $queue, Database $dbForConsole, callable $getProjectDB): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -127,17 +128,15 @@ abstract class ScheduleBase extends Action $latestDocument = \end($results); } - $pools->reclaim(); - Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds"); Console::success("Starting timers at " . DateTime::now()); - run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { + run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $queue) { /** * The timer synchronize $schedules copy with database collection. */ - Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $queue) { $time = DateTime::now(); $timerStart = \microtime(true); @@ -186,17 +185,15 @@ abstract class ScheduleBase extends Action $lastSyncUpdate = $time; $timerEnd = \microtime(true); - $pools->reclaim(); - Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); }); Timer::tick( static::ENQUEUE_TIMER * 1000, - fn () => $this->enqueueResources($pools, $dbForConsole) + fn () => $this->enqueueResources($queue, $dbForConsole) ); - $this->enqueueResources($pools, $dbForConsole); + $this->enqueueResources($queue, $dbForConsole); }); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 0e618642d5..2822aea130 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -6,6 +6,7 @@ use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\Queue\Connection; class ScheduleExecutions extends ScheduleBase { @@ -22,11 +23,10 @@ class ScheduleExecutions extends ScheduleBase return 'execution'; } - protected function enqueueResources(Group $pools, Database $dbForConsole): void + protected function enqueueResources(Connection $queue, Database $dbForConsole): void { - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - $queueForFunctions = new Func($connection); + + $queueForFunctions = new Func($queue); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -72,7 +72,5 @@ class ScheduleExecutions extends ScheduleBase unset($this->schedules[$schedule['$internalId']]); } - - $queue->reclaim(); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index e2c278714f..a598bb5f45 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -8,6 +8,7 @@ use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Pools\Group; +use Utopia\Queue\Connection; class ScheduleFunctions extends ScheduleBase { @@ -26,7 +27,7 @@ class ScheduleFunctions extends ScheduleBase return 'function'; } - protected function enqueueResources(Group $pools, Database $dbForConsole): void + protected function enqueueResources(Connection $queue, Database $dbForConsole): void { $timerStart = \microtime(true); $time = DateTime::now(); @@ -65,12 +66,9 @@ class ScheduleFunctions extends ScheduleBase } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools) { + \go(function () use ($delay, $scheduleKeys, $queue) { \sleep($delay); // in seconds - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - foreach ($scheduleKeys as $scheduleKey) { // Ensure schedule was not deleted if (!\array_key_exists($scheduleKey, $this->schedules)) { @@ -79,7 +77,7 @@ class ScheduleFunctions extends ScheduleBase $schedule = $this->schedules[$scheduleKey]; - $queueForFunctions = new Func($connection); + $queueForFunctions = new Func($queue); $queueForFunctions ->setType('schedule') @@ -89,8 +87,6 @@ class ScheduleFunctions extends ScheduleBase ->setProject($schedule['project']) ->trigger(); } - - $queue->reclaim(); }); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 167f1282ed..cd136cf67d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\Queue\Connection; class ScheduleMessages extends ScheduleBase { @@ -21,7 +22,7 @@ class ScheduleMessages extends ScheduleBase return 'message'; } - protected function enqueueResources(Group $pools, Database $dbForConsole): void + protected function enqueueResources(Connection $queue, Database $dbForConsole): void { foreach ($this->schedules as $schedule) { if (!$schedule['active']) { @@ -35,10 +36,9 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $pools, $dbForConsole) { - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - $queueForMessaging = new Messaging($connection); + \go(function () use ($schedule, $queue, $dbForConsole) { + + $queueForMessaging = new Messaging($queue); $queueForMessaging ->setType(MESSAGE_SEND_TYPE_EXTERNAL) @@ -51,8 +51,6 @@ class ScheduleMessages extends ScheduleBase $schedule['$id'], ); - $queue->reclaim(); - unset($this->schedules[$schedule['$internalId']]); }); }