From b390485dd9434e3b3a4664a9aacd96490271c974 Mon Sep 17 00:00:00 2001 From: Binyamin Yawitz <316103+byawitz@users.noreply.github.com> Date: Fri, 6 Sep 2024 15:52:20 -0400 Subject: [PATCH] fix: coroutine access in schedulers --- src/Appwrite/Platform/Tasks/ScheduleBase.php | 22 +++++++++++-------- .../Platform/Tasks/ScheduleExecutions.php | 18 ++++++++------- .../Platform/Tasks/ScheduleFunctions.php | 3 +-- .../Platform/Tasks/ScheduleMessages.php | 8 +++---- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 2fbd26d4fd..c93c6ef7ba 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -5,7 +5,6 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Utopia\Queue\Connections; use Swoole\Timer; use Utopia\CLI\Console; -use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Exception; @@ -27,7 +26,7 @@ abstract class ScheduleBase extends Action abstract protected function enqueueResources( array $pools, - Database $dbForConsole + callable $getConsoleDB ); public function __construct() @@ -39,9 +38,9 @@ abstract class ScheduleBase extends Action $this ->desc("Execute {$type}s scheduled in Appwrite") ->inject('pools') - ->inject('dbForConsole') + ->inject('getConsoleDB') ->inject('getProjectDB') - ->callback(fn (array $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + ->callback(fn (array $pools, callable $getConsoleDB, callable $getProjectDB) => $this->action($pools, $getConsoleDB, $getProjectDB)); }); } @@ -50,11 +49,12 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(array $pools, Database $dbForConsole, callable $getProjectDB): void + public function action(array $pools, callable $getConsoleDB, callable $getProjectDB): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); + [$_, $_, $dbForConsole] = $getConsoleDB(); /** * Extract only necessary attributes to lower memory used. * @@ -135,7 +135,11 @@ abstract class ScheduleBase extends Action Console::success("Starting timers at " . DateTime::now()); - Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($getConsoleDB, &$lastSyncUpdate, $getSchedule, $pools) { + [$connection,$pool, $dbForConsole] = $getConsoleDB(); + $connections = new Connections(); + $connections->add($connection, $pool); + $time = DateTime::now(); $timerStart = \microtime(true); @@ -184,15 +188,15 @@ abstract class ScheduleBase extends Action $lastSyncUpdate = $time; $timerEnd = \microtime(true); - + $connections->reclaim(); Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); }); Timer::tick( static::ENQUEUE_TIMER * 1000, - fn () => $this->enqueueResources($pools, $dbForConsole) + fn () => $this->enqueueResources($pools, $getConsoleDB) ); - $this->enqueueResources($pools, $dbForConsole); + $this->enqueueResources($pools, $getConsoleDB); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 0ef53bcdf2..01a65ad57f 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -4,7 +4,6 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; use Swoole\Coroutine as Co; -use Utopia\Database\Database; use Utopia\Queue\Connection\Redis; class ScheduleExecutions extends ScheduleBase @@ -22,8 +21,11 @@ class ScheduleExecutions extends ScheduleBase return 'execution'; } - protected function enqueueResources(array $pools, Database $dbForConsole): void + protected function enqueueResources(array $pools, callable $getConsoleDB): void { + [$connection,$pool, $dbForConsole] = $getConsoleDB(); + $this->connections->add($connection, $pool); + $pool = $pools['pools-queue-queue']['pool']; $connection = $pool->get(); $this->connections->add($connection, $pool); @@ -50,7 +52,7 @@ class ScheduleExecutions extends ScheduleBase $delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp(); - \go(function () use ($queueForFunctions, $schedule, $delay) { + \go(function () use ($queueForFunctions, $schedule, $delay, $dbForConsole) { Co::sleep($delay); $queueForFunctions @@ -65,12 +67,12 @@ class ScheduleExecutions extends ScheduleBase ->setBody($schedule['data']['body'] ?? '') ->setProject($schedule['project']) ->trigger(); - }); - $dbForConsole->deleteDocument( - 'schedules', - $schedule['$id'], - ); + $dbForConsole->deleteDocument( + 'schedules', + $schedule['$id'], + ); + }); unset($this->schedules[$schedule['resourceId']]); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index cfa6869662..450551400e 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -5,7 +5,6 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; use Cron\CronExpression; use Utopia\CLI\Console; -use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Queue\Connection\Redis; @@ -26,7 +25,7 @@ class ScheduleFunctions extends ScheduleBase return 'function'; } - protected function enqueueResources(array $pools, Database $dbForConsole): void + protected function enqueueResources(array $pools, callable $getConsoleDB): void { $timerStart = \microtime(true); $time = DateTime::now(); diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index fc76d2a543..8203bcc9c9 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -3,7 +3,6 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; -use Utopia\Database\Database; use Utopia\Queue\Connection\Redis; class ScheduleMessages extends ScheduleBase @@ -21,8 +20,11 @@ class ScheduleMessages extends ScheduleBase return 'message'; } - protected function enqueueResources(array $pools, Database $dbForConsole): void + protected function enqueueResources(array $pools, callable $getConsoleDB): void { + [$connection,$pool, $dbForConsole] = $getConsoleDB(); + $this->connections->add($connection, $pool); + foreach ($this->schedules as $schedule) { if (!$schedule['active']) { continue; @@ -57,8 +59,6 @@ class ScheduleMessages extends ScheduleBase ); $this->connections->reclaim(); - // $queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource - unset($this->schedules[$schedule['resourceId']]); }); }