From 7e8f72d267ba8cf8bf80f7a607dfd87a787fe09e Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 11 Jun 2024 13:39:38 +0100 Subject: [PATCH] feat: implement scheduledAt in schedule --- .../Platform/Tasks/ScheduleFunctions.php | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index e8941c2ffa..c8e68e0d1e 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -40,32 +40,39 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions = []; // Group executions with same delay to share one coroutine - foreach ($this->schedules as $key => $schedule) { - $cron = new CronExpression($schedule['schedule']); // TODO: Allow schedule to be DateTime, like ScheduleMessaging.php - $nextDate = $cron->getNextRunDate(); - $next = DateTime::format($nextDate); + foreach ($this->schedules as $scheduleKey => $schedule) { + if (CronExpression::isValidExpression($schedule['schedule'])) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + } else { + try { + $nextDate = new \DateTime($schedule['schedule']); + $schedule['delete'] = true; + } catch (\Exception) { + Console::error('Failed to parse schedule: ' . $schedule['schedule']); + continue; + } + } + $next = DateTime::format($nextDate); $currentTick = $next < $timeFrame; if (!$currentTick) { continue; } - $total++; - - $promiseStart = \time(); // in seconds - $executionStart = $nextDate->getTimestamp(); // in seconds - $delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + $total += 1; + $delay = $nextDate->getTimestamp() - \time(); // Time to wait from now until execution needs to be queued if (!isset($delayedExecutions[$delay])) { $delayedExecutions[$delay] = []; } - $delayedExecutions[$delay][] = $key; + $delayedExecutions[$delay][] = $scheduleKey; } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools) { + \go(function () use ($delay, $scheduleKeys, $pools, $dbForConsole) { \sleep($delay); // in seconds $queue = $pools->get('queue')->pop(); @@ -76,7 +83,6 @@ class ScheduleFunctions extends ScheduleBase if (!\array_key_exists($scheduleKey, $this->schedules)) { return; } - $schedule = $this->schedules[$scheduleKey]; $queueForFunctions = new Func($connection); @@ -88,6 +94,13 @@ class ScheduleFunctions extends ScheduleBase ->setPath('/') ->setProject($schedule['project']) ->trigger(); + + if ($schedule['delete']) { + $dbForConsole->deleteDocument( + 'schedules', + $schedule['$id'], + ); + } } $queue->reclaim();