diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php index 0e48d23883..39e0686da5 100644 --- a/src/Appwrite/Platform/Tasks/Schedule.php +++ b/src/Appwrite/Platform/Tasks/Schedule.php @@ -12,6 +12,8 @@ use Utopia\Database\Query; use Swoole\Timer; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\Queue\Client as Worker; +use Appwrite\Event\Event; use function Swoole\Coroutine\run; @@ -158,7 +160,7 @@ class Schedule extends Action * The timer to prepare soon-to-execute schedules. */ $lastEnqueueUpdate = null; - $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) { + $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) { $timerStart = \microtime(true); $time = DateTime::now(); @@ -198,7 +200,7 @@ class Schedule extends Action } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $schedules, $scheduleKeys) { + \go(function () use ($delay, $schedules, $scheduleKeys, $pools) { \sleep($delay); // in seconds foreach ($scheduleKeys as $scheduleKey) { @@ -207,7 +209,21 @@ class Schedule extends Action return; } - Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue + $schedule = $schedules[$scheduleKey]; + + $queue = $pools->get('queue')->pop(); + + $worker = new Worker(Event::FUNCTIONS_QUEUE_NAME, $queue->getResource()); + $worker + ->enqueue([ + 'type' => 'schedule', + 'value' => [ + 'project' => $schedule['project'], + 'function' => $schedule['function'], + ] + ]); + + $queue->reclaim(); } }); }