Re-implement removed stuff during merge

This commit is contained in:
Matej Baco 2022-11-15 17:37:16 +01:00
parent eb47894970
commit 855c3d3b84

View file

@ -12,6 +12,8 @@ use Utopia\Database\Query;
use Swoole\Timer;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Client as Worker;
use Appwrite\Event\Event;
use function Swoole\Coroutine\run;
@ -158,7 +160,7 @@ class Schedule extends Action
* The timer to prepare soon-to-execute schedules.
*/
$lastEnqueueUpdate = null;
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) {
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) {
$timerStart = \microtime(true);
$time = DateTime::now();
@ -198,7 +200,7 @@ class Schedule extends Action
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $schedules, $scheduleKeys) {
\go(function () use ($delay, $schedules, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
foreach ($scheduleKeys as $scheduleKey) {
@ -207,7 +209,21 @@ class Schedule extends Action
return;
}
Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue
$schedule = $schedules[$scheduleKey];
$queue = $pools->get('queue')->pop();
$worker = new Worker(Event::FUNCTIONS_QUEUE_NAME, $queue->getResource());
$worker
->enqueue([
'type' => 'schedule',
'value' => [
'project' => $schedule['project'],
'function' => $schedule['function'],
]
]);
$queue->reclaim();
}
});
}