From 3768912412b46557d354863c3b1f93fe90a85dd1 Mon Sep 17 00:00:00 2001 From: shimon Date: Mon, 7 Nov 2022 16:39:42 +0200 Subject: [PATCH] replacing offset with pagination --- app/tasks/schedule.php | 65 ++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 24 deletions(-) diff --git a/app/tasks/schedule.php b/app/tasks/schedule.php index d556c30528..2f58b9845d 100644 --- a/app/tasks/schedule.php +++ b/app/tasks/schedule.php @@ -69,62 +69,70 @@ $cli }; $dbForConsole = getConsoleDB(); - $limit = 200; + $limit = 10000; $sum = $limit; $functions = []; $queue = []; - $count = 0; - $loadStart = \microtime(true); $total = 0; - /** - * Initial run fill $functions list - */ + $loadStart = \microtime(true); + $latestDocument = null; + while ($sum === $limit) { - $results = $dbForConsole->find('schedules', [ + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ Query::equal('region', [App::getEnv('_APP_REGION')]), Query::equal('resourceType', ['function']), Query::equal('active', [true]), - Query::offset($count * $limit), - Query::limit($limit), - ]); + ])); $sum = count($results); $total = $total + $sum; foreach ($results as $document) { - $functions[$document['resourceId']] = $document; + $functions[$document['resourceId']] = [ + 'resourceId' => $document->getAttribute('resourceId'), + 'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'), + 'schedule' => $document->getAttribute('schedule'), + ]; } - $count++; + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } $loadEnd = \microtime(true); Console::error("{$total} functions where loaded in " . ($loadEnd - $loadStart) . " seconds"); $createQueue(); - $lastUpdate = DateTime::addSeconds(new \DateTime(), -FUNCTION_VALIDATION_TIMER); + /** + * The timer updates $functions from db on last resourceUpdatedAt attr in X-min. + */ Co\run( function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { Timer::tick(FUNCTION_VALIDATION_TIMER * 1000, function () use ($removeFromQueue, $createQueue, $dbForConsole, &$functions, &$queue, &$lastUpdate) { $time = DateTime::now(); - $count = 0; $limit = 200; $sum = $limit; $total = 0; + $latestDocument = null; $timerStart = \microtime(true); Console::info("Update proc run at: $time last update was at $lastUpdate"); - /** - * Updating functions list from DB. - */ - while (!empty($sum)) { - $results = $dbForConsole->find('schedules', [ + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ Query::equal('region', [App::getEnv('_APP_REGION')]), Query::equal('resourceType', ['function']), Query::greaterThan('resourceUpdatedAt', $lastUpdate), - Query::limit($limit), - Query::offset($count * $limit), - ]); + ])); + $sum = count($results); $total = $total + $sum; foreach ($results as $document) { @@ -135,12 +143,18 @@ $cli unset($functions[$document['resourceId']]); } elseif ($new > $org) { Console::error("Updating: {$document['resourceId']}"); - $functions[$document['resourceId']] = $document; + $functions[$document['resourceId']] = [ + 'resourceId' => $document->getAttribute('resourceId'), + 'resourceUpdatedAt' => $document->getAttribute('resourceUpdatedAt'), + 'schedule' => $document->getAttribute('schedule'), + ]; } $removeFromQueue($document['resourceId']); } - $count++; + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } + $lastUpdate = DateTime::now(); $createQueue(); $timerEnd = \microtime(true); @@ -148,6 +162,9 @@ $cli Console::error("Update timer: {$total} functions where updated in " . ($timerEnd - $timerStart) . " seconds"); }); + /** + * The timer sends to worker every 1 min and re-enqueue matched functions. + */ Timer::tick(FUNCTION_ENQUEUE_TIMER * 1000, function () use ($dbForConsole, &$functions, &$queue) { $timerStart = \microtime(true); $time = DateTime::now();