From 273ea0abccff5e7629cabe85a36bf5a8090d1387 Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Thu, 6 Feb 2025 13:18:18 +0100 Subject: [PATCH] fix(schedule-tasks): revert back to direct pool usage --- src/Appwrite/Platform/Tasks/ScheduleBase.php | 22 +++++++++++-------- .../Platform/Tasks/ScheduleExecutions.php | 10 ++++++--- .../Platform/Tasks/ScheduleFunctions.php | 14 ++++++++---- .../Platform/Tasks/ScheduleMessages.php | 14 ++++++++---- 4 files changed, 40 insertions(+), 20 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index abcad8c02e..dad2db0d9a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -11,7 +11,7 @@ use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; -use Utopia\Queue\Publisher; +use Utopia\Pools\Group; use Utopia\System\System; use function Swoole\Coroutine\run; @@ -26,7 +26,7 @@ abstract class ScheduleBase extends Action abstract public static function getName(): string; abstract public static function getSupportedResource(): string; abstract public static function getCollectionId(): string; - abstract protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void; + abstract protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void; public function __construct() { @@ -34,10 +34,10 @@ abstract class ScheduleBase extends Action $this ->desc("Execute {$type}s scheduled in Appwrite") - ->inject('publisher') + ->inject('pools') ->inject('dbForPlatform') ->inject('getProjectDB') - ->callback(fn (Publisher $publisher, Database $dbForPlatform, callable $getProjectDB) => $this->action($publisher, $dbForPlatform, $getProjectDB)); + ->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB)); } protected function updateProjectAccess(Document $project, Database $dbForPlatform): void @@ -56,7 +56,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void + public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -125,15 +125,17 @@ abstract class ScheduleBase extends Action $latestDocument = \end($results); } + $pools->reclaim(); + Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds"); Console::success("Starting timers at " . DateTime::now()); - run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $publisher, $getProjectDB) { + run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) { /** * The timer synchronize $schedules copy with database collection. */ - Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule) { + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools) { $time = DateTime::now(); $timerStart = \microtime(true); @@ -182,15 +184,17 @@ abstract class ScheduleBase extends Action $lastSyncUpdate = $time; $timerEnd = \microtime(true); + $pools->reclaim(); + Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); }); Timer::tick( static::ENQUEUE_TIMER * 1000, - fn () => $this->enqueueResources($publisher, $dbForPlatform, $getProjectDB) + fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB) ); - $this->enqueueResources($publisher, $dbForPlatform, $getProjectDB); + $this->enqueueResources($pools, $dbForPlatform, $getProjectDB); }); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 25de721b38..7cd76b480d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -5,7 +5,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; -use Utopia\Queue\Publisher; +use Utopia\Pools\Group; class ScheduleExecutions extends ScheduleBase { @@ -27,9 +27,11 @@ class ScheduleExecutions extends ScheduleBase return 'executions'; } - protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void + protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { - $queueForFunctions = new Func($publisher); + $queue = $pools->get('publisher')->pop(); + $connection = $queue->getResource(); + $queueForFunctions = new Func($connection); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -81,5 +83,7 @@ class ScheduleExecutions extends ScheduleBase unset($this->schedules[$schedule['$internalId']]); } + + $queue->reclaim(); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 4f8579862f..5b8e3027a7 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -7,7 +7,7 @@ use Cron\CronExpression; use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; -use Utopia\Queue\Publisher; +use Utopia\Pools\Group; class ScheduleFunctions extends ScheduleBase { @@ -31,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase return 'functions'; } - protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void + protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { $timerStart = \microtime(true); $time = DateTime::now(); @@ -70,9 +70,12 @@ class ScheduleFunctions extends ScheduleBase } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $publisher, $dbForPlatform) { + \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { \sleep($delay); // in seconds + $queue = $pools->get('publisher')->pop(); + $connection = $queue->getResource(); + foreach ($scheduleKeys as $scheduleKey) { // Ensure schedule was not deleted if (!\array_key_exists($scheduleKey, $this->schedules)) { @@ -83,7 +86,8 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($publisher); + $queueForFunctions = new Func($connection); + $queueForFunctions ->setType('schedule') ->setFunction($schedule['resource']) @@ -92,6 +96,8 @@ class ScheduleFunctions extends ScheduleBase ->setProject($schedule['project']) ->trigger(); } + + $queue->reclaim(); }); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index a575fb819a..201d5eab53 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -4,7 +4,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; -use Utopia\Queue\Publisher; +use Utopia\Pools\Group; class ScheduleMessages extends ScheduleBase { @@ -26,7 +26,7 @@ class ScheduleMessages extends ScheduleBase return 'messages'; } - protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void + protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { foreach ($this->schedules as $schedule) { if (!$schedule['active']) { @@ -40,9 +40,13 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $publisher, $dbForPlatform) { + \go(function () use ($schedule, $pools, $dbForPlatform) { + $queue = $pools->get('publisher')->pop(); + $connection = $queue->getResource(); + $queueForMessaging = new Messaging($connection); + $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForMessaging = new Messaging($publisher); + $queueForMessaging ->setType(MESSAGE_SEND_TYPE_EXTERNAL) ->setMessageId($schedule['resourceId']) @@ -54,6 +58,8 @@ class ScheduleMessages extends ScheduleBase $schedule['$id'], ); + $queue->reclaim(); + unset($this->schedules[$schedule['$internalId']]); }); }