appwrite/src/Appwrite/Platform/Tasks/ScheduleExecutions.php

90 lines
2.8 KiB
PHP
Raw Normal View History

2024-06-17 12:44:12 +00:00
<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
2024-07-19 09:22:50 +00:00
use Swoole\Coroutine as Co;
2024-10-08 07:54:40 +00:00
use Utopia\Database\Database;
use Utopia\Pools\Group;
2024-06-17 12:44:12 +00:00
class ScheduleExecutions extends ScheduleBase
{
public const UPDATE_TIMER = 3; // seconds
public const ENQUEUE_TIMER = 4; // seconds
public static function getName(): string
{
return 'schedule-executions';
}
public static function getSupportedResource(): string
{
return 'execution';
}
2024-09-03 03:07:53 +00:00
public static function getCollectionId(): string
{
return 'executions';
}
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
2024-06-17 12:44:12 +00:00
{
2024-10-08 07:54:40 +00:00
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
2024-07-18 12:03:24 +00:00
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
2024-06-17 13:12:02 +00:00
2024-06-17 12:44:12 +00:00
foreach ($this->schedules as $schedule) {
2024-06-17 13:12:02 +00:00
if (!$schedule['active']) {
$dbForPlatform->deleteDocument(
2024-07-01 13:35:37 +00:00
'schedules',
$schedule['$id'],
);
unset($this->schedules[$schedule['$internalId']]);
2024-06-17 12:44:12 +00:00
continue;
}
$scheduledAt = new \DateTime($schedule['schedule']);
2024-07-18 12:03:24 +00:00
if ($scheduledAt <= $intervalEnd) {
2024-06-17 12:44:12 +00:00
continue;
}
$data = $dbForPlatform->getDocument(
2024-09-07 10:20:23 +00:00
'schedules',
$schedule['$id'],
)->getAttribute('data', []);
2024-07-18 12:03:24 +00:00
2024-09-07 10:20:23 +00:00
$delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp();
2024-07-19 09:22:50 +00:00
2024-12-17 14:19:26 +00:00
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
2024-10-08 07:54:40 +00:00
\go(function () use ($queueForFunctions, $schedule, $delay, $data) {
2024-07-19 09:22:50 +00:00
Co::sleep($delay);
2024-07-18 12:03:24 +00:00
2024-09-07 10:20:23 +00:00
$queueForFunctions->setType('schedule')
2024-07-18 12:03:24 +00:00
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
2025-02-03 12:43:35 +00:00
->setFunctionId($schedule['resource']['resourceId'])
2024-07-18 12:03:24 +00:00
->setExecution($schedule['resource'])
2024-09-07 10:20:23 +00:00
->setMethod($data['method'] ?? 'POST')
->setPath($data['path'] ?? '/')
->setHeaders($data['headers'] ?? [])
->setBody($data['body'] ?? '')
2024-07-18 12:03:24 +00:00
->setProject($schedule['project'])
2024-09-07 10:20:23 +00:00
->setUserId($data['userId'] ?? '')
2024-07-18 12:03:24 +00:00
->trigger();
2024-10-01 14:30:47 +00:00
});
2024-09-20 16:30:05 +00:00
$dbForPlatform->deleteDocument(
2024-10-08 07:54:40 +00:00
'schedules',
$schedule['$id'],
);
unset($this->schedules[$schedule['$internalId']]);
2024-06-17 12:44:12 +00:00
}
2024-06-17 13:12:02 +00:00
2024-10-08 07:54:40 +00:00
$queue->reclaim();
2024-06-17 12:44:12 +00:00
}
}