diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index bb9a64ed62..ed42a45e4e 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -178,8 +178,10 @@ abstract class ScheduleBase extends Action Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); }); - Timer::tick(static::ENQUEUE_TIMER * 1000, fn() => - $this->enqueueResources($pools, $dbForConsole)); + Timer::tick( + static::ENQUEUE_TIMER * 1000, + fn() => $this->enqueueResources($pools, $dbForConsole) + ); $this->enqueueResources($pools, $dbForConsole); }); diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 6d938a8b4d..9ede91279d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -16,7 +16,7 @@ use function Swoole\Coroutine\run; class ScheduleMessages extends ScheduleBase { - public const UPDATE_TIMER = 3; // seconds + public const UPDATE_TIMER = 10; // seconds public const ENQUEUE_TIMER = 60; // seconds public static function getName(): string @@ -32,6 +32,13 @@ class ScheduleMessages extends ScheduleBase protected function enqueueResources(Group $pools, Database $dbForConsole): void { foreach ($this->schedules as $schedule) { + $now = DateTime::now(); + $scheduledAt = DateTime::formatTz($schedule['scheduledAt']); + + if ($scheduledAt > $now) { + continue; + } + \go(function () use ($schedule, $pools, $dbForConsole) { $queue = $pools->get('queue')->pop(); $connection = $queue->getResource(); @@ -50,7 +57,7 @@ class ScheduleMessages extends ScheduleBase $queue->reclaim(); - unset($this->schedules[$schedule->getId()]); + unset($this->schedules[$schedule['resourceId']]); }); } }