From 250ea93d3fe04db854258fac2b885e1042e8ca88 Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 13 Nov 2022 15:14:00 +0200 Subject: [PATCH] some fixes --- app/tasks/schedule.php | 114 +++++++++++++++++++++++++++++++---------- 1 file changed, 86 insertions(+), 28 deletions(-) diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index a4ec1c5c04..ded7301208 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -7,12 +7,14 @@ use Cron\CronExpression; use Utopia\App; use Utopia\CLI\Console; use Utopia\Database\DateTime; +use Utopia\Database\Document; use Utopia\Database\Query; use Swoole\Timer; const FUNCTION_UPDATE_TIMER = 60; //seconds const FUNCTION_ENQUEUE_TIMER = 60; //seconds const FUNCTION_ENQUEUE_TIMEFRAME = 60 * 5; // 5 min +const FUNCTION_RESET_TIMER_TO = 50; // seconds sleep(4); @@ -38,7 +40,12 @@ $cli Console::title('Scheduler V1'); Console::success(APP_NAME . ' Scheduler v1 has started'); - $createQueue = function () use (&$functions, &$queue) { + $dbForConsole = getConsoleDB(); + + /** + * @return void + */ + $createQueue = function () use (&$functions, &$queue): void { $loadStart = \microtime(true); /** * Creating smaller functions list containing 5-min timeframe. @@ -47,24 +54,67 @@ $cli foreach ($functions as $function) { $cron = new CronExpression($function['schedule']); $next = DateTime::format($cron->getNextRunDate()); + if ($next < $timeFrame) { $queue[$next][$function['resourceId']] = $function; } } $loadEnd = \microtime(true); Console::success("Queue was built in " . ($loadEnd - $loadStart) . " seconds"); + //var_dump($queue); }; - $removeFromQueue = function ($resourceId) use (&$queue) { + /** + * @param string $id + * @param string $resourceId + * @return void + */ + $removeFromQueue = function (string $id, string $resourceId) use (&$queue, &$functions, $dbForConsole) { + if (array_key_exists($resourceId, $functions)) { + unset($functions[$resourceId]); + $dbForConsole->deleteDocument('schedules', $id); + Console::error("Removing :{$resourceId} from functions list"); + } + foreach ($queue as $slot => $schedule) { if (array_key_exists($resourceId, $schedule)) { - Console::error("Unsetting :{$resourceId} from queue slot $slot"); unset($queue[$slot][$resourceId]); + Console::error("Removing :{$resourceId} from queue slot $slot"); } } }; - $dbForConsole = getConsoleDB(); + /** + * @param string $resourceId + * @param array $update + * @return void + */ + $updateQueue = function (string $resourceId, array $update) use (&$queue, &$functions): void { + + $functions[$resourceId] = $update; + Console::error("Updating :{$resourceId} in functions list"); + + foreach ($queue as $slot => $schedule) { + if (array_key_exists($resourceId, $schedule)) { + $queue[$slot][$resourceId] = $update; + Console::error("Updating :{$resourceId} in queue slot $slot"); + } + } + }; + + /** + * @var Document $schedule + * @return array + */ + function getsSheduleAttributes(Document $schedule): array + { + return [ + 'resourceId' => $schedule->getAttribute('resourceId'), + 'schedule' => $schedule->getAttribute('schedule'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + ]; + } + $limit = 10000; $sum = $limit; $functions = []; @@ -87,11 +137,7 @@ $cli $sum = count($results); $total = $total + $sum; foreach ($results as $document) { - $functions[$document['resourceId']] = [ - 'resourceId' => $document->getAttribute('resourceId'), - 'schedule' => $document->getAttribute('schedule'), - 'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'), - ]; + $functions[$document['resourceId']] = getsSheduleAttributes($document); } $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; @@ -102,12 +148,20 @@ $cli $createQueue(); $lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_UPDATE_TIMER); + do { + $second = time() % 60; + } while ($second < FUNCTION_RESET_TIMER_TO); + + $time = DateTime::now(); + Console::success("Starting timers at {$time}"); + + /** * The timer updates $functions from db on last resourceUpdatedAt attr in X-min. */ Co\run( - function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { - Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { + function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { + Timer::tick(FUNCTION_UPDATE_TIMER * 1000, function () use ($updateQueue, $removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { $time = DateTime::now(); $limit = 1000; $sum = $limit; @@ -115,7 +169,7 @@ $cli $latestDocument = null; $timerStart = \microtime(true); - Console::warning("Update proc started at: $time last update was at $lastUpdate"); + //Console::warning("Update proc started at: $time last update was at $lastUpdate"); while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; @@ -125,7 +179,7 @@ $cli $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ Query::equal('region', [App::getEnv('_APP_REGION')]), Query::equal('resourceType', ['function']), - Query::greaterThan('resourceUpdatedAt', $lastUpdate), + Query::greaterThanEqual('resourceUpdatedAt', $lastUpdate), ])); $sum = count($results); @@ -134,17 +188,12 @@ $cli $org = isset($functions[$document['resourceId']]) ? strtotime($functions[$document['resourceId']]['resourceUpdatedAt']) : null; $new = strtotime($document['resourceUpdatedAt']); if ($document['active'] === false) { - Console::warning("Removing: {$document['resourceId']}"); - unset($functions[$document['resourceId']]); + //Console::warning("Removing: {$document['resourceId']}"); + $removeFromQueue($document->getId(), $document['resourceId']); } elseif ($new > $org) { - Console::warning("Updating: {$document['resourceId']}"); - $functions[$document['resourceId']] = [ - 'resourceId' => $document->getAttribute('resourceId'), - 'schedule' => $document->getAttribute('schedule'), - 'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'), - ]; + //Console::warning("Updating: {$document['resourceId']}"); + $updateQueue($document['resourceId'], getsSheduleAttributes($document)); } - $removeFromQueue($document['resourceId']); } $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; @@ -154,7 +203,7 @@ $cli $createQueue(); $timerEnd = \microtime(true); - Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); + //Console::warning("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); }); /** @@ -165,17 +214,27 @@ $cli $time = DateTime::now(); $timeFrame = DateTime::addSeconds(new \DateTime(), FUNCTION_ENQUEUE_TIMEFRAME); $slot = (new \DateTime())->format('Y-m-d H:i:00.000'); + $prepareStart = time(); Console::info("Enqueue proc started at: $time"); if (array_key_exists($slot, $queue)) { $schedule = $queue[$slot]; console::info(count($schedule) . " functions sent to worker for time slot " . $slot); + $totalPreparation = time() - $prepareStart; + + $wait = ((60 - FUNCTION_RESET_TIMER_TO) - $totalPreparation); + Console::info("Waiting for : {$wait} seconds"); + sleep($wait); + + $time = DateTime::now(); + Console::info("Start enqueueing at {$time}"); foreach ($schedule as $function) { - /** - * Enqueue function (here should be the Enqueue call - */ + if (empty($functions[$function['resourceId']])) { + continue; + } + $cron = new CronExpression($function['schedule']); $next = DateTime::format($cron->getNextRunDate()); @@ -185,8 +244,7 @@ $cli */ if ( $next < $timeFrame && - !empty($functions[$function['resourceId']] && - $function['schedule'] === $functions[$function['resourceId']]['schedule']) + $function['schedule'] ?? [] === $functions[$function['resourceId']]['schedule'] ) { $queue[$next][$function['resourceId']] = $function; }