Merge pull request #4659 from appwrite/refactor-functions-worker

Refactor functions worker
This commit is contained in:
Christy Jacob 2022-11-16 19:21:17 +05:30 committed by GitHub
commit 0c759e958d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 477 additions and 314 deletions

View file

@ -3,6 +3,7 @@
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/controllers/general.php';
use Appwrite\Event\Func;
use Appwrite\Platform\Appwrite;
use Utopia\CLI\CLI;
use Utopia\Database\Validator\Authorization;
@ -109,6 +110,10 @@ CLI::setResource('influxdb', function (Registry $register) {
return $database;
}, ['register']);
CLI::setResource('queueForFunctions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
}, ['pools']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');

View file

@ -469,13 +469,13 @@ App::put('/v1/functions/:functionId')
'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]),
])));
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
/**
* In case we want to clear the schedule
*/
if (!empty($function->getAttribute('deployment'))) {
$schedule->setAttribute('resourceUpdatedAt', $function['scheduleUpdatedAt']);
$schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt'));
}
$schedule
@ -483,7 +483,7 @@ App::put('/v1/functions/:functionId')
->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
$dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule);
Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
$eventsInstance->setParam('functionId', $function->getId());
@ -537,7 +537,7 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId')
'deployment' => $deployment->getId()
])));
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$active = !empty($function->getAttribute('schedule'));
@ -588,7 +588,7 @@ App::delete('/v1/functions/:functionId')
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB');
}
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$schedule
->setAttribute('resourceUpdatedAt', DateTime::now())
@ -790,7 +790,7 @@ App::post('/v1/functions/:functionId/deployments')
* TODO Should we update also the function collection with the scheduleUpdatedAt attr?
*/
$schedule = $dbForConsole->getDocument('schedules', $function['scheduleId']);
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$active = !empty($function->getAttribute('schedule'));
@ -1034,8 +1034,6 @@ App::post('/v1/functions/:functionId/deployments/:deploymentId/builds/:buildId')
$response->noContent();
});
App::post('/v1/functions/:functionId/executions')
->groups(['api', 'functions'])
->desc('Create Execution')
@ -1061,7 +1059,8 @@ App::post('/v1/functions/:functionId/executions')
->inject('events')
->inject('usage')
->inject('mode')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode) {
->inject('queueForFunctions')
->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Func $queueForFunctions) {
$function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId));
@ -1149,24 +1148,22 @@ App::post('/v1/functions/:functionId/executions')
->setContext('function', $function);
if ($async) {
$event = new Func();
$event
$queueForFunctions
->setType('http')
->setExecution($execution)
->setFunction($function)
->setData($data)
->setJWT($jwt)
->setProject($project)
->setUser($user);
$event->trigger();
->setUser($user)
->trigger();
return $response
->setStatusCode(Response::STATUS_CODE_ACCEPTED)
->dynamic($execution, Response::MODEL_EXECUTION);
}
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value') ?? '';
return $carry;
}, []);

View file

@ -5,6 +5,7 @@ use Appwrite\Event\Audit;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Usage\Stats;
@ -128,9 +129,9 @@ App::init()
}
}
/*
* Background Jobs
*/
/*
* Background Jobs
*/
$events
->setEvent($route->getLabel('event', ''))
->setProject($project)
@ -251,7 +252,8 @@ App::shutdown()
->inject('database')
->inject('mode')
->inject('dbForProject')
->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject) use ($parseLabel) {
->inject('queueForFunctions')
->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject, Func $queueForFunctions) use ($parseLabel) {
$responsePayload = $response->getPayload();
@ -262,9 +264,8 @@ App::shutdown()
/**
* Trigger functions.
*/
$events
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$queueForFunctions
->from($events)
->trigger();
/**

View file

@ -71,9 +71,11 @@ use Utopia\Pools\Group;
use Utopia\Pools\Pool;
use Ahc\Jwt\JWT;
use Ahc\Jwt\JWTException;
use Appwrite\Event\Func;
use MaxMind\Db\Reader;
use PHPMailer\PHPMailer\PHPMailer;
use Swoole\Database\PDOProxy;
use Utopia\Queue;
const APP_NAME = 'Appwrite';
const APP_DOMAIN = 'appwrite.io';
@ -644,14 +646,16 @@ $register->set('pools', function () {
};
$adapter->setDefaultDatabase($dsn->getDatabase());
break;
case 'queue':
$adapter = $resource();
break;
case 'pubsub':
$adapter = $resource();
break;
case 'queue':
$adapter = match ($dsn->getScheme()) {
'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()),
default => null
};
break;
case 'cache':
$adapter = match ($dsn->getScheme()) {
'redis' => new RedisCache($resource()),
@ -675,6 +679,7 @@ $register->set('pools', function () {
return $group;
});
$register->set('influxdb', function () {
// Register DB connection
$host = App::getEnv('_APP_INFLUXDB_HOST', '');
@ -847,10 +852,12 @@ App::setResource('mails', fn() => new Mail());
App::setResource('deletes', fn() => new Delete());
App::setResource('database', fn() => new EventDatabase());
App::setResource('messaging', fn() => new Phone());
App::setResource('queueForFunctions', function (Group $pools) {
return new Func($pools->get('queue')->pop()->getResource());
}, ['pools']);
App::setResource('usage', function ($register) {
return new Stats($register->get('statsd'));
}, ['register']);
App::setResource('clients', function ($request, $console, $project) {
$console->setAttribute('platforms', [ // Always allow current host
'$collection' => ID::custom('platforms'),

View file

@ -36,6 +36,7 @@ foreach (
realpath(__DIR__ . '/../vendor/mongodb'),
realpath(__DIR__ . '/../vendor/utopia-php/websocket'), // TODO: remove workerman autoload
realpath(__DIR__ . '/../vendor/utopia-php/cache'), // TODO: remove memcached autoload
realpath(__DIR__ . '/../vendor/utopia-php/queue/src/Queue/Adapter/Workerman.php'), // TODO: remove workerman autoload
] as $key => $value
) {
if ($value !== false) {

145
app/worker.php Normal file
View file

@ -0,0 +1,145 @@
<?php
require_once __DIR__ . '/init.php';
use Appwrite\Event\Func;
use Swoole\Runtime;
use Utopia\App;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Queue\Adapter\Swoole;
use Utopia\Queue\Message;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
use Utopia\Logger\Log;
use Utopia\Logger\Logger;
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
global $register;
Server::setResource('register', fn() => $register);
Server::setResource('dbForConsole', function (Cache $cache, Registry $register) {
$pools = $register->get('pools');
$database = $pools
->get('console')
->pop()
->getResource()
;
$adapter = new Database($database, $cache);
$adapter->setNamespace('console');
return $adapter;
}, ['cache', 'register']);
Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) {
$payload = $message->getPayload() ?? [];
$project = new Document($payload['project'] ?? []);
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$pools = $register->get('pools');
$database = $pools
->get($project->getAttribute('database'))
->pop()
->getResource()
;
$adapter = new Database($database, $cache);
$adapter->setNamespace('_' . $project->getInternalId());
return $adapter;
}, ['cache', 'register', 'message', 'dbForConsole']);
Server::setResource('cache', function (Registry $register) {
$pools = $register->get('pools');
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
}
return new Cache(new Sharding($adapters));
}, ['register']);
Server::setResource('queueForFunctions', function (Registry $register) {
$pools = $register->get('pools');
return new Func(
$pools
->get('queue')
->pop()
->getResource()
);
}, ['register']);
Server::setResource('logger', function ($register) {
return $register->get('logger');
}, ['register']);
Server::setResource('statsd', function ($register) {
return $register->get('statsd');
}, ['register']);
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop()->getResource();
$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6));
if (empty(App::getEnv('QUEUE'))) {
throw new Exception('Please configure "QUEUE" environemnt variable.');
}
$adapter = new Swoole($connection, $workerNumber, App::getEnv('QUEUE'));
$server = new Server($adapter);
$server
->error()
->inject('error')
->inject('logger')
->action(function (Throwable $error, Logger $logger) {
$version = App::getEnv('_APP_VERSION', 'UNKNOWN');
if ($error instanceof PDOException) {
throw $error;
}
if ($error->getCode() >= 500 || $error->getCode() === 0) {
$log = new Log();
$log->setNamespace("appwrite-worker");
$log->setServer(\gethostname());
$log->setVersion($version);
$log->setType(Log::TYPE_ERROR);
$log->setMessage($error->getMessage());
$log->setAction('appwrite-queue-' . App::getEnv('QUEUE'));
$log->addTag('verboseType', get_class($error));
$log->addTag('code', $error->getCode());
$log->addExtra('file', $error->getFile());
$log->addExtra('line', $error->getLine());
$log->addExtra('trace', $error->getTraceAsString());
$log->addExtra('detailedTrace', $error->getTrace());
$log->addExtra('roles', \Utopia\Database\Validator\Authorization::$roles);
$isProduction = App::getEnv('_APP_ENV', 'development') === 'production';
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
$logger->addLog($log);
}
Console::error('[Error] Type: ' . get_class($error));
Console::error('[Error] Message: ' . $error->getMessage());
Console::error('[Error] File: ' . $error->getFile());
Console::error('[Error] Line: ' . $error->getLine());
});

View file

@ -1,10 +1,10 @@
<?php
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Appwrite\Utopia\Response\Model\Deployment;
use Cron\CronExpression;
use Executor\Executor;
use Appwrite\Usage\Stats;
use Utopia\Database\DateTime;
@ -14,6 +14,7 @@ use Utopia\Database\ID;
use Utopia\Storage\Storage;
use Utopia\Database\Document;
use Utopia\Config\Config;
use Utopia\Database\Validator\Authorization;
require_once __DIR__ . '/../init.php';
@ -57,6 +58,8 @@ class BuildsV1 extends Worker
protected function buildDeployment(Document $project, Document $function, Document $deployment)
{
global $register;
$dbForProject = $this->getProjectDB($project);
$function = $dbForProject->getDocument('functions', $function->getId());
@ -118,10 +121,13 @@ class BuildsV1 extends Worker
->trigger();
/** Trigger Functions */
$deploymentUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$pools = $register->get('pools');
$connection = $pools->get('queue')->pop();
$functions = new Func($connection->getResource());
$functions
->from($deploymentUpdate)
->trigger();
$connection->reclaim();
/** Trigger Realtime */
$allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
@ -145,7 +151,7 @@ class BuildsV1 extends Worker
$source = $deployment->getAttribute('path');
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry;
}, []);
@ -181,6 +187,8 @@ class BuildsV1 extends Worker
Console::success("Build id: $buildId created");
$function->setAttribute('scheduleUpdatedAt', DateTime::now());
/** Set auto deploy */
if ($deployment->getAttribute('activate') === true) {
$function->setAttribute('deployment', $deployment->getId());
@ -188,11 +196,16 @@ class BuildsV1 extends Worker
}
/** Update function schedule */
$schedule = $function->getAttribute('schedule', '');
$cron = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? new CronExpression($schedule) : null;
$next = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? DateTime::format($cron->getNextRunDate()) : null;
$function->setAttribute('scheduleNext', $next);
$function = $dbForProject->updateDocument('functions', $function->getId(), $function);
$dbForConsole = $this->getConsoleDB();
$schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId'));
$schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt'));
$schedule
->setAttribute('schedule', $function->getAttribute('schedule'))
->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment')));
Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule));
} catch (\Throwable $th) {
$endTime = DateTime::now();
$interval = (new \DateTime($endTime))->diff(new \DateTime($startTime));
@ -222,7 +235,6 @@ class BuildsV1 extends Worker
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);

View file

@ -468,19 +468,7 @@ class DeletesV1 extends Worker
Query::equal('functionId', [$functionId])
], $dbForProject);
/**
* Request executor to delete all deployment containers
* TODO: Re-enable. Disabled for now because of proxy. Container killed after inactivity automatically.
*/
// Console::info("Requesting executor to delete all deployment containers for function " . $functionId);
// $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
// foreach ($deploymentIds as $deploymentId) {
// try {
// $executor->deleteRuntime($projectId, $deploymentId);
// } catch (Throwable $th) {
// Console::error($th->getMessage());
// }
// }
// TODO: Request executor to delete runtime
}
/**
@ -520,17 +508,7 @@ class DeletesV1 extends Worker
}
});
/**
* Request executor to delete the deployment container.
* TODO: Re-enable. Disabled for now because of proxy. Container killed after inactivity automatically.
*/
// Console::info("Requesting executor to delete deployment container for deployment " . $deploymentId);
// try {
// $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
// $executor->deleteRuntime($projectId, $deploymentId);
// } catch (Throwable $th) {
// Console::error($th->getMessage());
// }
// TODO: Request executor to delete runtime
}

View file

@ -1,209 +1,45 @@
<?php
require_once __DIR__ . '/../worker.php';
use Utopia\Queue\Message;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Resque\Worker;
use Appwrite\Usage\Stats;
use Appwrite\Utopia\Response\Model\Execution;
use Cron\CronExpression;
use Domnikl\Statsd\Client;
use Executor\Executor;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\ID;
use Utopia\Database\Permission;
use Utopia\Database\Query;
use Utopia\Database\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Server;
require_once __DIR__ . '/../init.php';
Authorization::disable();
Authorization::setDefaultStatus(false);
Console::title('Functions V1 Worker');
Console::success(APP_NAME . ' functions worker v1 has started');
class FunctionsV1 extends Worker
{
private ?Executor $executor = null;
public array $args = [];
public array $allowed = [];
public function getName(): string
{
return "functions";
}
public function init(): void
{
$this->executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
}
public function run(): void
{
$type = $this->args['type'] ?? '';
$events = $this->args['events'] ?? [];
$project = new Document($this->args['project'] ?? []);
$user = new Document($this->args['user'] ?? []);
$payload = json_encode($this->args['payload'] ?? []);
if ($project->getId() === 'console') {
return;
}
$database = $this->getProjectDB($project);
/**
* Handle Event execution.
*/
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
while ($sum >= $limit) {
$functions = $database->find('functions', [
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('name'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($functions as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
trigger: 'event',
// Pass first, most verbose event pattern
event: $events[0],
eventData: $payload,
user: $user
);
Console::success('Triggered function: ' . $events[0]);
}
}
return;
}
/**
* Handle Schedule and HTTP execution.
*/
$user = new Document($this->args['user'] ?? []);
$project = new Document($this->args['project'] ?? []);
$execution = new Document($this->args['execution'] ?? []);
$function = new Document($this->args['function'] ?? []);
switch ($type) {
case 'http':
$jwt = $this->args['jwt'] ?? '';
$data = $this->args['data'] ?? '';
$function = $database->getDocument('functions', $execution->getAttribute('functionId'));
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
executionId: $execution->getId(),
trigger: 'http',
data: $data,
user: $user,
jwt: $jwt
);
break;
case 'schedule':
$functionOriginal = $function;
/*
* 1. Get Original Task
* 2. Check for updates
* If has updates skip task and don't reschedule
* If status not equal to play skip task
* 3. Check next run date, update task and add new job at the given date
* 4. Execute task (set optional timeout)
* 5. Update task response to log
* On success reset error count
* On failure add error count
* If error count bigger than allowed change status to pause
*/
// Reschedule
$function = $database->getDocument('functions', $function->getId());
if (empty($function->getId())) {
throw new Exception('Function not found (' . $function->getId() . ')');
}
if ($functionOriginal->getAttribute('schedule') !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run.
return;
}
if ($functionOriginal->getAttribute('scheduleUpdatedAt') !== $function->getAttribute('scheduleUpdatedAt')) { // Double execution due to rapid cron changes, ignore this run.
return;
}
$cron = new CronExpression($function->getAttribute('schedule'));
$next = DateTime::format($cron->getNextRunDate());
$function = $function
->setAttribute('scheduleNext', $next)
->setAttribute('schedulePrevious', DateTime::now());
$function = $database->updateDocument(
'functions',
$function->getId(),
$function
);
$reschedule = new Func();
$reschedule
->setFunction($function)
->setType('schedule')
->setUser($user)
->setProject($project)
->schedule(new \DateTime($next));
;
$this->execute(
project: $project,
function: $function,
dbForProject: $database,
trigger: 'schedule'
);
break;
}
}
private function execute(
Server::setResource('execute', function () {
return function (
Func $queueForFunctions,
Database $dbForProject,
Client $statsd,
Document $project,
Document $function,
Database $dbForProject,
string $trigger,
string $executionId = null,
string $event = null,
string $eventData = null,
string $data = null,
?Document $user = null,
string $jwt = null
string $jwt = null,
string $event = null,
string $eventData = null,
string $executionId = null,
) {
$user ??= new Document();
$functionId = $function->getId();
$deploymentId = $function->getAttribute('deployment', '');
@ -212,28 +48,28 @@ class FunctionsV1 extends Worker
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->getAttribute('resourceId') !== $functionId) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
}
if ($deployment->isEmpty()) {
throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404);
throw new Exception('Deployment not found. Create deployment before trying to execute a function');
}
/** Check if build has exists */
$build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', ''));
if ($build->isEmpty()) {
throw new Exception('Build not found', 404);
throw new Exception('Build not found');
}
if ($build->getAttribute('status') !== 'ready') {
throw new Exception('Build not ready', 400);
throw new Exception('Build not ready');
}
/** Check if runtime is supported */
$runtimes = Config::getParam('runtimes', []);
if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) {
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400);
throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported');
}
$runtime = $runtimes[$function->getAttribute('runtime')];
@ -256,14 +92,17 @@ class FunctionsV1 extends Worker
'search' => implode(' ', [$functionId, $executionId]),
]));
// TODO: @Meldiron Trigger executions.create event here
if ($execution->isEmpty()) {
throw new Exception('Failed to create or read execution');
}
}
$execution->setAttribute('status', 'processing');
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
$vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) {
$vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) {
$carry[$var->getAttribute('key')] = $var->getAttribute('value');
return $carry;
}, []);
@ -286,7 +125,8 @@ class FunctionsV1 extends Worker
/** Execute function */
try {
$executionResponse = $this->executor->createExecution(
$client = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
$executionResponse = $client->createExecution(
projectId: $project->getId(),
deploymentId: $deploymentId,
payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '',
@ -330,9 +170,8 @@ class FunctionsV1 extends Worker
->trigger();
/** Trigger Functions */
$executionUpdate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$queueForFunctions
->from($executionUpdate)
->trigger();
/** Trigger realtime event */
@ -361,12 +200,11 @@ class FunctionsV1 extends Worker
);
/** Update usage stats */
global $register;
if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') {
$statsd = $register->get('statsd');
$usage = new Stats($statsd);
$usage
->setParam('projectId', $project->getId())
->setParam('projectInternalId', $project->getInternalId())
->setParam('functionId', $function->getId())
->setParam('executions.{scope}.compute', 1)
->setParam('executionStatus', $execution->getAttribute('status', ''))
@ -375,9 +213,118 @@ class FunctionsV1 extends Worker
->setParam('networkResponseSize', 0)
->submit();
}
}
};
});
public function shutdown(): void
{
}
}
$server->job()
->inject('message')
->inject('dbForProject')
->inject('queueForFunctions')
->inject('statsd')
->inject('execute')
->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, callable $execute) {
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$type = $payload['type'] ?? '';
$events = $payload['events'] ?? [];
$data = $payload['data'] ?? '';
$eventData = $payload['payload'] ?? '';
$project = new Document($payload['project'] ?? []);
$function = new Document($payload['function'] ?? []);
$user = new Document($payload['user'] ?? []);
if ($project->getId() === 'console') {
return;
}
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
$functions = [];
/** @var Document[] $functions */
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('name'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
Console::log('Fetched ' . $sum . ' functions...');
foreach ($functions as $function) {
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
$execute(
statsd: $statsd,
dbForProject: $dbForProject,
project: $project,
function: $function,
queueForFunctions: $queueForFunctions,
trigger: 'event',
event: $events[0],
eventData: $eventData,
user: $user,
data: null,
executionId: null,
jwt: null
);
Console::success('Triggered function: ' . $events[0]);
}
}
return;
}
/**
* Handle Schedule and HTTP execution.
*/
switch ($type) {
case 'http':
$jwt = $payload['jwt'] ?? '';
$execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []);
$execute(
project: $project,
function: $function,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
trigger: 'http',
executionId: $execution->getId(),
event: null,
eventData: null,
data: $data,
user: $user,
jwt: $jwt,
statsd: $statsd,
);
break;
case 'schedule':
$execute(
project: $project,
function: $function,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
trigger: 'schedule',
executionId: null,
event: null,
eventData: null,
data: null,
user: null,
jwt: null,
statsd: $statsd,
);
break;
}
});
$server->workerStart();
$server->start();

View file

@ -1,10 +1,3 @@
#!/bin/sh
if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ]
then
REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
else
REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}"
fi
INTERVAL=0.1 QUEUE='v1-functions' APP_INCLUDE='/usr/src/code/app/workers/functions.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php
QUEUE=v1-functions php /usr/src/code/app/workers/functions.php $@

View file

@ -53,13 +53,14 @@
"utopia-php/domains": "1.1.*",
"utopia-php/framework": "0.25.*",
"utopia-php/image": "0.5.*",
"utopia-php/queue": "0.4.*",
"utopia-php/locale": "0.4.*",
"utopia-php/logger": "0.3.*",
"utopia-php/orchestration": "0.9.*",
"utopia-php/platform": "0.3.*",
"utopia-php/pools": "0.4.*",
"utopia-php/preloader": "0.2.*",
"utopia-php/registry": "0.5.0",
"utopia-php/registry": "0.5.*",
"utopia-php/storage": "0.11.*",
"utopia-php/swoole": "0.5.*",
"utopia-php/websocket": "0.1.0",

65
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "bf3e2ed6ee8e49ab74af97b368b89a63",
"content-hash": "a673091aa6bd8ef01380b63245427c93",
"packages": [
{
"name": "adhocore/jwt",
@ -2357,6 +2357,67 @@
},
"time": "2020-10-24T07:04:59+00:00"
},
{
"name": "utopia-php/queue",
"version": "0.4.1",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/queue.git",
"reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/0b69ede484a04c567cbb202f592d8e5e3cd2433e",
"reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e",
"shasum": ""
},
"require": {
"php": ">=8.0",
"utopia-php/cli": "0.14.*",
"utopia-php/framework": "0.*.*"
},
"require-dev": {
"laravel/pint": "^0.2.3",
"phpstan/phpstan": "^1.8",
"phpunit/phpunit": "^9.5.5",
"swoole/ide-helper": "4.8.8",
"workerman/workerman": "^4.0"
},
"suggest": {
"ext-swoole": "Needed to support Swoole.",
"workerman/workerman": "Needed to support Workerman."
},
"type": "library",
"autoload": {
"psr-4": {
"Utopia\\Queue\\": "src/Queue"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Torsten Dittmann",
"email": "torsten@appwrite.io"
}
],
"description": "A powerful task queue.",
"keywords": [
"Tasks",
"framework",
"php",
"queue",
"upf",
"utopia"
],
"support": {
"issues": "https://github.com/utopia-php/queue/issues",
"source": "https://github.com/utopia-php/queue/tree/0.4.1"
},
"time": "2022-11-15T16:56:37+00:00"
},
{
"name": "utopia-php/registry",
"version": "0.5.0",
@ -5241,5 +5302,5 @@
"platform-overrides": {
"php": "8.0"
},
"plugin-api-version": "2.3.0"
"plugin-api-version": "2.1.0"
}

View file

@ -258,6 +258,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -291,6 +292,7 @@ services:
- request-catcher
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_SYSTEM_SECURITY_EMAIL_ADDRESS
- _APP_REDIS_HOST
@ -321,6 +323,7 @@ services:
- ./src:/usr/src/code/src
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -376,6 +379,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -408,6 +412,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_EXECUTOR_SECRET
- _APP_EXECUTOR_HOST
@ -444,6 +449,7 @@ services:
- ./src:/usr/src/code/src
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DOMAIN
- _APP_DOMAIN_TARGET
@ -480,6 +486,7 @@ services:
- openruntimes-executor
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -517,6 +524,7 @@ services:
# - smtp
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_SYSTEM_EMAIL_NAME
- _APP_SYSTEM_EMAIL_ADDRESS
@ -547,6 +555,7 @@ services:
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
@ -571,6 +580,7 @@ services:
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_DOMAIN
- _APP_DOMAIN_TARGET
- _APP_OPENSSL_KEY_V1
@ -623,6 +633,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -661,6 +672,7 @@ services:
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
@ -696,6 +708,7 @@ services:
- redis
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER

View file

@ -337,8 +337,6 @@ class Event
default => false
};
return [
'type' => $type,
'resource' => $resource,

View file

@ -6,6 +6,8 @@ use DateTime;
use Resque;
use ResqueScheduler;
use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class Func extends Event
{
@ -15,7 +17,7 @@ class Func extends Event
protected ?Document $function = null;
protected ?Document $execution = null;
public function __construct()
public function __construct(protected Connection $connection)
{
parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME);
}
@ -143,7 +145,11 @@ class Func extends Event
*/
public function trigger(): string|bool
{
return Resque::enqueue($this->queue, $this->class, [
$client = new Client($this->queue, $this->connection);
$events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null;
return $client->enqueue([
'project' => $this->project,
'user' => $this->user,
'function' => $this->function,
@ -151,28 +157,26 @@ class Func extends Event
'type' => $this->type,
'jwt' => $this->jwt,
'payload' => $this->payload,
'data' => $this->data
'events' => $events,
'data' => $this->data,
]);
}
/**
* Schedules the function event and schedules it in the functions worker queue.
* Generate a function event from a base event
*
* @param Event $event
*
* @return self
*
* @param \DateTime|int $at
* @return void
* @throws \Resque_Exception
* @throws \ResqueScheduler_InvalidTimestampException
*/
public function schedule(DateTime|int $at): void
public function from(Event $event): self
{
ResqueScheduler::enqueueAt($at, $this->queue, $this->class, [
'project' => $this->project,
'user' => $this->user,
'function' => $this->function,
'execution' => $this->execution,
'type' => $this->type,
'payload' => $this->payload,
'data' => $this->data
]);
$this->project = $event->getProject();
$this->user = $event->getUser();
$this->payload = $event->getPayload();
$this->event = $event->getEvent();
$this->params = $event->getParams();
return $this;
}
}

View file

@ -3,15 +3,16 @@
namespace Appwrite\Platform\Tasks;
use Cron\CronExpression;
use Swoole\Timer;
use Utopia\App;
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Swoole\Timer;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Appwrite\Event\Func;
use function Swoole\Coroutine\run;
@ -96,7 +97,7 @@ class Schedule extends Action
$pools->reclaim();
Console::success("{$total} functions where loaded in " . (microtime(true) - $loadStart) . " seconds");
Console::success("{$total} functions were loaded in " . (microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
@ -151,14 +152,14 @@ class Schedule extends Action
$pools->reclaim();
Console::log("Sync tick: {$total} schedules where updates in " . ($timerEnd - $timerStart) . " seconds");
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
/**
* The timer to prepare soon-to-execute schedules.
*/
$lastEnqueueUpdate = null;
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate) {
$enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) {
$timerStart = \microtime(true);
$time = DateTime::now();
@ -184,11 +185,9 @@ class Schedule extends Action
$total++;
$promiseStart = \microtime(true); // in seconds
$promiseStart = \time(); // in seconds
$executionStart = $nextDate->getTimestamp(); // in seconds
$executionSleep = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
$delay = \ceil(\intval($executionSleep));
$delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued
if (!isset($delayedExecutions[$delay])) {
$delayedExecutions[$delay] = [];
@ -198,23 +197,36 @@ class Schedule extends Action
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $schedules, $scheduleKeys) {
\go(function () use ($delay, $schedules, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!isset($schedules[$scheduleKey])) {
return;
}
Console::success("Executing function at " . DateTime::now()); // TODO: Send to worker queue
$schedule = $schedules[$scheduleKey];
$functions = new Func($connection);
$functions
->setType('schedule')
->setFunction($schedule['function'])
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}
$timerEnd = \microtime(true);
$lastEnqueueUpdate = $timerStart;
Console::log("Enqueue tick: {$total} executions where enqueued in " . ($timerEnd - $timerStart) . " seconds");
Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds");
};
Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions());

View file

@ -81,18 +81,6 @@ class Func extends Model
'default' => '',
'example' => '5 4 * * *',
])
->addRule('scheduleNext', [
'type' => self::TYPE_DATETIME,
'description' => 'Function\'s next scheduled execution time in ISO 8601 format.',
'default' => '',
'example' => self::TYPE_DATETIME_EXAMPLE,
])
->addRule('schedulePrevious', [
'type' => self::TYPE_DATETIME,
'description' => 'Function\'s previous scheduled execution time in ISO 8601 format.',
'default' => '',
'example' => self::TYPE_DATETIME_EXAMPLE,
])
->addRule('timeout', [
'type' => self::TYPE_INTEGER,
'description' => 'Function execution timeout in seconds.',