lastEnqueueUpdate === null ? 0 : $timerStart - $this->lastEnqueueUpdate; $timeFrame = DateTime::addSeconds(new \DateTime(), static::ENQUEUE_TIMER - $enqueueDiff); Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); $total = 0; $delayedExecutions = []; // Group executions with same delay to share one coroutine foreach ($this->schedules as $key => $schedule) { try { $cron = new CronExpression($schedule['schedule']); } catch (\InvalidArgumentException) { // ignore invalid cron expressions continue; } $nextDate = $cron->getNextRunDate(); $next = DateTime::format($nextDate); $currentTick = $next < $timeFrame; if (!$currentTick) { continue; } $total++; $promiseStart = \time(); // in seconds $executionStart = $nextDate->getTimestamp(); // in seconds $delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued if (!isset($delayedExecutions[$delay])) { $delayedExecutions[$delay] = []; } $delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate]; } foreach ($delayedExecutions as $delay => $schedules) { \go(function () use ($delay, $schedules, $dbForPlatform) { \sleep($delay); // in seconds foreach ($schedules as $delayConfig) { $scheduleKey = $delayConfig['key']; // Ensure schedule was not deleted if (!\array_key_exists($scheduleKey, $this->schedules)) { return; } $schedule = $this->schedules[$scheduleKey]; $this->updateProjectAccess($schedule['project'], $dbForPlatform); $queueForFunctions = new Func($this->publisherFunctions); $queueForFunctions ->setType('schedule') ->setFunction($schedule['resource']) ->setMethod('POST') ->setPath('/') ->setProject($schedule['project']) ->trigger(); $this->recordEnqueueDelay($delayConfig['nextDate']); } }); } $timerEnd = \microtime(true); // TODO: This was a bug before because it wasn't passed by reference, enabling it breaks scheduling //$this->lastEnqueueUpdate = $timerStart; Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds"); } }