From b8b81a9bd1a1ffc8cd6600a1dd214324a8620a6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Fri, 7 Jun 2024 19:05:29 +0000 Subject: [PATCH] WIP: Schedulded executions --- app/controllers/api/functions.php | 59 +++++++++++++------ app/controllers/api/messaging.php | 6 +- .../Platform/Tasks/ScheduleFunctions.php | 2 +- 3 files changed, 46 insertions(+), 21 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 392cf034f9..900147398a 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -32,6 +32,7 @@ use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; +use Utopia\Database\Validator\Datetime as DatetimeValidator; use Utopia\Database\Validator\Roles; use Utopia\Database\Validator\UID; use Utopia\Storage\Device; @@ -1511,16 +1512,21 @@ App::post('/v1/functions/:functionId/executions') ->param('path', '/', new Text(2048), 'HTTP path of execution. Path can include query params. Default value is /', true) ->param('method', 'POST', new Whitelist(['GET', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'], true), 'HTTP method of execution. Default value is GET.', true) ->param('headers', [], new Assoc(), 'HTTP headers of execution. Defaults to empty.', true) + ->param('scheduledAt', null, new DatetimeValidator(requireDateInFuture: true), 'Scheduled execution time in [ISO 8601](https://www.iso.org/iso-8601-date-and-time-format.html) format. DateTime value must be in future.', true) ->inject('response') ->inject('project') ->inject('dbForProject') + ->inject('dbForConsole') ->inject('user') ->inject('queueForEvents') ->inject('queueForUsage') - ->inject('mode') ->inject('queueForFunctions') ->inject('geodb') - ->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, Response $response, Document $project, Database $dbForProject, Document $user, Event $queueForEvents, Usage $queueForUsage, string $mode, Func $queueForFunctions, Reader $geodb) { + ->action(function (string $functionId, string $body, bool $async, string $path, string $method, array $headers, ?string $scheduledAt, Response $response, Document $project, Database $dbForProject, Database $dbForConsole, Document $user, Event $queueForEvents, Usage $queueForUsage, Func $queueForFunctions, Reader $geodb) { + + if(!$async && !is_null($scheduledAt)) { + throw new Exception(Exception::GENERAL_QUERY_INVALID, 'Scheduled executions must run asynchronously. Don\'t set scheduledAt to execute immediately, or set async to true.'); + } $function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId)); @@ -1625,6 +1631,12 @@ App::post('/v1/functions/:functionId/executions') $executionId = ID::unique(); + $status = $async ? 'waiting' : 'processing'; + + if(!is_null($scheduledAt)) { + $status = 'scheduled'; + } + $execution = new Document([ '$id' => $executionId, '$permissions' => !$user->isEmpty() ? [Permission::read(Role::user($user->getId()))] : [], @@ -1633,7 +1645,7 @@ App::post('/v1/functions/:functionId/executions') 'deploymentInternalId' => $deployment->getInternalId(), 'deploymentId' => $deployment->getId(), 'trigger' => 'http', // http / schedule / event - 'status' => $async ? 'waiting' : 'processing', // waiting / processing / completed / failed + 'status' => $status, // waiting / processing / completed / failed 'responseStatusCode' => 0, 'responseHeaders' => [], 'requestPath' => $path, @@ -1656,20 +1668,33 @@ App::post('/v1/functions/:functionId/executions') $execution = Authorization::skip(fn () => $dbForProject->createDocument('executions', $execution)); } - $queueForFunctions - ->setType('http') - ->setExecution($execution) - ->setFunction($function) - ->setBody($body) - ->setHeaders($headers) - ->setPath($path) - ->setMethod($method) - ->setJWT($jwt) - ->setProject($project) - ->setUser($user) - ->setParam('functionId', $function->getId()) - ->setParam('executionId', $execution->getId()) - ->trigger(); + if(is_null($scheduledAt)) { + $queueForFunctions + ->setType('http') + ->setExecution($execution) + ->setFunction($function) + ->setBody($body) + ->setHeaders($headers) + ->setPath($path) + ->setMethod($method) + ->setJWT($jwt) + ->setProject($project) + ->setUser($user) + ->setParam('functionId', $function->getId()) + ->setParam('executionId', $execution->getId()) + ->trigger(); + } else { + $dbForConsole->createDocument('schedules', new Document([ + 'region' => System::getEnv('_APP_REGION', 'default'), + 'resourceType' => 'function', + 'resourceId' => $function->getId(), + 'resourceInternalId' => $function->getInternalId(), + 'resourceUpdatedAt' => DateTime::now(), + 'projectId' => $project->getId(), + 'schedule' => $scheduledAt, + 'active' => true, + ])); + } return $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) diff --git a/app/controllers/api/messaging.php b/app/controllers/api/messaging.php index e3696cc7e0..7da0348a8f 100644 --- a/app/controllers/api/messaging.php +++ b/app/controllers/api/messaging.php @@ -2697,7 +2697,7 @@ App::post('/v1/messaging/messages/email') 'resourceInternalId' => $message->getInternalId(), 'resourceUpdatedAt' => DateTime::now(), 'projectId' => $project->getId(), - 'schedule' => $scheduledAt, + 'schedule' => $scheduledAt, 'active' => true, ])); @@ -2813,7 +2813,7 @@ App::post('/v1/messaging/messages/sms') 'resourceInternalId' => $message->getInternalId(), 'resourceUpdatedAt' => DateTime::now(), 'projectId' => $project->getId(), - 'schedule' => $scheduledAt, + 'schedule' => $scheduledAt, 'active' => true, ])); @@ -2989,7 +2989,7 @@ App::post('/v1/messaging/messages/push') 'resourceInternalId' => $message->getInternalId(), 'resourceUpdatedAt' => DateTime::now(), 'projectId' => $project->getId(), - 'schedule' => $scheduledAt, + 'schedule' => $scheduledAt, 'active' => true, ])); diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index e2c278714f..e8941c2ffa 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -41,7 +41,7 @@ class ScheduleFunctions extends ScheduleBase $delayedExecutions = []; // Group executions with same delay to share one coroutine foreach ($this->schedules as $key => $schedule) { - $cron = new CronExpression($schedule['schedule']); + $cron = new CronExpression($schedule['schedule']); // TODO: Allow schedule to be DateTime, like ScheduleMessaging.php $nextDate = $cron->getNextRunDate(); $next = DateTime::format($nextDate);