appwrite/src/Appwrite/Platform/Tasks/ScheduleExecutions.php
Jake Barnby 3ec0f08db1
Merge remote-tracking branch 'origin/1.6.x' into feat-sync-1.6.x
# Conflicts:
#	composer.lock
#	src/Appwrite/Messaging/Adapter/Realtime.php
#	src/Appwrite/Platform/Workers/Deletes.php
2025-05-21 20:15:08 +12:00

87 lines
2.7 KiB
PHP

<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Pools\Group;
/**
 * Scheduler task that turns due "execution" schedules into queued function
 * executions.
 *
 * On every enqueue pass the task walks the in-memory schedule list, drops
 * inactive schedules, and for each schedule that falls inside the upcoming
 * enqueue window spawns a coroutine that waits until the scheduled time and
 * then triggers the function event. A schedule is one-shot: once handled its
 * document is deleted from the `schedules` collection.
 */
class ScheduleExecutions extends ScheduleBase
{
    // NOTE(review): presumably consumed by ScheduleBase as the schedule refresh interval — confirm there.
    public const UPDATE_TIMER = 3; // seconds

    // Look-ahead window used by enqueueResources() to decide which schedules are due.
    public const ENQUEUE_TIMER = 4; // seconds

    /**
     * Name identifying this task.
     */
    public static function getName(): string
    {
        return 'schedule-executions';
    }

    /**
     * Resource type handled by this scheduler.
     */
    public static function getSupportedResource(): string
    {
        return 'execution';
    }

    /**
     * ID of the collection associated with the supported resource.
     */
    public static function getCollectionId(): string
    {
        return 'executions';
    }

    /**
     * Enqueue every active schedule that is due within the next
     * ENQUEUE_TIMER seconds (including schedules that are already overdue).
     *
     * Schedules further in the future are left untouched so a later pass can
     * pick them up; sleeping on them here would hold a coroutine for the whole
     * wait and lose the execution if the process restarted, because the
     * schedule document is deleted as soon as the coroutine is spawned.
     *
     * @param Group    $pools         Connection pools (not used by this implementation).
     * @param Database $dbForPlatform Platform database holding the `schedules` collection.
     * @param callable $getProjectDB  Project database factory (not used by this implementation).
     */
    protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
    {
        $queueForFunctions = new Func($this->publisher);
        $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');

        foreach ($this->schedules as $schedule) {
            // Inactive schedules will never run: remove them from storage and memory.
            if (!$schedule['active']) {
                $dbForPlatform->deleteDocument(
                    'schedules',
                    $schedule['$id'],
                );

                unset($this->schedules[$schedule['$internalId']]);
                continue;
            }

            $scheduledAt = new \DateTime($schedule['schedule']);

            // Not due within the current window yet: leave it for a later pass.
            if ($scheduledAt > $intervalEnd) {
                continue;
            }

            // Request details (method, path, headers, body, userId) are stored on the schedule document.
            $data = $dbForPlatform->getDocument(
                'schedules',
                $schedule['$id'],
            )->getAttribute('data', []);

            // Whole seconds left until the scheduled time; zero or negative when overdue.
            $delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp();

            $this->updateProjectAccess($schedule['project'], $dbForPlatform);

            \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) {
                // Overdue schedules fire immediately; only sleep for a positive delay
                // instead of handing a non-positive timeout to Co::sleep().
                if ($delay > 0) {
                    Co::sleep($delay);
                }

                $queueForFunctions->setType('schedule')
                    // Set functionId instead of function as we don't have $dbForProject
                    // TODO: Refactor to use function instead of functionId
                    ->setFunctionId($schedule['resource']['resourceId'])
                    ->setExecution($schedule['resource'])
                    ->setMethod($data['method'] ?? 'POST')
                    ->setPath($data['path'] ?? '/')
                    ->setHeaders($data['headers'] ?? [])
                    ->setBody($data['body'] ?? '')
                    ->setProject($schedule['project'])
                    ->setUserId($data['userId'] ?? '')
                    ->trigger();

                $this->recordEnqueueDelay($scheduledAt);
            });

            // One-shot schedule: the coroutine above now owns it, so drop it from
            // storage and from the in-memory list to avoid enqueueing it twice.
            $dbForPlatform->deleteDocument(
                'schedules',
                $schedule['$id'],
            );

            unset($this->schedules[$schedule['$internalId']]);
        }
    }
}