feat: implement scheduledAt in schedule

This commit is contained in:
loks0n 2024-06-11 13:39:38 +01:00
parent b8b81a9bd1
commit 7e8f72d267

View file

@ -40,32 +40,39 @@ class ScheduleFunctions extends ScheduleBase
$delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($this->schedules as $key => $schedule) {
$cron = new CronExpression($schedule['schedule']); // TODO: Allow schedule to be DateTime, like ScheduleMessaging.php
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);
foreach ($this->schedules as $scheduleKey => $schedule) {
if (CronExpression::isValidExpression($schedule['schedule'])) {
$cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate();
} else {
try {
$nextDate = new \DateTime($schedule['schedule']);
$schedule['delete'] = true;
} catch (\Exception) {
Console::error('Failed to parse schedule: ' . $schedule['schedule']);
continue;
}
}
$next = DateTime::format($nextDate);
$currentTick = $next < $timeFrame;
if (!$currentTick) {
continue;
}
$total++;
$promiseStart = \time(); // in seconds
$executionStart = $nextDate->getTimestamp(); // in seconds
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
$total += 1;
$delay = $nextDate->getTimestamp() - \time(); // Time to wait from now until execution needs to be queued
if (!isset($delayedExecutions[$delay])) {
$delayedExecutions[$delay] = [];
}
$delayedExecutions[$delay][] = $key;
$delayedExecutions[$delay][] = $scheduleKey;
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $pools) {
\go(function () use ($delay, $scheduleKeys, $pools, $dbForConsole) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
@ -76,7 +83,6 @@ class ScheduleFunctions extends ScheduleBase
if (!\array_key_exists($scheduleKey, $this->schedules)) {
return;
}
$schedule = $this->schedules[$scheduleKey];
$queueForFunctions = new Func($connection);
@ -88,6 +94,13 @@ class ScheduleFunctions extends ScheduleBase
->setPath('/')
->setProject($schedule['project'])
->trigger();
if ($schedule['delete']) {
$dbForConsole->deleteDocument(
'schedules',
$schedule['$id'],
);
}
}
$queue->reclaim();