diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 093c2740ba..afd7d9d22a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -91,13 +91,18 @@ abstract class ScheduleBase extends Action }); while (true) { - $this->enqueueResources($pools, $dbForPlatform, $getProjectDB); - $this->scheduleTelemetryCount->record(count($this->schedules), ['resourceType' => static::getSupportedResource()]); - sleep(static::ENQUEUE_TIMER); + try { + go(fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB)); + $this->scheduleTelemetryCount->record(count($this->schedules), ['resourceType' => static::getSupportedResource()]); + sleep(static::ENQUEUE_TIMER); + } catch (\Throwable $th) { + Console::error('Failed to enqueue resources: ' . $th->getMessage()); + } + } } - private function collectSchedules(Group $pools, Database $dbForPlatform, callable $getProjectDB, ?string &$lastSyncUpdate): void + private function collectSchedules(Group $pools, Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void { // If we haven't synced yet, load all active schedules $initialLoad = $lastSyncUpdate === "0"; @@ -202,10 +207,8 @@ abstract class ScheduleBase extends Action Console::success("{$total} resources were loaded in " . $duration . " seconds"); } - protected function recordEnqueueDelay(string $expectedExecutionSchedule): void + protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void { - $now = strtotime('now'); - $scheduledAt = strtotime($expectedExecutionSchedule); - $this->enqueueDelayTelemetry->record($now - $scheduledAt, ['resourceType' => static::getSupportedResource()]); + $this->enqueueDelayTelemetry->record(time() - $expectedExecutionSchedule->getTimestamp(), ['resourceType' => static::getSupportedResource()]); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 79e983f0c3..89d1609a33 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -59,7 +59,7 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($queueForFunctions, $schedule, $delay, $data) { + \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) { Co::sleep($delay); $queueForFunctions->setType('schedule') @@ -75,7 +75,7 @@ class ScheduleExecutions extends ScheduleBase ->setUserId($data['userId'] ?? '') ->trigger(); - $this->recordEnqueueDelay($schedule['schedule']); + $this->recordEnqueueDelay($scheduledAt); }); $dbForPlatform->deleteDocument( diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index abcfe132e3..6788748f3d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -46,7 +46,13 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions = []; // Group executions with same delay to share one coroutine foreach ($this->schedules as $key => $schedule) { - $cron = new CronExpression($schedule['schedule']); + try { + $cron = new CronExpression($schedule['schedule']); + } catch (\InvalidArgumentException) { + // ignore invalid cron expressions + continue; + } + $nextDate = $cron->getNextRunDate(); $next = DateTime::format($nextDate); @@ -66,17 +72,18 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions[$delay] = []; } - $delayedExecutions[$delay][] = $key; + $delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate]; } - foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { + foreach ($delayedExecutions as $delay => $schedules) { + \go(function () use ($delay, $schedules, $pools, $dbForPlatform) { \sleep($delay); // in seconds $queue = $pools->get('publisher')->pop(); $connection = $queue->getResource(); - foreach ($scheduleKeys as $scheduleKey) { + foreach ($schedules as $delayConfig) { + $scheduleKey = $delayConfig['key']; // Ensure schedule was not deleted if (!\array_key_exists($scheduleKey, $this->schedules)) { return; @@ -96,7 +103,7 @@ class ScheduleFunctions extends ScheduleBase ->setProject($schedule['project']) ->trigger(); - $this->recordEnqueueDelay($schedule['schedule']); + $this->recordEnqueueDelay($delayConfig['nextDate']); } $queue->reclaim(); diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 9b962c99ee..a15df6ed5b 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $pools, $dbForPlatform) { + \go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) { $queue = $pools->get('publisher')->pop(); $connection = $queue->getResource(); $queueForMessaging = new Messaging($connection); @@ -59,7 +59,7 @@ class ScheduleMessages extends ScheduleBase ); $queue->reclaim(); - $this->recordEnqueueDelay($schedule['schedule']); + $this->recordEnqueueDelay($scheduledAt); unset($this->schedules[$schedule['$internalId']]); }); }