appwrite/src/Appwrite/Platform/Tasks/ScheduleFunctions.php

116 lines
3.6 KiB
PHP
Raw Normal View History

2024-01-11 03:06:59 +00:00
<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Cron\CronExpression;
use Utopia\CLI\Console;
2024-10-08 07:54:40 +00:00
use Utopia\Database\Database;
2024-01-11 03:06:59 +00:00
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
2024-01-11 03:06:59 +00:00
/**
 * Scheduler task that enqueues function executions for cron-based schedules.
 *
 * On every enqueue tick the schedules held in `$this->schedules` are scanned,
 * those whose next cron run falls inside the upcoming time frame are grouped
 * by the number of seconds left until that run, and one coroutine per group
 * sleeps for that long before publishing a `schedule` event for each function.
 */
class ScheduleFunctions extends ScheduleBase
{
    public const UPDATE_TIMER = 10; // seconds
    public const ENQUEUE_TIMER = 60; // seconds

    /**
     * Start time (as returned by microtime(true)) of the previous enqueue tick.
     *
     * NOTE: nothing in this class assigns it at the moment (the assignment in
     * enqueueResources() is commented out), so it stays null and the computed
     * diff is always 0.
     */
    private ?float $lastEnqueueUpdate = null;

    /**
     * Name of this task.
     */
    public static function getName(): string
    {
        return 'schedule-functions';
    }

    /**
     * Resource type handled by this scheduler.
     */
    public static function getSupportedResource(): string
    {
        return 'function';
    }

    /**
     * Identifier of the collection that stores the scheduled resources.
     */
    public static function getCollectionId(): string
    {
        return 'functions';
    }

    /**
     * Enqueue executions for every schedule that is due within the next time frame.
     *
     * @param Database $dbForPlatform Platform database, forwarded to updateProjectAccess().
     * @param callable $getProjectDB  Not used by this implementation.
     */
    protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void
    {
        $timerStart = \microtime(true);
        $time = DateTime::now();

        $enqueueDiff = $this->lastEnqueueUpdate === null ? 0 : $timerStart - $this->lastEnqueueUpdate;
        $timeFrame = DateTime::addSeconds(new \DateTime(), static::ENQUEUE_TIMER - $enqueueDiff);

        Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");

        $total = 0;
        $delayedExecutions = []; // Group executions with same delay to share one coroutine

        foreach ($this->schedules as $key => $schedule) {
            try {
                $cron = new CronExpression($schedule['schedule']);
            } catch (\InvalidArgumentException) {
                // ignore invalid cron expressions
                continue;
            }

            $nextDate = $cron->getNextRunDate();
            $next = DateTime::format($nextDate);

            // Only schedules whose next run falls before the end of the time frame are handled now
            $currentTick = $next < $timeFrame;
            if (!$currentTick) {
                continue;
            }

            $total++;

            $promiseStart = \time(); // in seconds
            $executionStart = $nextDate->getTimestamp(); // in seconds
            $delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued

            $delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate];
        }

        foreach ($delayedExecutions as $delay => $schedules) {
            \go(function () use ($delay, $schedules, $dbForPlatform) {
                // The delay can turn negative when building the batch crosses the
                // run second; sleep() rejects negative values, so clamp to zero.
                \sleep(\max(0, $delay)); // in seconds

                foreach ($schedules as $delayConfig) {
                    $scheduleKey = $delayConfig['key'];

                    // Ensure schedule was not deleted. Skip only this entry: the other
                    // schedules sharing this delay must still be enqueued.
                    if (!\array_key_exists($scheduleKey, $this->schedules)) {
                        continue;
                    }

                    $schedule = $this->schedules[$scheduleKey];

                    $this->updateProjectAccess($schedule['project'], $dbForPlatform);

                    $queueForFunctions = new Func($this->publisher);

                    $queueForFunctions
                        ->setType('schedule')
                        ->setFunction($schedule['resource'])
                        ->setMethod('POST')
                        ->setPath('/')
                        ->setProject($schedule['project'])
                        ->trigger();

                    $this->recordEnqueueDelay($delayConfig['nextDate']);
                }
            });
        }

        $timerEnd = \microtime(true);

        // TODO: This was a bug before because it wasn't passed by reference, enabling it breaks scheduling
        //$this->lastEnqueueUpdate = $timerStart;

        Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds");
    }
}