From 44092175213a575f67dc55723af4908b428d21d7 Mon Sep 17 00:00:00 2001 From: shimon Date: Mon, 7 Nov 2022 14:20:02 +0200 Subject: [PATCH] queue --- app/tasks/schedule.php | 55 ++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index 2fc7acd1e9..d556c30528 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -145,14 +145,14 @@ $cli $createQueue(); $timerEnd = \microtime(true); - Console::error("Update timer: {$total} functions where updated in " . ($timerStart - $timerEnd) . " seconds"); + Console::error("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); }); Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { $timerStart = \microtime(true); $time = DateTime::now(); $timeFrame = DateTime::addSeconds(new \DateTime(), ENQUEUE_TIME_FRAME); /** 5 min */ - $now = (new \DateTime())->format('Y-m-d H:i:00.000'); + $slot = (new \DateTime())->format('Y-m-d H:i:00.000'); Console::info("Enqueue proc run at: $time"); // Debug @@ -163,39 +163,36 @@ $cli // } // } - /** - * Lopping time slots - */ + if (array_key_exists($slot, $queue)) { + $schedule = $queue[$slot]; + console::log("Number of function sent to worker (" . count($schedule)); - foreach ($queue as $slot => $schedule) { - if ($now === $slot) { - foreach ($schedule as $function) { - /** - * Enqueue function (here should be the Enqueue call - */ - Console::warning("Enqueueing :{$function['resourceId']}"); - $cron = new CronExpression($function['schedule']); - $next = DateTime::format($cron->getNextRunDate()); + foreach ($schedule as $function) { + /** + * Enqueue function (here should be the Enqueue call + */ + //Console::warning("Enqueueing :{$function['resourceId']}"); + $cron = new CronExpression($function['schedule']); + $next = DateTime::format($cron->getNextRunDate()); - /** - * If next schedule is in 5-min timeframe - * and it was not removed or changed, re-enqueue the function. - */ - if ( - $next < $timeFrame && - !empty($functions[$function['resourceId']] && - $function['schedule'] === $functions[$function['resourceId']]['schedule']) - ) { - Console::warning("re-enqueueing :{$function['resourceId']}"); - $queue[$next][$function['resourceId']] = $function; - } - unset($queue[$slot][$function['resourceId']]); /** removing function from slot */ + /** + * If next schedule is in 5-min timeframe + * and it was not removed or changed, re-enqueue the function. + */ + if ( + $next < $timeFrame && + !empty($functions[$function['resourceId']] && + $function['schedule'] === $functions[$function['resourceId']]['schedule']) + ) { + //Console::warning("re-enqueueing :{$function['resourceId']}"); + $queue[$next][$function['resourceId']] = $function; } - unset($queue[$slot]); /** removing slot */ + unset($queue[$slot][$function['resourceId']]); /** removing function from slot */ } + unset($queue[$slot]); /** removing slot */ } $timerEnd = \microtime(true); - Console::error("Queue timer: finished in " . ($timerStart - $timerEnd) . " seconds"); + Console::error("Queue timer: finished in " . ($timerEnd - $timerStart) . " seconds"); }); } );