Merge pull request #4641 from appwrite/refactor-scheduler

Refactor scheduler
This commit is contained in:
Christy Jacob 2022-11-16 19:40:25 +05:30 committed by GitHub
commit a027e6eceb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 907 additions and 383 deletions

3
.env
View file

@ -87,5 +87,6 @@ _APP_USAGE_DATABASE_INTERVAL=15
_APP_USAGE_STATS=enabled
_APP_LOGGING_PROVIDER=
_APP_LOGGING_CONFIG=
_APP_REGION=default
_APP_DOCKER_HUB_USERNAME=
_APP_DOCKER_HUB_PASSWORD=
_APP_DOCKER_HUB_PASSWORD=

View file

@ -3,6 +3,7 @@
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/controllers/general.php';
use Appwrite\Event\Func;
use Appwrite\Platform\Appwrite;
use Utopia\CLI\CLI;
use Utopia\Database\Validator\Authorization;
@ -56,20 +57,29 @@ CLI::setResource('dbForConsole', function ($pools, $cache) {
}, ['pools', 'cache']);
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) {
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
$getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$databaseName = $project->getAttribute('database');
if (isset($databases[$databaseName])) {
return $databases[$databaseName];
}
$dbAdapter = $pools
->get($project->getAttribute('database'))
->get($databaseName)
->pop()
->getResource()
;
->getResource();
$database = new Database($dbAdapter, $cache);
$database->setNamespace('_' . $project->getInternalId());
$databases[$databaseName] = $database;
return $database;
};
@ -100,6 +110,10 @@ CLI::setResource('influxdb', function (Registry $register) {
return $database;
}, ['register']);
CLI::setResource('queueForFunctions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
}, ['pools']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');

View file

@ -751,6 +751,107 @@ $collections = [
],
],
'schedules' => [
'$collection' => ID::custom(Database::METADATA),
'$id' => ID::custom('schedules'),
'name' => 'schedules',
'attributes' => [
[
'$id' => ID::custom('resourceType'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 100,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('resourceId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('resourceUpdatedAt'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('projectId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('schedule'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 100,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('active'),
'type' => Database::VAR_BOOLEAN,
'signed' => true,
'size' => 0,
'format' => '',
'filters' => [],
'required' => false,
'default' => null,
'array' => false,
],
[
'$id' => ID::custom('region'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 10,
'signed' => true,
'required' => true,
'default' => null,
'array' => false,
'filters' => [],
],
],
'indexes' => [
[
'$id' => ID::custom('_key_region_resourceType_resourceUpdatedAt'),
'type' => Database::INDEX_KEY,
'attributes' => ['region', 'resourceType','resourceUpdatedAt'],
'lengths' => [],
'orders' => [],
],
[
'$id' => ID::custom('_key_region_resourceType_projectId_resourceId'),
'type' => Database::INDEX_KEY,
'attributes' => ['region', 'resourceType', 'projectId', 'resourceId'],
'lengths' => [],
'orders' => [],
],
],
],
'platforms' => [
'$collection' => ID::custom(Database::METADATA),
'$id' => ID::custom('platforms'),
@ -2148,6 +2249,17 @@ $collections = [
'array' => true,
'filters' => [],
],
[
'$id' => ID::custom('scheduleId'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => Database::LENGTH_KEY,
'signed' => true,
'required' => false,
'default' => null,
'array' => false,
'filters' => [],
],
[
'$id' => ID::custom('schedule'),
'type' => Database::VAR_STRING,
@ -2160,29 +2272,7 @@ $collections = [
'filters' => [],
],
[
'$id' => ID::custom('scheduleUpdatedAt'), // Used to fix duplicate executions bug. Can be removed once new queue library is used
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('schedulePrevious'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
'signed' => false,
'required' => false,
'default' => null,
'array' => false,
'filters' => ['datetime'],
],
[
'$id' => ID::custom('scheduleNext'),
'$id' => ID::custom('scheduleUpdatedAt'),
'type' => Database::VAR_DATETIME,
'format' => '',
'size' => 0,
@ -2258,20 +2348,6 @@ $collections = [
'lengths' => [],
'orders' => [Database::ORDER_ASC],
],
[
'$id' => ID::custom('_key_scheduleNext'),
'type' => Database::INDEX_KEY,
'attributes' => ['scheduleNext'],
'lengths' => [],
'orders' => [Database::ORDER_ASC],
],
[
'$id' => ID::custom('_key_schedulePrevious'),
'type' => Database::INDEX_KEY,
'attributes' => ['schedulePrevious'],
'lengths' => [],
'orders' => [Database::ORDER_ASC],
],
[
'$id' => ID::custom('_key_timeout'),
'type' => Database::INDEX_KEY,

View file

@ -36,7 +36,6 @@ use Utopia\Validator\Text;
use Utopia\Validator\Range;
use Utopia\Validator\WhiteList;
use Utopia\Config\Config;
use Cron\CronExpression;
use Executor\Executor;
use Utopia\CLI\Console;
use Utopia\Database\Validator\Roles;
@ -72,10 +71,8 @@ App::post('/v1/functions')
->inject('project')
->inject('user')
->inject('events')
->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance) {
$cron = !empty($schedule) ? new CronExpression($schedule) : null;
$next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null;
->inject('dbForConsole')
->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance, Database $dbForConsole) {
$functionId = ($functionId == 'unique()') ? ID::unique() : $functionId;
$function = $dbForProject->createDocument('functions', new Document([
@ -88,22 +85,24 @@ App::post('/v1/functions')
'events' => $events,
'schedule' => $schedule,
'scheduleUpdatedAt' => DateTime::now(),
'schedulePrevious' => null,
'scheduleNext' => $next,
'timeout' => $timeout,
'search' => implode(' ', [$functionId, $name, $runtime])
]));
if ($next) {
// Async task reschedule
$functionEvent = new Func();
$functionEvent
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project)
->schedule(new \DateTime($next));
}
$schedule = Authorization::skip(
fn() => $dbForConsole->createDocument('schedules', new Document([
'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region
'resourceType' => 'function',
'resourceId' => $function->getId(),
'resourceUpdatedAt' => DateTime::now(),
'projectId' => $project->getId(),
'schedule' => $function->getAttribute('schedule'),
'active' => false,
]))
);
$function->setAttribute('scheduleId', $schedule->getId());
$dbForProject->updateDocument('functions', $function->getId(), $function);
$eventsInstance->setParam('functionId', $function->getId());
@ -448,7 +447,8 @@ App::put('/v1/functions/:functionId')
->inject('project')
->inject('user')
->inject('events')
->action(function (string $functionId, string $name, array $execute, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance) {
->inject('dbForConsole')
->action(function (string $functionId, string $name, array $execute, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
@ -456,9 +456,6 @@ App::put('/v1/functions/:functionId')
throw new Exception(Exception::FUNCTION_NOT_FOUND);
}
$cron = !empty($schedule) ? new CronExpression($schedule) : null;
$next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null;
$enabled ??= $function->getAttribute('enabled', true);
$function = $dbForProject->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [
@ -467,23 +464,27 @@ App::put('/v1/functions/:functionId')
'events' => $events,
'schedule' => $schedule,
'scheduleUpdatedAt' => DateTime::now(),
'scheduleNext' => $next,
'timeout' => $timeout,
'enabled' => $enabled,
'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]),
])));
if ($next) {
// Async task reschedule
$functionEvent = new Func();
$functionEvent
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project)
->schedule(new \DateTime($next));
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
/**
* In case we want to clear the schedule
*/
if (!empty($function->getAttribute('deployment'))) {
$schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt'));
}
$schedule
->setAttribute('schedule', $function->getAttribute('schedule'))
->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
$eventsInstance->setParam('functionId', $function->getId());
$response->dynamic($function, Response::MODEL_FUNCTION);
@ -509,7 +510,8 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId')
->inject('dbForProject')
->inject('project')
->inject('events')
->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events) {
->inject('dbForConsole')
->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
@ -535,6 +537,18 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId')
'deployment' => $deployment->getId()
])));
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$active = !empty($function->getAttribute('schedule'));
if ($active) {
$schedule->setAttribute('resourceUpdatedAt', datetime::now());
}
$schedule->setAttribute('active', $active);
Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
$events
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId());
@ -560,7 +574,9 @@ App::delete('/v1/functions/:functionId')
->inject('dbForProject')
->inject('deletes')
->inject('events')
->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events) {
->inject('project')
->inject('dbForConsole')
->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events, Document $project, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
@ -572,6 +588,15 @@ App::delete('/v1/functions/:functionId')
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB');
}
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$schedule
->setAttribute('resourceUpdatedAt', DateTime::now())
->setAttribute('active', false)
;
Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
$deletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($function);
@ -608,7 +633,8 @@ App::post('/v1/functions/:functionId/deployments')
->inject('project')
->inject('deviceFunctions')
->inject('deviceLocal')
->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal) {
->inject('dbForConsole')
->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal, Database $dbForConsole) {
$function = $dbForProject->getDocument('functions', $functionId);
@ -760,6 +786,22 @@ App::post('/v1/functions/:functionId/deployments')
}
}
/**
* TODO Should we update also the function collection with the scheduleUpdatedAt attr?
*/
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$active = !empty($function->getAttribute('schedule'));
if ($active) {
$schedule->setAttribute('resourceUpdatedAt', datetime::now());
}
$schedule->setAttribute('active', $active);
Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
$metadata = null;
$events
@ -992,8 +1034,6 @@ App::post('/v1/functions/:functionId/deployments/:deploymentId/builds/:buildId')
$response->noContent();
});
App::post('/v1/functions/:functionId/executions')
->groups(['api', 'functions'])
->desc('Create Execution')
@ -1019,7 +1059,8 @@ App::post('/v1/functions/:functionId/executions')
->inject('events')
->inject('usage')
->inject('mode')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode) {
->inject('queueForFunctions')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Func $queueForFunctions) {
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId));
@ -1107,24 +1148,22 @@ App::post('/v1/functions/:functionId/executions')
->setContext('function', $function);
if ($async) {
$event = new Func();
$event
$queueForFunctions
->setType('http')
->setExecution($execution)
->setFunction($function)
->setData($data)
->setJWT($jwt)
->setProject($project)
->setUser($user);
$event->trigger();
->setUser($user)
->trigger();
return $response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($execution, Response::MODEL_EXECUTION);
}
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value') ?? '';
return $carry;
}, []);

View file

@ -5,6 +5,7 @@ use Appwrite\Event\Audit;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Usage\Stats;
@ -128,9 +129,9 @@ App::init()
}
}
/*
* Background Jobs
*/
/*
* Background Jobs
*/
$events
->setEvent($route->getLabel('event', ''))
->setProject($project)
@ -251,7 +252,8 @@ App::shutdown()
->inject('database')
->inject('mode')
->inject('dbForProject')
->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject) use ($parseLabel) {
->inject('queueForFunctions')
->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject, Func $queueForFunctions) use ($parseLabel) {
$responsePayload = $response->getPayload();
@ -262,9 +264,8 @@ App::shutdown()
/**
* Trigger functions.
*/
$events
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$queueForFunctions
->from($events)
->trigger();
/**

View file

@ -71,9 +71,11 @@ use Utopia\Pools\Group;
use Utopia\Pools\Pool;
use Ahc\Jwt\JWT;
use Ahc\Jwt\JWTException;
use Appwrite\Event\Func;
use MaxMind\Db\Reader;
use PHPMailer\PHPMailer\PHPMailer;
use Swoole\Database\PDOProxy;
use Utopia\Queue;
const APP_NAME = 'Appwrite';
const APP_DOMAIN = 'appwrite.io';
@ -644,14 +646,16 @@ $register->set('pools', function () {
};
$adapter->setDefaultDatabase($dsn->getDatabase());
break;
case 'queue':
$adapter = $resource();
break;
case 'pubsub':
$adapter = $resource();
break;
case 'queue':
$adapter = match ($dsn->getScheme()) {
'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()),
default => null
};
break;
case 'cache':
$adapter = match ($dsn->getScheme()) {
'redis' => new RedisCache($resource()),
@ -675,6 +679,7 @@ $register->set('pools', function () {
return $group;
});
$register->set('influxdb', function () {
// Register DB connection
$host = App::getEnv('_APP_INFLUXDB_HOST', '');
@ -847,10 +852,12 @@ App::setResource('mails', fn() => new Mail());
App::setResource('deletes', fn() => new Delete());
App::setResource('database', fn() => new EventDatabase());
App::setResource('messaging', fn() => new Phone());
App::setResource('queueForFunctions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
}, ['pools']);
App::setResource('usage', function ($register) {
return new Stats($register->get('statsd'));
}, ['register']);
App::setResource('clients', function ($request, $console, $project) {
$console->setAttribute('platforms', [ // Always allow current host
'$collection' => ID::custom('platforms'),

View file

@ -36,6 +36,7 @@ foreach (
realpath(__DIR__ . '/../vendor/mongodb'),
realpath(__DIR__ . '/../vendor/utopia-php/websocket'), // TODO: remove workerman autoload
realpath(__DIR__ . '/../vendor/utopia-php/cache'), // TODO: remove memcached autoload
realpath(__DIR__ . '/../vendor/utopia-php/queue/src/Queue/Adapter/Workerman.php'), // TODO: remove workerman autoload
] as $key => $value
) {
if ($value !== false) {

145
app/worker.php Normal file
View file

@ -0,0 +1,145 @@
<?php
require_once __DIR__ . '/init.php';
use Appwrite\Event\Func;
use Swoole\Runtime;
use Utopia\App;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Queue\Adapter\Swoole;
use Utopia\Queue\Message;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
use Utopia\Logger\Log;
use Utopia\Logger\Logger;
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
global $register;
Server::setResource('register', fn() => $register);
Server::setResource('dbForConsole', function (Cache $cache, Registry $register) {
$pools = $register->get('pools');
$database = $pools
->get('console')
->pop()
->getResource()
;
$adapter = new Database($database, $cache);
$adapter->setNamespace('console');
return $adapter;
}, ['cache', 'register']);
Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) {
$payload = $message->getPayload() ?? [];
$project = new Document($payload['project'] ?? []);
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$pools = $register->get('pools');
$database = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
$adapter = new Database($database, $cache);
$adapter->setNamespace('_' . $project->getInternalId());
return $adapter;
}, ['cache', 'register', 'message', 'dbForConsole']);
Server::setResource('cache', function (Registry $register) {
$pools = $register->get('pools');
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
}
return new Cache(new Sharding($adapters));
}, ['register']);
Server::setResource('queueForFunctions', function (Registry $register) {
$pools = $register->get('pools');
return new Func(
$pools
->get('queue')
->pop()
->getResource()
);
}, ['register']);
Server::setResource('logger', function ($register) {
return $register->get('logger');
}, ['register']);
Server::setResource('statsd', function ($register) {
return $register->get('statsd');
}, ['register']);
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop()->getResource();
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
if (empty(App::getEnv('QUEUE'))) {
throw new Exception('Please configure "QUEUE" environemnt variable.');
}
$adapter = new Swoole($connection, $workerNumber, App::getEnv('QUEUE'));
$server = new Server($adapter);
$server
->error()
->inject('error')
->inject('logger')
->action(function (Throwable $error, Logger $logger) {
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
if ($error instanceof PDOException) {
throw $error;
}
if ($error->getCode() >= 500 || $error->getCode() === 0) {
$log = new Log();
$log->setNamespace("appwrite-worker");
$log->setServer(\gethostname());
$log->setVersion($version);
$log->setType(Log::TYPE_ERROR);
$log->setMessage($error->getMessage());
$log->setAction('appwrite-queue-' . App::getEnv('QUEUE'));
$log->addTag('verboseType', get_class($error));
$log->addTag('code', $error->getCode());
$log->addExtra('file', $error->getFile());
$log->addExtra('line', $error->getLine());
$log->addExtra('trace', $error->getTraceAsString());
$log->addExtra('detailedTrace', $error->getTrace());
$log->addExtra('roles', \Utopia\Database\Validator\Authorization::$roles);
$isProduction = App::getEnv('_APP_ENV', 'development') === 'production';
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
$logger->addLog($log);
}
Console::error('[Error] Type: ' . get_class($error));
Console::error('[Error] Message: ' . $error->getMessage());
Console::error('[Error] File: ' . $error->getFile());
Console::error('[Error] Line: ' . $error->getLine());
});

View file

@ -1,10 +1,10 @@
<?php
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Appwrite\Utopia\Response\Model\Deployment;
use Cron\CronExpression;
use Executor\Executor;
use Appwrite\Usage\Stats;
use Utopia\Database\DateTime;
@ -14,6 +14,7 @@ use Utopia\Database\ID;
use Utopia\Storage\Storage;
use Utopia\Database\Document;
use Utopia\Config\Config;
use Utopia\Database\Validator\Authorization;
require_once __DIR__ . '/../init.php';
@ -57,6 +58,8 @@ class BuildsV1 extends Worker
protected function buildDeployment(Document $project, Document $function, Document $deployment)
{
global $register;
$dbForProject = $this->getProjectDB($project);
$function = $dbForProject->getDocument('functions', $function->getId());
@ -118,10 +121,13 @@ class BuildsV1 extends Worker
->trigger();
/** Trigger Functions */
$deploymentUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop();
$functions = new Func($connection->getResource());
$functions
->from($deploymentUpdate)
->trigger();
$connection->reclaim();
/** Trigger Realtime */
$allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
@ -145,7 +151,7 @@ class BuildsV1 extends Worker
$source = $deployment->getAttribute('path');
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry;
}, []);
@ -181,6 +187,8 @@ class BuildsV1 extends Worker
Console::success("Build id: $buildId created");
$function->setAttribute('scheduleUpdatedAt', DateTime::now());
/** Set auto deploy */
if ($deployment->getAttribute('activate') === true) {
$function->setAttribute('deployment', $deployment->getId());
@ -188,11 +196,16 @@ class BuildsV1 extends Worker
}
/** Update function schedule */
$schedule = $function->getAttribute('schedule', '');
$cron = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? new CronExpression($schedule) : null;
$next = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? DateTime::format($cron->getNextRunDate()) : null;
$function->setAttribute('scheduleNext', $next);
$function = $dbForProject->updateDocument('functions', $function->getId(), $function);
$dbForConsole = $this->getConsoleDB();
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt'));
$schedule
->setAttribute('schedule', $function->getAttribute('schedule'))
->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
} catch (\Throwable $th) {
$endTime = DateTime::now();
$interval = (new \DateTime($endTime))->diff(new \DateTime($startTime));
@ -222,7 +235,6 @@ class BuildsV1 extends Worker
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);

View file

@ -468,19 +468,7 @@ class DeletesV1 extends Worker
Query::equal('functionId', [$functionId])
], $dbForProject);
/**
* Request executor to delete all deployment containers
* TODO: Re-enable. Disabled for now because of proxy. Container killed after inactivity automatically.
*/
// Console::info("Requesting executor to delete all deployment containers for function " . $functionId);
// $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
// foreach ($deploymentIds as $deploymentId) {
// try {
// $executor->deleteRuntime($projectId, $deploymentId);
// } catch (Throwable $th) {
// Console::error($th->getMessage());
// }
// }
// TODO: Request executor to delete runtime
}
/**
@ -520,17 +508,7 @@ class DeletesV1 extends Worker
}
});
/**
* Request executor to delete the deployment container.
* TODO: Re-enable. Disabled for now because of proxy. Container killed after inactivity automatically.
*/
// Console::info("Requesting executor to delete deployment container for deployment " . $deploymentId);
// try {
// $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
// $executor->deleteRuntime($projectId, $deploymentId);
// } catch (Throwable $th) {
// Console::error($th->getMessage());
// }
// TODO: Request executor to delete runtime
}

View file

@ -1,209 +1,45 @@
<?php
require_once __DIR__ . '/../worker.php';
use Utopia\Queue\Message;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Appwrite\Usage\Stats;
use Appwrite\Utopia\Response\Model\Execution;
use Cron\CronExpression;
use Domnikl\Statsd\Client;
use Executor\Executor;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\ID;
use Utopia\Database\Permission;
use Utopia\Database\Query;
use Utopia\Database\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Server;
require_once __DIR__ . '/../init.php';
Authorization::disable();
Authorization::setDefaultStatus(false);
Console::title('Functions V1 Worker');
Console::success(APP_NAME . ' functions worker v1 has started');
class FunctionsV1 extends Worker
{
private ?Executor $executor = null;
public array $args = [];
public array $allowed = [];
public function getName(): string
{
return "functions";
}
public function init(): void
{
$this->executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
}
public function run(): void
{
$type = $this->args['type'] ?? '';
$events = $this->args['events'] ?? [];
$project = new Document($this->args['project'] ?? []);
$user = new Document($this->args['user'] ?? []);
$payload = json_encode($this->args['payload'] ?? []);
if ($project->getId() === 'console') {
return;
}
$database = $this->getProjectDB($project);
/**
* Handle Event execution.
*/
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
while ($sum >= $limit) {
$functions = $database->find('functions', [
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('name'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($functions as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
trigger: 'event',
// Pass first, most verbose event pattern
event: $events[0],
eventData: $payload,
user: $user
);
Console::success('Triggered function: ' . $events[0]);
}
}
return;
}
/**
* Handle Schedule and HTTP execution.
*/
$user = new Document($this->args['user'] ?? []);
$project = new Document($this->args['project'] ?? []);
$execution = new Document($this->args['execution'] ?? []);
$function = new Document($this->args['function'] ?? []);
switch ($type) {
case 'http':
$jwt = $this->args['jwt'] ?? '';
$data = $this->args['data'] ?? '';
$function = $database->getDocument('functions', $execution->getAttribute('functionId'));
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
executionId: $execution->getId(),
trigger: 'http',
data: $data,
user: $user,
jwt: $jwt
);
break;
case 'schedule':
$functionOriginal = $function;
/*
* 1. Get Original Task
* 2. Check for updates
* If has updates skip task and don't reschedule
* If status not equal to play skip task
* 3. Check next run date, update task and add new job at the given date
* 4. Execute task (set optional timeout)
* 5. Update task response to log
* On success reset error count
* On failure add error count
* If error count bigger than allowed change status to pause
*/
// Reschedule
$function = $database->getDocument('functions', $function->getId());
if (empty($function->getId())) {
throw new Exception('Function not found (' . $function->getId() . ')');
}
if ($functionOriginal->getAttribute('schedule') !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
return;
}
if ($functionOriginal->getAttribute('scheduleUpdatedAt') !== $function->getAttribute('scheduleUpdatedAt')) { // Double execution due to rapid cron changes, ignore this run.
return;
}
$cron = new CronExpression($function->getAttribute('schedule'));
$next = DateTime::format($cron->getNextRunDate());
$function = $function
->setAttribute('scheduleNext', $next)
->setAttribute('schedulePrevious', DateTime::now());
$function = $database->updateDocument(
'functions',
$function->getId(),
$function
);
$reschedule = new Func();
$reschedule
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project)
->schedule(new \DateTime($next));
;
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
trigger: 'schedule'
);
break;
}
}
private function execute(
Server::setResource('execute', function () {
return function (
Func $queueForFunctions,
Database $dbForProject,
Client $statsd,
Document $project,
Document $function,
Database $dbForProject,
string $trigger,
string $executionId = null,
string $event = null,
string $eventData = null,
string $data = null,
?Document $user = null,
string $jwt = null
string $jwt = null,
string $event = null,
string $eventData = null,
string $executionId = null,
) {
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
@ -212,28 +48,28 @@ class FunctionsV1 extends Worker
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
}
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
}
/** Check if build has exists */
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
if ($build->isEmpty()) {
throw new Exception('Build not found', 404);
throw new Exception('Build not found');
}
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready', 400);
throw new Exception('Build not ready');
}
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
$runtime = $runtimes[$function->getAttribute('runtime')];
@ -256,14 +92,17 @@ class FunctionsV1 extends Worker
'search' => implode(' ', [$functionId, $executionId]),
]));
// TODO: @Meldiron Trigger executions.create event here
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry;
}, []);
@ -286,7 +125,8 @@ class FunctionsV1 extends Worker
/** Execute function */
try {
$executionResponse = $this->executor->createExecution(
$client = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
$executionResponse = $client->createExecution(
projectId: $project->getId(),
deploymentId: $deploymentId,
payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
@ -330,9 +170,8 @@ class FunctionsV1 extends Worker
->trigger();
/** Trigger Functions */
$executionUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$queueForFunctions
->from($executionUpdate)
->trigger();
/** Trigger realtime event */
@ -361,12 +200,11 @@ class FunctionsV1 extends Worker
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
->setParam('projectId', $project->getId())
->setParam('projectInternalId', $project->getInternalId())
->setParam('functionId', $function->getId())
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
@ -375,9 +213,118 @@ class FunctionsV1 extends Worker
->setParam('networkResponseSize', 0)
->submit();
}
}
};
});
public function shutdown(): void
{
}
}
$server->job()
->inject('message')
->inject('dbForProject')
->inject('queueForFunctions')
->inject('statsd')
->inject('execute')
->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, callable $execute) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$type = $payload['type'] ?? '';
$events = $payload['events'] ?? [];
$data = $payload['data'] ?? '';
$eventData = $payload['payload'] ?? '';
$project = new Document($payload['project'] ?? []);
$function = new Document($payload['function'] ?? []);
$user = new Document($payload['user'] ?? []);
if ($project->getId() === 'console') {
return;
}
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('name'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($functions as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
$execute(
statsd: $statsd,
dbForProject: $dbForProject,
project: $project,
function: $function,
queueForFunctions: $queueForFunctions,
trigger: 'event',
event: $events[0],
eventData: $eventData,
user: $user,
data: null,
executionId: null,
jwt: null
);
Console::success('Triggered function: ' . $events[0]);
}
}
return;
}
/**
* Handle Schedule and HTTP execution.
*/
switch ($type) {
case 'http':
$jwt = $payload['jwt'] ?? '';
$execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []);
$execute(
project: $project,
function: $function,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
trigger: 'http',
executionId: $execution->getId(),
event: null,
eventData: null,
data: $data,
user: $user,
jwt: $jwt,
statsd: $statsd,
);
break;
case 'schedule':
$execute(
project: $project,
function: $function,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
trigger: 'schedule',
executionId: null,
event: null,
eventData: null,
data: null,
user: null,
jwt: null,
statsd: $statsd,
);
break;
}
});
$server->workerStart();
$server->start();

View file

@ -1,10 +1,3 @@
#!/bin/sh
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
then
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
else
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
fi
INTERVAL=1 REDIS_BACKEND=$REDIS_BACKEND RESQUE_PHP='/usr/src/code/vendor/autoload.php' php /usr/src/code/vendor/bin/resque-scheduler
php /usr/src/code/app/cli.php schedule $@

View file

@ -1,10 +1,3 @@
#!/bin/sh
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
then
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
else
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
fi
INTERVAL=0.1 QUEUE='v1-functions' APP_INCLUDE='/usr/src/code/app/workers/functions.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php
QUEUE=v1-functions php /usr/src/code/app/workers/functions.php $@

View file

@ -53,13 +53,14 @@
"utopia-php/domains": "1.1.*",
"utopia-php/framework": "0.25.*",
"utopia-php/image": "0.5.*",
"utopia-php/queue": "0.4.*",
"utopia-php/locale": "0.4.*",
"utopia-php/logger": "0.3.*",
"utopia-php/orchestration": "0.9.*",
"utopia-php/platform": "0.3.*",
"utopia-php/pools": "0.4.*",
"utopia-php/preloader": "0.2.*",
"utopia-php/registry": "0.5.0",
"utopia-php/registry": "0.5.*",
"utopia-php/storage": "0.11.*",
"utopia-php/swoole": "0.5.*",
"utopia-php/websocket": "0.1.0",

65
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "bf3e2ed6ee8e49ab74af97b368b89a63",
"content-hash": "a673091aa6bd8ef01380b63245427c93",
"packages": [
{
"name": "adhocore/jwt",
@ -2357,6 +2357,67 @@
},
"time": "2020-10-24T07:04:59+00:00"
},
{
"name": "utopia-php/queue",
"version": "0.4.1",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/queue.git",
"reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/0b69ede484a04c567cbb202f592d8e5e3cd2433e",
"reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e",
"shasum": ""
},
"require": {
"php": ">=8.0",
"utopia-php/cli": "0.14.*",
"utopia-php/framework": "0.*.*"
},
"require-dev": {
"laravel/pint": "^0.2.3",
"phpstan/phpstan": "^1.8",
"phpunit/phpunit": "^9.5.5",
"swoole/ide-helper": "4.8.8",
"workerman/workerman": "^4.0"
},
"suggest": {
"ext-swoole": "Needed to support Swoole.",
"workerman/workerman": "Needed to support Workerman."
},
"type": "library",
"autoload": {
"psr-4": {
"Utopia\\Queue\\": "src/Queue"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Torsten Dittmann",
"email": "torsten@appwrite.io"
}
],
"description": "A powerful task queue.",
"keywords": [
"Tasks",
"framework",
"php",
"queue",
"upf",
"utopia"
],
"support": {
"issues": "https://github.com/utopia-php/queue/issues",
"source": "https://github.com/utopia-php/queue/tree/0.4.1"
},
"time": "2022-11-15T16:56:37+00:00"
},
{
"name": "utopia-php/registry",
"version": "0.5.0",
@ -5241,5 +5302,5 @@
"platform-overrides": {
"php": "8.0"
},
"plugin-api-version": "2.3.0"
"plugin-api-version": "2.1.0"
}

View file

@ -186,6 +186,7 @@ services:
- _APP_MAINTENANCE_RETENTION_AUDIT
- _APP_SMS_PROVIDER
- _APP_SMS_FROM
- _APP_REGION
appwrite-realtime:
entrypoint: realtime
@ -257,6 +258,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -290,6 +292,7 @@ services:
- request-catcher
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_SYSTEM_SECURITY_EMAIL_ADDRESS
- _APP_REDIS_HOST
@ -320,6 +323,7 @@ services:
- ./src:/usr/src/code/src
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -375,6 +379,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -407,6 +412,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_EXECUTOR_SECRET
- _APP_EXECUTOR_HOST
@ -443,6 +449,7 @@ services:
- ./src:/usr/src/code/src
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DOMAIN
- _APP_DOMAIN_TARGET
@ -479,6 +486,7 @@ services:
- openruntimes-executor
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -516,6 +524,7 @@ services:
# - smtp
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_SYSTEM_EMAIL_NAME
- _APP_SYSTEM_EMAIL_ADDRESS
@ -546,6 +555,7 @@ services:
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
@ -570,6 +580,7 @@ services:
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_DOMAIN
- _APP_DOMAIN_TARGET
- _APP_OPENSSL_KEY_V1
@ -622,6 +633,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -660,6 +672,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -691,14 +704,25 @@ services:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- mariadb
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_CONNECTIONS_DB_PROJECT
- _APP_CONNECTIONS_DB_CONSOLE
- _APP_CONNECTIONS_CACHE
- _APP_CONNECTIONS_QUEUE
- _APP_REGION
openruntimes-executor:
container_name: openruntimes-executor
@ -728,7 +752,7 @@ services:
mariadb:
image: mariadb:10.7 # fix issues when upgrading using: mysql_upgrade -u root -p
container_name: mariadb
container_name: appwrite-mariadb
<<: *x-logging
networks:
- appwrite
@ -753,7 +777,7 @@ services:
# - RELAY_FROM_HOSTS=192.168.0.0/16 ; *.yourdomain.com
# - SMARTHOST_HOST=smtp
# - SMARTHOST_PORT=587
redis:
image: redis:7.0.4-alpine
<<: *x-logging

View file

@ -337,8 +337,6 @@ class Event
default => false
};
return [
'type' => $type,
'resource' => $resource,

View file

@ -6,6 +6,8 @@ use DateTime;
use Resque;
use ResqueScheduler;
use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Func extends Event
{
@ -15,7 +17,7 @@ class Func extends Event
protected ?Document $function = null;
protected ?Document $execution = null;
public function __construct()
public function __construct(protected Connection $connection)
{
parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME);
}
@ -143,7 +145,11 @@ class Func extends Event
*/
public function trigger(): string|bool
{
return Resque::enqueue($this->queue, $this->class, [
$client = new Client($this->queue, $this->connection);
$events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null;
return $client->enqueue([
'project' => $this->project,
'user' => $this->user,
'function' => $this->function,
@ -151,28 +157,26 @@ class Func extends Event
'type' => $this->type,
'jwt' => $this->jwt,
'payload' => $this->payload,
'data' => $this->data
'events' => $events,
'data' => $this->data,
]);
}
/**
* Schedules the function event and schedules it in the functions worker queue.
* Generate a function event from a base event
*
* @param Event $event
*
* @return self
*
* @param \DateTime|int $at
* @return void
* @throws \Resque_Exception
* @throws \ResqueScheduler_InvalidTimestampException
*/
public function schedule(DateTime|int $at): void
public function from(Event $event): self
{
ResqueScheduler::enqueueAt($at, $this->queue, $this->class, [
'project' => $this->project,
'user' => $this->user,
'function' => $this->function,
'execution' => $this->execution,
'type' => $this->type,
'payload' => $this->payload,
'data' => $this->data
]);
$this->project = $event->getProject();
$this->user = $event->getUser();
$this->payload = $event->getPayload();
$this->event = $event->getEvent();
$this->params = $event->getParams();
return $this;
}
}

View file

@ -7,6 +7,7 @@ use Appwrite\Platform\Tasks\Doctor;
use Appwrite\Platform\Tasks\Install;
use Appwrite\Platform\Tasks\Maintenance;
use Appwrite\Platform\Tasks\Migrate;
use Appwrite\Platform\Tasks\Schedule;
use Appwrite\Platform\Tasks\SDKs;
use Appwrite\Platform\Tasks\Specs;
use Appwrite\Platform\Tasks\SSL;
@ -28,6 +29,7 @@ class Tasks extends Service
->addAction(Doctor::getName(), new Doctor())
->addAction(Install::getName(), new Install())
->addAction(Maintenance::getName(), new Maintenance())
->addAction(Schedule::getName(), new Schedule())
->addAction(Migrate::getName(), new Migrate())
->addAction(SDKs::getName(), new SDKs())
->addAction(VolumeSync::getName(), new VolumeSync())

View file

@ -139,6 +139,8 @@ class Maintenance extends Action
notifyDeleteExpiredSessions();
renewCertificates($dbForConsole);
notifyDeleteCache($cacheRetention);
// TODO: @Meldiron Every probably 24h, look for schedules with active=false, that doesnt have function anymore. Dlete such schedule
}, $interval);
}
}

View file

@ -0,0 +1,237 @@
<?php
namespace Appwrite\Platform\Tasks;
use Cron\CronExpression;
use Swoole\Timer;
use Utopia\App;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Appwrite\Event\Func;
use function Swoole\Coroutine\run;
class Schedule extends Action
{
public const FUNCTION_UPDATE_TIMER = 10; //seconds
public const FUNCTION_ENQUEUE_TIMER = 60; //seconds
public static function getName(): string
{
return 'schedule';
}
public function __construct()
{
$this
->desc('Execute functions scheduled in Appwrite')
->inject('pools')
->inject('dbForConsole')
->inject('getProjectDB')
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
}
/**
* 1. Load all documents from 'schedules' collection to create local copy
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker.
*/
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
Console::title('Scheduler V1');
Console::success(APP_NAME . ' Scheduler v1 has started');
/**
* Extract only nessessary attributes to lower memory used.
*
* @var Document $schedule
* @return array
*/
$getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array {
$project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId'));
$function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId'));
return [
'resourceId' => $schedule->getAttribute('resourceId'),
'schedule' => $schedule->getAttribute('schedule'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
];
};
$schedules = []; // Local copy of 'schedules' collection
$lastSyncUpdate = DateTime::now();
$limit = 10000;
$sum = $limit;
$total = 0;
$loadStart = \microtime(true);
$latestDocument = null;
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION')]),
Query::equal('resourceType', ['function']),
Query::equal('active', [true]),
]));
$sum = count($results);
$total = $total + $sum;
foreach ($results as $document) {
$schedules[$document['resourceId']] = $getSchedule($document);
}
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
}
$pools->reclaim();
Console::success("{$total} functions were loaded in " . (microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(
function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
$limit = 1000;
$sum = $limit;
$total = 0;
$latestDocument = null;
Console::log("Sync tick: Running at $time");
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
if ($latestDocument !== null) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [App::getEnv('_APP_REGION')]),
Query::equal('resourceType', ['function']),
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
]));
$sum = count($results);
$total = $total + $sum;
foreach ($results as $document) {
$localDocument = $schedules[$document['resourceId']] ?? null;
$org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null;
$new = strtotime($document['resourceUpdatedAt']);
if ($document['active'] === false) {
Console::info("Removing: {$document['resourceId']}");
unset($schedules[$document['resourceId']]);
} elseif ($new !== $org) {
Console::info("Updating: {$document['resourceId']}");
$schedules[$document['resourceId']] = $getSchedule($document);
}
}
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
}
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
/**
* The timer to prepare soon-to-execute schedules.
*/
$lastEnqueueUpdate = null;
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) {
$timerStart = \microtime(true);
$time = DateTime::now();
$enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate;
$timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff);
Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)");
$total = 0;
$delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($schedules as $key => $schedule) {
$cron = new CronExpression($schedule['schedule']);
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);
$currentTick = $next < $timeFrame;
if (!$currentTick) {
continue;
}
$total++;
$promiseStart = \time(); // in seconds
$executionStart = $nextDate->getTimestamp(); // in seconds
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
if (!isset($delayedExecutions[$delay])) {
$delayedExecutions[$delay] = [];
}
$delayedExecutions[$delay][] = $key;
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $schedules, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!isset($schedules[$scheduleKey])) {
return;
}
$schedule = $schedules[$scheduleKey];
$functions = new Func($connection);
$functions
->setType('schedule')
->setFunction($schedule['function'])
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}
$timerEnd = \microtime(true);
$lastEnqueueUpdate = $timerStart;
Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds");
};
Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions());
$enqueueFunctions();
}
);
}
}

View file

@ -81,18 +81,6 @@ class Func extends Model
'default' => '',
'example' => '5 4 * * *',
])
->addRule('scheduleNext', [
'type' => self::TYPE_DATETIME,
'description' => 'Function\'s next scheduled execution time in ISO 8601 format.',
'default' => '',
'example' => self::TYPE_DATETIME_EXAMPLE,
])
->addRule('schedulePrevious', [
'type' => self::TYPE_DATETIME,
'description' => 'Function\'s previous scheduled execution time in ISO 8601 format.',
'default' => '',
'example' => self::TYPE_DATETIME_EXAMPLE,
])
->addRule('timeout', [
'type' => self::TYPE_INTEGER,
'description' => 'Function execution timeout in seconds.',