diff --git a/.env b/.env index 11a2b4c7aa..5f02020f10 100644 --- a/.env +++ b/.env @@ -87,5 +87,6 @@ _APP_USAGE_DATABASE_INTERVAL=15 _APP_USAGE_STATS=enabled _APP_LOGGING_PROVIDER= _APP_LOGGING_CONFIG= +_APP_REGION=default _APP_DOCKER_HUB_USERNAME= -_APP_DOCKER_HUB_PASSWORD= \ No newline at end of file +_APP_DOCKER_HUB_PASSWORD= diff --git a/app/cli.php b/app/cli.php index b176326191..13709b9b57 100644 --- a/app/cli.php +++ b/app/cli.php @@ -3,6 +3,7 @@ require_once __DIR__ . '/init.php'; require_once __DIR__ . '/controllers/general.php'; +use Appwrite\Event\Func; use Appwrite\Platform\Appwrite; use Utopia\CLI\CLI; use Utopia\Database\Validator\Authorization; @@ -56,20 +57,29 @@ CLI::setResource('dbForConsole', function ($pools, $cache) { }, ['pools', 'cache']); CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { - $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache) { + $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools + + $getProjectDB = function (Document $project) use ($pools, $dbForConsole, $cache, &$databases) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } + $databaseName = $project->getAttribute('database'); + + if (isset($databases[$databaseName])) { + return $databases[$databaseName]; + } + $dbAdapter = $pools - ->get($project->getAttribute('database')) + ->get($databaseName) ->pop() - ->getResource() - ; + ->getResource(); $database = new Database($dbAdapter, $cache); $database->setNamespace('_' . $project->getInternalId()); + $databases[$databaseName] = $database; + return $database; }; @@ -100,6 +110,10 @@ CLI::setResource('influxdb', function (Registry $register) { return $database; }, ['register']); +CLI::setResource('queueForFunctions', function (Group $pools) { + return new Func($pools->get('queue')->pop()->getResource()); +}, ['pools']); + CLI::setResource('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { $logger = $register->get('logger'); diff --git a/app/config/collections.php b/app/config/collections.php index b4fabf0e65..a0da049b0b 100644 --- a/app/config/collections.php +++ b/app/config/collections.php @@ -751,6 +751,107 @@ $collections = [ ], ], + 'schedules' => [ + '$collection' => ID::custom(Database::METADATA), + '$id' => ID::custom('schedules'), + 'name' => 'schedules', + 'attributes' => [ + [ + '$id' => ID::custom('resourceType'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 100, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('resourceId'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => Database::LENGTH_KEY, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('resourceUpdatedAt'), + 'type' => Database::VAR_DATETIME, + 'format' => '', + 'size' => 0, + 'signed' => false, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => ['datetime'], + ], + [ + '$id' => ID::custom('projectId'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => Database::LENGTH_KEY, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('schedule'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 100, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + [ + '$id' => ID::custom('active'), + 'type' => Database::VAR_BOOLEAN, + 'signed' => true, + 'size' => 0, + 'format' => '', + 'filters' => [], + 'required' => false, + 'default' => null, + 'array' => false, + ], + [ + '$id' => ID::custom('region'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 10, + 'signed' => true, + 'required' => true, + 'default' => null, + 'array' => false, + 'filters' => [], + ], + ], + 'indexes' => [ + [ + '$id' => ID::custom('_key_region_resourceType_resourceUpdatedAt'), + 'type' => Database::INDEX_KEY, + 'attributes' => ['region', 'resourceType','resourceUpdatedAt'], + 'lengths' => [], + 'orders' => [], + ], + [ + '$id' => ID::custom('_key_region_resourceType_projectId_resourceId'), + 'type' => Database::INDEX_KEY, + 'attributes' => ['region', 'resourceType', 'projectId', 'resourceId'], + 'lengths' => [], + 'orders' => [], + ], + ], + ], + 'platforms' => [ '$collection' => ID::custom(Database::METADATA), '$id' => ID::custom('platforms'), @@ -2148,6 +2249,17 @@ $collections = [ 'array' => true, 'filters' => [], ], + [ + '$id' => ID::custom('scheduleId'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => Database::LENGTH_KEY, + 'signed' => true, + 'required' => false, + 'default' => null, + 'array' => false, + 'filters' => [], + ], [ '$id' => ID::custom('schedule'), 'type' => Database::VAR_STRING, @@ -2160,29 +2272,7 @@ $collections = [ 'filters' => [], ], [ - '$id' => ID::custom('scheduleUpdatedAt'), // Used to fix duplicate executions bug. Can be removed once new queue library is used - 'type' => Database::VAR_DATETIME, - 'format' => '', - 'size' => 0, - 'signed' => false, - 'required' => false, - 'default' => null, - 'array' => false, - 'filters' => ['datetime'], - ], - [ - '$id' => ID::custom('schedulePrevious'), - 'type' => Database::VAR_DATETIME, - 'format' => '', - 'size' => 0, - 'signed' => false, - 'required' => false, - 'default' => null, - 'array' => false, - 'filters' => ['datetime'], - ], - [ - '$id' => ID::custom('scheduleNext'), + '$id' => ID::custom('scheduleUpdatedAt'), 'type' => Database::VAR_DATETIME, 'format' => '', 'size' => 0, @@ -2258,20 +2348,6 @@ $collections = [ 'lengths' => [], 'orders' => [Database::ORDER_ASC], ], - [ - '$id' => ID::custom('_key_scheduleNext'), - 'type' => Database::INDEX_KEY, - 'attributes' => ['scheduleNext'], - 'lengths' => [], - 'orders' => [Database::ORDER_ASC], - ], - [ - '$id' => ID::custom('_key_schedulePrevious'), - 'type' => Database::INDEX_KEY, - 'attributes' => ['schedulePrevious'], - 'lengths' => [], - 'orders' => [Database::ORDER_ASC], - ], [ '$id' => ID::custom('_key_timeout'), 'type' => Database::INDEX_KEY, diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 9b91df5d86..a94cde197d 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -36,7 +36,6 @@ use Utopia\Validator\Text; use Utopia\Validator\Range; use Utopia\Validator\WhiteList; use Utopia\Config\Config; -use Cron\CronExpression; use Executor\Executor; use Utopia\CLI\Console; use Utopia\Database\Validator\Roles; @@ -72,10 +71,8 @@ App::post('/v1/functions') ->inject('project') ->inject('user') ->inject('events') - ->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance) { - - $cron = !empty($schedule) ? new CronExpression($schedule) : null; - $next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null; + ->inject('dbForConsole') + ->action(function (string $functionId, string $name, array $execute, string $runtime, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance, Database $dbForConsole) { $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; $function = $dbForProject->createDocument('functions', new Document([ @@ -88,22 +85,24 @@ App::post('/v1/functions') 'events' => $events, 'schedule' => $schedule, 'scheduleUpdatedAt' => DateTime::now(), - 'schedulePrevious' => null, - 'scheduleNext' => $next, 'timeout' => $timeout, 'search' => implode(' ', [$functionId, $name, $runtime]) ])); - if ($next) { - // Async task reschedule - $functionEvent = new Func(); - $functionEvent - ->setFunction($function) - ->setType('schedule') - ->setUser($user) - ->setProject($project) - ->schedule(new \DateTime($next)); - } + $schedule = Authorization::skip( + fn() => $dbForConsole->createDocument('schedules', new Document([ + 'region' => App::getEnv('_APP_REGION'), // Todo replace with projects region + 'resourceType' => 'function', + 'resourceId' => $function->getId(), + 'resourceUpdatedAt' => DateTime::now(), + 'projectId' => $project->getId(), + 'schedule' => $function->getAttribute('schedule'), + 'active' => false, + ])) + ); + + $function->setAttribute('scheduleId', $schedule->getId()); + $dbForProject->updateDocument('functions', $function->getId(), $function); $eventsInstance->setParam('functionId', $function->getId()); @@ -448,7 +447,8 @@ App::put('/v1/functions/:functionId') ->inject('project') ->inject('user') ->inject('events') - ->action(function (string $functionId, string $name, array $execute, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance) { + ->inject('dbForConsole') + ->action(function (string $functionId, string $name, array $execute, array $events, string $schedule, int $timeout, bool $enabled, Response $response, Database $dbForProject, Document $project, Document $user, Event $eventsInstance, Database $dbForConsole) { $function = $dbForProject->getDocument('functions', $functionId); @@ -456,9 +456,6 @@ App::put('/v1/functions/:functionId') throw new Exception(Exception::FUNCTION_NOT_FOUND); } - $cron = !empty($schedule) ? new CronExpression($schedule) : null; - $next = !empty($schedule) ? DateTime::format($cron->getNextRunDate()) : null; - $enabled ??= $function->getAttribute('enabled', true); $function = $dbForProject->updateDocument('functions', $function->getId(), new Document(array_merge($function->getArrayCopy(), [ @@ -467,23 +464,27 @@ App::put('/v1/functions/:functionId') 'events' => $events, 'schedule' => $schedule, 'scheduleUpdatedAt' => DateTime::now(), - 'scheduleNext' => $next, 'timeout' => $timeout, 'enabled' => $enabled, 'search' => implode(' ', [$functionId, $name, $function->getAttribute('runtime')]), ]))); - if ($next) { - // Async task reschedule - $functionEvent = new Func(); - $functionEvent - ->setFunction($function) - ->setType('schedule') - ->setUser($user) - ->setProject($project) - ->schedule(new \DateTime($next)); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); + + /** + * In case we want to clear the schedule + */ + if (!empty($function->getAttribute('deployment'))) { + $schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt')); } + $schedule + ->setAttribute('schedule', $function->getAttribute('schedule')) + ->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment'))); + + + Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); + $eventsInstance->setParam('functionId', $function->getId()); $response->dynamic($function, Response::MODEL_FUNCTION); @@ -509,7 +510,8 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId') ->inject('dbForProject') ->inject('project') ->inject('events') - ->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events) { + ->inject('dbForConsole') + ->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $events, Database $dbForConsole) { $function = $dbForProject->getDocument('functions', $functionId); $deployment = $dbForProject->getDocument('deployments', $deploymentId); @@ -535,6 +537,18 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId') 'deployment' => $deployment->getId() ]))); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); + + $active = !empty($function->getAttribute('schedule')); + + if ($active) { + $schedule->setAttribute('resourceUpdatedAt', datetime::now()); + } + + $schedule->setAttribute('active', $active); + + Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); + $events ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()); @@ -560,7 +574,9 @@ App::delete('/v1/functions/:functionId') ->inject('dbForProject') ->inject('deletes') ->inject('events') - ->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events) { + ->inject('project') + ->inject('dbForConsole') + ->action(function (string $functionId, Response $response, Database $dbForProject, Delete $deletes, Event $events, Document $project, Database $dbForConsole) { $function = $dbForProject->getDocument('functions', $functionId); @@ -572,6 +588,15 @@ App::delete('/v1/functions/:functionId') throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove function from DB'); } + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); + + $schedule + ->setAttribute('resourceUpdatedAt', DateTime::now()) + ->setAttribute('active', false) + ; + + Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); + $deletes ->setType(DELETE_TYPE_DOCUMENT) ->setDocument($function); @@ -608,7 +633,8 @@ App::post('/v1/functions/:functionId/deployments') ->inject('project') ->inject('deviceFunctions') ->inject('deviceLocal') - ->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal) { + ->inject('dbForConsole') + ->action(function (string $functionId, string $entrypoint, mixed $code, bool $activate, Request $request, Response $response, Database $dbForProject, Event $events, Document $project, Device $deviceFunctions, Device $deviceLocal, Database $dbForConsole) { $function = $dbForProject->getDocument('functions', $functionId); @@ -760,6 +786,22 @@ App::post('/v1/functions/:functionId/deployments') } } + /** + * TODO Should we update also the function collection with the scheduleUpdatedAt attr? + */ + + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); + + $active = !empty($function->getAttribute('schedule')); + + if ($active) { + $schedule->setAttribute('resourceUpdatedAt', datetime::now()); + } + + $schedule->setAttribute('active', $active); + + Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); + $metadata = null; $events @@ -992,8 +1034,6 @@ App::post('/v1/functions/:functionId/deployments/:deploymentId/builds/:buildId') $response->noContent(); }); - - App::post('/v1/functions/:functionId/executions') ->groups(['api', 'functions']) ->desc('Create Execution') @@ -1019,7 +1059,8 @@ App::post('/v1/functions/:functionId/executions') ->inject('events') ->inject('usage') ->inject('mode') - ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode) { + ->inject('queueForFunctions') + ->action(function (string $functionId, string $data, bool $async, Response $response, Document $project, Database $dbForProject, Document $user, Event $events, Stats $usage, string $mode, Func $queueForFunctions) { $function = Authorization::skip(fn () => $dbForProject->getDocument('functions', $functionId)); @@ -1107,24 +1148,22 @@ App::post('/v1/functions/:functionId/executions') ->setContext('function', $function); if ($async) { - $event = new Func(); - $event + $queueForFunctions ->setType('http') ->setExecution($execution) ->setFunction($function) ->setData($data) ->setJWT($jwt) ->setProject($project) - ->setUser($user); - - $event->trigger(); + ->setUser($user) + ->trigger(); return $response ->setStatusCode(Response::STATUS_CODE_ACCEPTED) ->dynamic($execution, Response::MODEL_EXECUTION); } - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { $carry[$var->getAttribute('key')] = $var->getAttribute('value') ?? ''; return $carry; }, []); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index e07f405140..5624486755 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -5,6 +5,7 @@ use Appwrite\Event\Audit; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; +use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Usage\Stats; @@ -128,9 +129,9 @@ App::init() } } - /* - * Background Jobs - */ + /* + * Background Jobs + */ $events ->setEvent($route->getLabel('event', '')) ->setProject($project) @@ -251,7 +252,8 @@ App::shutdown() ->inject('database') ->inject('mode') ->inject('dbForProject') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject) use ($parseLabel) { + ->inject('queueForFunctions') + ->action(function (App $utopia, Request $request, Response $response, Document $project, Event $events, Audit $audits, Stats $usage, Delete $deletes, EventDatabase $database, string $mode, Database $dbForProject, Func $queueForFunctions) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -262,9 +264,8 @@ App::shutdown() /** * Trigger functions. */ - $events - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $queueForFunctions + ->from($events) ->trigger(); /** diff --git a/app/init.php b/app/init.php index ac30673de8..31f82740b6 100644 --- a/app/init.php +++ b/app/init.php @@ -71,9 +71,11 @@ use Utopia\Pools\Group; use Utopia\Pools\Pool; use Ahc\Jwt\JWT; use Ahc\Jwt\JWTException; +use Appwrite\Event\Func; use MaxMind\Db\Reader; use PHPMailer\PHPMailer\PHPMailer; use Swoole\Database\PDOProxy; +use Utopia\Queue; const APP_NAME = 'Appwrite'; const APP_DOMAIN = 'appwrite.io'; @@ -644,14 +646,16 @@ $register->set('pools', function () { }; $adapter->setDefaultDatabase($dsn->getDatabase()); - - break; - case 'queue': - $adapter = $resource(); break; case 'pubsub': $adapter = $resource(); break; + case 'queue': + $adapter = match ($dsn->getScheme()) { + 'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()), + default => null + }; + break; case 'cache': $adapter = match ($dsn->getScheme()) { 'redis' => new RedisCache($resource()), @@ -675,6 +679,7 @@ $register->set('pools', function () { return $group; }); + $register->set('influxdb', function () { // Register DB connection $host = App::getEnv('_APP_INFLUXDB_HOST', ''); @@ -847,10 +852,12 @@ App::setResource('mails', fn() => new Mail()); App::setResource('deletes', fn() => new Delete()); App::setResource('database', fn() => new EventDatabase()); App::setResource('messaging', fn() => new Phone()); +App::setResource('queueForFunctions', function (Group $pools) { + return new Func($pools->get('queue')->pop()->getResource()); +}, ['pools']); App::setResource('usage', function ($register) { return new Stats($register->get('statsd')); }, ['register']); - App::setResource('clients', function ($request, $console, $project) { $console->setAttribute('platforms', [ // Always allow current host '$collection' => ID::custom('platforms'), diff --git a/app/preload.php b/app/preload.php index 4935db3da4..e587bfaed5 100644 --- a/app/preload.php +++ b/app/preload.php @@ -36,6 +36,7 @@ foreach ( realpath(__DIR__ . '/../vendor/mongodb'), realpath(__DIR__ . '/../vendor/utopia-php/websocket'), // TODO: remove workerman autoload realpath(__DIR__ . '/../vendor/utopia-php/cache'), // TODO: remove memcached autoload + realpath(__DIR__ . '/../vendor/utopia-php/queue/src/Queue/Adapter/Workerman.php'), // TODO: remove workerman autoload ] as $key => $value ) { if ($value !== false) { diff --git a/app/worker.php b/app/worker.php new file mode 100644 index 0000000000..42a5f92439 --- /dev/null +++ b/app/worker.php @@ -0,0 +1,145 @@ + $register); + +Server::setResource('dbForConsole', function (Cache $cache, Registry $register) { + $pools = $register->get('pools'); + $database = $pools + ->get('console') + ->pop() + ->getResource() + ; + + $adapter = new Database($database, $cache); + $adapter->setNamespace('console'); + + return $adapter; +}, ['cache', 'register']); + +Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) { + $payload = $message->getPayload() ?? []; + $project = new Document($payload['project'] ?? []); + + if ($project->isEmpty() || $project->getId() === 'console') { + return $dbForConsole; + } + + $pools = $register->get('pools'); + $database = $pools + ->get($project->getAttribute('database')) + ->pop() + ->getResource() + ; + + $adapter = new Database($database, $cache); + $adapter->setNamespace('_' . $project->getInternalId()); + + return $adapter; +}, ['cache', 'register', 'message', 'dbForConsole']); + +Server::setResource('cache', function (Registry $register) { + $pools = $register->get('pools'); + $list = Config::getParam('pools-cache', []); + $adapters = []; + + foreach ($list as $value) { + $adapters[] = $pools + ->get($value) + ->pop() + ->getResource() + ; + } + + return new Cache(new Sharding($adapters)); +}, ['register']); + +Server::setResource('queueForFunctions', function (Registry $register) { + $pools = $register->get('pools'); + return new Func( + $pools + ->get('queue') + ->pop() + ->getResource() + ); +}, ['register']); + +Server::setResource('logger', function ($register) { + return $register->get('logger'); +}, ['register']); + +Server::setResource('statsd', function ($register) { + return $register->get('statsd'); +}, ['register']); + +$pools = $register->get('pools'); +$connection = $pools->get('queue')->pop()->getResource(); +$workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); + +if (empty(App::getEnv('QUEUE'))) { + throw new Exception('Please configure "QUEUE" environemnt variable.'); +} + +$adapter = new Swoole($connection, $workerNumber, App::getEnv('QUEUE')); +$server = new Server($adapter); + +$server + ->error() + ->inject('error') + ->inject('logger') + ->action(function (Throwable $error, Logger $logger) { + $version = App::getEnv('_APP_VERSION', 'UNKNOWN'); + + if ($error instanceof PDOException) { + throw $error; + } + + if ($error->getCode() >= 500 || $error->getCode() === 0) { + $log = new Log(); + + $log->setNamespace("appwrite-worker"); + $log->setServer(\gethostname()); + $log->setVersion($version); + $log->setType(Log::TYPE_ERROR); + $log->setMessage($error->getMessage()); + $log->setAction('appwrite-queue-' . App::getEnv('QUEUE')); + $log->addTag('verboseType', get_class($error)); + $log->addTag('code', $error->getCode()); + $log->addExtra('file', $error->getFile()); + $log->addExtra('line', $error->getLine()); + $log->addExtra('trace', $error->getTraceAsString()); + $log->addExtra('detailedTrace', $error->getTrace()); + $log->addExtra('roles', \Utopia\Database\Validator\Authorization::$roles); + + $isProduction = App::getEnv('_APP_ENV', 'development') === 'production'; + $log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING); + + $logger->addLog($log); + } + + Console::error('[Error] Type: ' . get_class($error)); + Console::error('[Error] Message: ' . $error->getMessage()); + Console::error('[Error] File: ' . $error->getFile()); + Console::error('[Error] Line: ' . $error->getLine()); + }); diff --git a/app/workers/builds.php b/app/workers/builds.php index 06c3de3960..1fa86b99a8 100644 --- a/app/workers/builds.php +++ b/app/workers/builds.php @@ -1,10 +1,10 @@ getProjectDB($project); $function = $dbForProject->getDocument('functions', $function->getId()); @@ -118,10 +121,13 @@ class BuildsV1 extends Worker ->trigger(); /** Trigger Functions */ - $deploymentUpdate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $pools = $register->get('pools'); + $connection = $pools->get('queue')->pop(); + $functions = new Func($connection->getResource()); + $functions + ->from($deploymentUpdate) ->trigger(); + $connection->reclaim(); /** Trigger Realtime */ $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ @@ -145,7 +151,7 @@ class BuildsV1 extends Worker $source = $deployment->getAttribute('path'); - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { $carry[$var->getAttribute('key')] = $var->getAttribute('value'); return $carry; }, []); @@ -181,6 +187,8 @@ class BuildsV1 extends Worker Console::success("Build id: $buildId created"); + $function->setAttribute('scheduleUpdatedAt', DateTime::now()); + /** Set auto deploy */ if ($deployment->getAttribute('activate') === true) { $function->setAttribute('deployment', $deployment->getId()); @@ -188,11 +196,16 @@ class BuildsV1 extends Worker } /** Update function schedule */ - $schedule = $function->getAttribute('schedule', ''); - $cron = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? new CronExpression($schedule) : null; - $next = (empty($function->getAttribute('deployment')) && !empty($schedule)) ? DateTime::format($cron->getNextRunDate()) : null; - $function->setAttribute('scheduleNext', $next); - $function = $dbForProject->updateDocument('functions', $function->getId(), $function); + $dbForConsole = $this->getConsoleDB(); + $schedule = $dbForConsole->getDocument('schedules', $function->getAttribute('scheduleId')); + $schedule->setAttribute('resourceUpdatedAt', $function->getAttribute('scheduleUpdatedAt')); + + $schedule + ->setAttribute('schedule', $function->getAttribute('schedule')) + ->setAttribute('active', !empty($function->getAttribute('schedule')) && !empty($function->getAttribute('deployment'))); + + + Authorization::skip(fn () => $dbForConsole->updateDocument('schedules', $schedule->getId(), $schedule)); } catch (\Throwable $th) { $endTime = DateTime::now(); $interval = (new \DateTime($endTime))->diff(new \DateTime($startTime)); @@ -222,7 +235,6 @@ class BuildsV1 extends Worker ); /** Update usage stats */ - global $register; if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { $statsd = $register->get('statsd'); $usage = new Stats($statsd); diff --git a/app/workers/deletes.php b/app/workers/deletes.php index 0134fad66d..3e0df8ce38 100644 --- a/app/workers/deletes.php +++ b/app/workers/deletes.php @@ -468,19 +468,7 @@ class DeletesV1 extends Worker Query::equal('functionId', [$functionId]) ], $dbForProject); - /** - * Request executor to delete all deployment containers - * TODO: Re-enable. Disabled for now because of proxy. Container killed after inactivity automatically. - */ - // Console::info("Requesting executor to delete all deployment containers for function " . $functionId); - // $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); - // foreach ($deploymentIds as $deploymentId) { - // try { - // $executor->deleteRuntime($projectId, $deploymentId); - // } catch (Throwable $th) { - // Console::error($th->getMessage()); - // } - // } + // TODO: Request executor to delete runtime } /** @@ -520,17 +508,7 @@ class DeletesV1 extends Worker } }); - /** - * Request executor to delete the deployment container. - * TODO: Re-enable. Disabled for now because of proxy. Container killed after inactivity automatically. - */ - // Console::info("Requesting executor to delete deployment container for deployment " . $deploymentId); - // try { - // $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); - // $executor->deleteRuntime($projectId, $deploymentId); - // } catch (Throwable $th) { - // Console::error($th->getMessage()); - // } + // TODO: Request executor to delete runtime } diff --git a/app/workers/functions.php b/app/workers/functions.php index 6d1b3f41f9..3047bf027e 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -1,209 +1,45 @@ executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); - } - - public function run(): void - { - $type = $this->args['type'] ?? ''; - $events = $this->args['events'] ?? []; - $project = new Document($this->args['project'] ?? []); - $user = new Document($this->args['user'] ?? []); - $payload = json_encode($this->args['payload'] ?? []); - - if ($project->getId() === 'console') { - return; - } - - $database = $this->getProjectDB($project); - - /** - * Handle Event execution. - */ - if (!empty($events)) { - $limit = 30; - $sum = 30; - $offset = 0; - $functions = []; - /** @var Document[] $functions */ - - while ($sum >= $limit) { - $functions = $database->find('functions', [ - Query::limit($limit), - Query::offset($offset), - Query::orderAsc('name'), - ]); - $sum = \count($functions); - $offset = $offset + $limit; - - Console::log('Fetched ' . $sum . ' functions...'); - - foreach ($functions as $function) { - if (!array_intersect($events, $function->getAttribute('events', []))) { - continue; - } - - Console::success('Iterating function: ' . $function->getAttribute('name')); - - $this->execute( - project: $project, - function: $function, - dbForProject: $database, - trigger: 'event', - // Pass first, most verbose event pattern - event: $events[0], - eventData: $payload, - user: $user - ); - - Console::success('Triggered function: ' . $events[0]); - } - } - - return; - } - - /** - * Handle Schedule and HTTP execution. - */ - $user = new Document($this->args['user'] ?? []); - $project = new Document($this->args['project'] ?? []); - $execution = new Document($this->args['execution'] ?? []); - $function = new Document($this->args['function'] ?? []); - - switch ($type) { - case 'http': - $jwt = $this->args['jwt'] ?? ''; - $data = $this->args['data'] ?? ''; - - $function = $database->getDocument('functions', $execution->getAttribute('functionId')); - - $this->execute( - project: $project, - function: $function, - dbForProject: $database, - executionId: $execution->getId(), - trigger: 'http', - data: $data, - user: $user, - jwt: $jwt - ); - - break; - - case 'schedule': - $functionOriginal = $function; - /* - * 1. Get Original Task - * 2. Check for updates - * If has updates skip task and don't reschedule - * If status not equal to play skip task - * 3. Check next run date, update task and add new job at the given date - * 4. Execute task (set optional timeout) - * 5. Update task response to log - * On success reset error count - * On failure add error count - * If error count bigger than allowed change status to pause - */ - - // Reschedule - $function = $database->getDocument('functions', $function->getId()); - - if (empty($function->getId())) { - throw new Exception('Function not found (' . $function->getId() . ')'); - } - - if ($functionOriginal->getAttribute('schedule') !== $function->getAttribute('schedule')) { // Schedule has changed from previous run, ignore this run. - return; - } - - if ($functionOriginal->getAttribute('scheduleUpdatedAt') !== $function->getAttribute('scheduleUpdatedAt')) { // Double execution due to rapid cron changes, ignore this run. - return; - } - - $cron = new CronExpression($function->getAttribute('schedule')); - $next = DateTime::format($cron->getNextRunDate()); - - $function = $function - ->setAttribute('scheduleNext', $next) - ->setAttribute('schedulePrevious', DateTime::now()); - - $function = $database->updateDocument( - 'functions', - $function->getId(), - $function - ); - - $reschedule = new Func(); - $reschedule - ->setFunction($function) - ->setType('schedule') - ->setUser($user) - ->setProject($project) - ->schedule(new \DateTime($next)); - ; - - $this->execute( - project: $project, - function: $function, - dbForProject: $database, - trigger: 'schedule' - ); - - break; - } - } - - private function execute( +Server::setResource('execute', function () { + return function ( + Func $queueForFunctions, + Database $dbForProject, + Client $statsd, Document $project, Document $function, - Database $dbForProject, string $trigger, - string $executionId = null, - string $event = null, - string $eventData = null, string $data = null, ?Document $user = null, - string $jwt = null + string $jwt = null, + string $event = null, + string $eventData = null, + string $executionId = null, ) { - $user ??= new Document(); $functionId = $function->getId(); $deploymentId = $function->getAttribute('deployment', ''); @@ -212,28 +48,28 @@ class FunctionsV1 extends Worker $deployment = $dbForProject->getDocument('deployments', $deploymentId); if ($deployment->getAttribute('resourceId') !== $functionId) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } if ($deployment->isEmpty()) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function', 404); + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } /** Check if build has exists */ $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); if ($build->isEmpty()) { - throw new Exception('Build not found', 404); + throw new Exception('Build not found'); } if ($build->getAttribute('status') !== 'ready') { - throw new Exception('Build not ready', 400); + throw new Exception('Build not ready'); } /** Check if runtime is supported */ $runtimes = Config::getParam('runtimes', []); if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { - throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported', 400); + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } $runtime = $runtimes[$function->getAttribute('runtime')]; @@ -256,14 +92,17 @@ class FunctionsV1 extends Worker 'search' => implode(' ', [$functionId, $executionId]), ])); + // TODO: @Meldiron Trigger executions.create event here + if ($execution->isEmpty()) { throw new Exception('Failed to create or read execution'); } } + $execution->setAttribute('status', 'processing'); $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $vars = array_reduce($function->getAttribute('vars', []), function (array $carry, Document $var) { $carry[$var->getAttribute('key')] = $var->getAttribute('value'); return $carry; }, []); @@ -286,7 +125,8 @@ class FunctionsV1 extends Worker /** Execute function */ try { - $executionResponse = $this->executor->createExecution( + $client = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); + $executionResponse = $client->createExecution( projectId: $project->getId(), deploymentId: $deploymentId, payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '', @@ -330,9 +170,8 @@ class FunctionsV1 extends Worker ->trigger(); /** Trigger Functions */ - $executionUpdate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $queueForFunctions + ->from($executionUpdate) ->trigger(); /** Trigger realtime event */ @@ -361,12 +200,11 @@ class FunctionsV1 extends Worker ); /** Update usage stats */ - global $register; if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { - $statsd = $register->get('statsd'); $usage = new Stats($statsd); $usage ->setParam('projectId', $project->getId()) + ->setParam('projectInternalId', $project->getInternalId()) ->setParam('functionId', $function->getId()) ->setParam('executions.{scope}.compute', 1) ->setParam('executionStatus', $execution->getAttribute('status', '')) @@ -375,9 +213,118 @@ class FunctionsV1 extends Worker ->setParam('networkResponseSize', 0) ->submit(); } - } + }; +}); - public function shutdown(): void - { - } -} +$server->job() + ->inject('message') + ->inject('dbForProject') + ->inject('queueForFunctions') + ->inject('statsd') + ->inject('execute') + ->action(function (Message $message, Database $dbForProject, Func $queueForFunctions, Client $statsd, callable $execute) { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + $type = $payload['type'] ?? ''; + $events = $payload['events'] ?? []; + $data = $payload['data'] ?? ''; + $eventData = $payload['payload'] ?? ''; + $project = new Document($payload['project'] ?? []); + $function = new Document($payload['function'] ?? []); + $user = new Document($payload['user'] ?? []); + + if ($project->getId() === 'console') { + return; + } + + if (!empty($events)) { + $limit = 30; + $sum = 30; + $offset = 0; + $functions = []; + /** @var Document[] $functions */ + while ($sum >= $limit) { + $functions = $dbForProject->find('functions', [ + Query::limit($limit), + Query::offset($offset), + Query::orderAsc('name'), + ]); + + $sum = \count($functions); + $offset = $offset + $limit; + + Console::log('Fetched ' . $sum . ' functions...'); + + foreach ($functions as $function) { + if (!array_intersect($events, $function->getAttribute('events', []))) { + continue; + } + Console::success('Iterating function: ' . $function->getAttribute('name')); + $execute( + statsd: $statsd, + dbForProject: $dbForProject, + project: $project, + function: $function, + queueForFunctions: $queueForFunctions, + trigger: 'event', + event: $events[0], + eventData: $eventData, + user: $user, + data: null, + executionId: null, + jwt: null + ); + Console::success('Triggered function: ' . $events[0]); + } + } + return; + } + + /** + * Handle Schedule and HTTP execution. + */ + switch ($type) { + case 'http': + $jwt = $payload['jwt'] ?? ''; + $execution = new Document($payload['execution'] ?? []); + $user = new Document($payload['user'] ?? []); + $execute( + project: $project, + function: $function, + dbForProject: $dbForProject, + queueForFunctions: $queueForFunctions, + trigger: 'http', + executionId: $execution->getId(), + event: null, + eventData: null, + data: $data, + user: $user, + jwt: $jwt, + statsd: $statsd, + ); + break; + case 'schedule': + $execute( + project: $project, + function: $function, + dbForProject: $dbForProject, + queueForFunctions: $queueForFunctions, + trigger: 'schedule', + executionId: null, + event: null, + eventData: null, + data: null, + user: null, + jwt: null, + statsd: $statsd, + ); + break; + } + }); + +$server->workerStart(); +$server->start(); diff --git a/bin/schedule b/bin/schedule index dbc6d94d96..ddd1ea7f35 100644 --- a/bin/schedule +++ b/bin/schedule @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=1 REDIS_BACKEND=$REDIS_BACKEND RESQUE_PHP='/usr/src/code/vendor/autoload.php' php /usr/src/code/vendor/bin/resque-scheduler +php /usr/src/code/app/cli.php schedule $@ \ No newline at end of file diff --git a/bin/worker-functions b/bin/worker-functions index 5e9728c46e..687f9fd0cd 100644 --- a/bin/worker-functions +++ b/bin/worker-functions @@ -1,10 +1,3 @@ #!/bin/sh -if [ -z "$_APP_REDIS_USER" ] && [ -z "$_APP_REDIS_PASS" ] -then - REDIS_BACKEND="${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -else - REDIS_BACKEND="redis://${_APP_REDIS_USER}:${_APP_REDIS_PASS}@${_APP_REDIS_HOST}:${_APP_REDIS_PORT}" -fi - -INTERVAL=0.1 QUEUE='v1-functions' APP_INCLUDE='/usr/src/code/app/workers/functions.php' php /usr/src/code/vendor/bin/resque -dopcache.preload=opcache.preload=/usr/src/code/app/preload.php \ No newline at end of file +QUEUE=v1-functions php /usr/src/code/app/workers/functions.php $@ \ No newline at end of file diff --git a/composer.json b/composer.json index 3dd374b8e1..ffd9c6b6b9 100644 --- a/composer.json +++ b/composer.json @@ -53,13 +53,14 @@ "utopia-php/domains": "1.1.*", "utopia-php/framework": "0.25.*", "utopia-php/image": "0.5.*", + "utopia-php/queue": "0.4.*", "utopia-php/locale": "0.4.*", "utopia-php/logger": "0.3.*", "utopia-php/orchestration": "0.9.*", "utopia-php/platform": "0.3.*", "utopia-php/pools": "0.4.*", "utopia-php/preloader": "0.2.*", - "utopia-php/registry": "0.5.0", + "utopia-php/registry": "0.5.*", "utopia-php/storage": "0.11.*", "utopia-php/swoole": "0.5.*", "utopia-php/websocket": "0.1.0", diff --git a/composer.lock b/composer.lock index 2e39916a0c..517065af56 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "bf3e2ed6ee8e49ab74af97b368b89a63", + "content-hash": "a673091aa6bd8ef01380b63245427c93", "packages": [ { "name": "adhocore/jwt", @@ -2357,6 +2357,67 @@ }, "time": "2020-10-24T07:04:59+00:00" }, + { + "name": "utopia-php/queue", + "version": "0.4.1", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/queue.git", + "reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/0b69ede484a04c567cbb202f592d8e5e3cd2433e", + "reference": "0b69ede484a04c567cbb202f592d8e5e3cd2433e", + "shasum": "" + }, + "require": { + "php": ">=8.0", + "utopia-php/cli": "0.14.*", + "utopia-php/framework": "0.*.*" + }, + "require-dev": { + "laravel/pint": "^0.2.3", + "phpstan/phpstan": "^1.8", + "phpunit/phpunit": "^9.5.5", + "swoole/ide-helper": "4.8.8", + "workerman/workerman": "^4.0" + }, + "suggest": { + "ext-swoole": "Needed to support Swoole.", + "workerman/workerman": "Needed to support Workerman." + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Queue\\": "src/Queue" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Torsten Dittmann", + "email": "torsten@appwrite.io" + } + ], + "description": "A powerful task queue.", + "keywords": [ + "Tasks", + "framework", + "php", + "queue", + "upf", + "utopia" + ], + "support": { + "issues": "https://github.com/utopia-php/queue/issues", + "source": "https://github.com/utopia-php/queue/tree/0.4.1" + }, + "time": "2022-11-15T16:56:37+00:00" + }, { "name": "utopia-php/registry", "version": "0.5.0", @@ -5241,5 +5302,5 @@ "platform-overrides": { "php": "8.0" }, - "plugin-api-version": "2.3.0" + "plugin-api-version": "2.1.0" } diff --git a/docker-compose.yml b/docker-compose.yml index a924e6d84a..d7fe33d536 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -186,6 +186,7 @@ services: - _APP_MAINTENANCE_RETENTION_AUDIT - _APP_SMS_PROVIDER - _APP_SMS_FROM + - _APP_REGION appwrite-realtime: entrypoint: realtime @@ -257,6 +258,7 @@ services: - mariadb environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_DB_HOST - _APP_DB_PORT @@ -290,6 +292,7 @@ services: - request-catcher environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_SYSTEM_SECURITY_EMAIL_ADDRESS - _APP_REDIS_HOST @@ -320,6 +323,7 @@ services: - ./src:/usr/src/code/src environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_DB_HOST - _APP_DB_PORT @@ -375,6 +379,7 @@ services: - mariadb environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_DB_HOST - _APP_DB_PORT @@ -407,6 +412,7 @@ services: - mariadb environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_EXECUTOR_SECRET - _APP_EXECUTOR_HOST @@ -443,6 +449,7 @@ services: - ./src:/usr/src/code/src environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_DOMAIN - _APP_DOMAIN_TARGET @@ -479,6 +486,7 @@ services: - openruntimes-executor environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_DB_HOST - _APP_DB_PORT @@ -516,6 +524,7 @@ services: # - smtp environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_SYSTEM_EMAIL_NAME - _APP_SYSTEM_EMAIL_ADDRESS @@ -546,6 +555,7 @@ services: - redis environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_REDIS_HOST - _APP_REDIS_PORT - _APP_REDIS_USER @@ -570,6 +580,7 @@ services: - redis environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_DOMAIN - _APP_DOMAIN_TARGET - _APP_OPENSSL_KEY_V1 @@ -622,6 +633,7 @@ services: - mariadb environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_DB_HOST - _APP_DB_PORT @@ -660,6 +672,7 @@ services: - mariadb environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_OPENSSL_KEY_V1 - _APP_DB_HOST - _APP_DB_PORT @@ -691,14 +704,25 @@ services: - ./app:/usr/src/code/app - ./src:/usr/src/code/src depends_on: + - mariadb - redis environment: - _APP_ENV + - _APP_WORKER_PER_CORE - _APP_REDIS_HOST - _APP_REDIS_PORT - _APP_REDIS_USER - _APP_REDIS_PASS + - _APP_DB_HOST + - _APP_DB_PORT + - _APP_DB_SCHEMA + - _APP_DB_USER + - _APP_DB_PASS + - _APP_CONNECTIONS_DB_PROJECT + - _APP_CONNECTIONS_DB_CONSOLE + - _APP_CONNECTIONS_CACHE - _APP_CONNECTIONS_QUEUE + - _APP_REGION openruntimes-executor: container_name: openruntimes-executor @@ -728,7 +752,7 @@ services: mariadb: image: mariadb:10.7 # fix issues when upgrading using: mysql_upgrade -u root -p - container_name: mariadb + container_name: appwrite-mariadb <<: *x-logging networks: - appwrite @@ -753,7 +777,7 @@ services: # - RELAY_FROM_HOSTS=192.168.0.0/16 ; *.yourdomain.com # - SMARTHOST_HOST=smtp # - SMARTHOST_PORT=587 - + redis: image: redis:7.0.4-alpine <<: *x-logging diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 222cf59444..6e3401e11b 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -337,8 +337,6 @@ class Event default => false }; - - return [ 'type' => $type, 'resource' => $resource, diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index b7531cf475..22940ad08e 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -6,6 +6,8 @@ use DateTime; use Resque; use ResqueScheduler; use Utopia\Database\Document; +use Utopia\Queue\Client; +use Utopia\Queue\Connection; class Func extends Event { @@ -15,7 +17,7 @@ class Func extends Event protected ?Document $function = null; protected ?Document $execution = null; - public function __construct() + public function __construct(protected Connection $connection) { parent::__construct(Event::FUNCTIONS_QUEUE_NAME, Event::FUNCTIONS_CLASS_NAME); } @@ -143,7 +145,11 @@ class Func extends Event */ public function trigger(): string|bool { - return Resque::enqueue($this->queue, $this->class, [ + $client = new Client($this->queue, $this->connection); + + $events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null; + + return $client->enqueue([ 'project' => $this->project, 'user' => $this->user, 'function' => $this->function, @@ -151,28 +157,26 @@ class Func extends Event 'type' => $this->type, 'jwt' => $this->jwt, 'payload' => $this->payload, - 'data' => $this->data + 'events' => $events, + 'data' => $this->data, ]); } /** - * Schedules the function event and schedules it in the functions worker queue. + * Generate a function event from a base event + * + * @param Event $event + * + * @return self * - * @param \DateTime|int $at - * @return void - * @throws \Resque_Exception - * @throws \ResqueScheduler_InvalidTimestampException */ - public function schedule(DateTime|int $at): void + public function from(Event $event): self { - ResqueScheduler::enqueueAt($at, $this->queue, $this->class, [ - 'project' => $this->project, - 'user' => $this->user, - 'function' => $this->function, - 'execution' => $this->execution, - 'type' => $this->type, - 'payload' => $this->payload, - 'data' => $this->data - ]); + $this->project = $event->getProject(); + $this->user = $event->getUser(); + $this->payload = $event->getPayload(); + $this->event = $event->getEvent(); + $this->params = $event->getParams(); + return $this; } } diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index 7f6a062ed4..2968a66b95 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -7,6 +7,7 @@ use Appwrite\Platform\Tasks\Doctor; use Appwrite\Platform\Tasks\Install; use Appwrite\Platform\Tasks\Maintenance; use Appwrite\Platform\Tasks\Migrate; +use Appwrite\Platform\Tasks\Schedule; use Appwrite\Platform\Tasks\SDKs; use Appwrite\Platform\Tasks\Specs; use Appwrite\Platform\Tasks\SSL; @@ -28,6 +29,7 @@ class Tasks extends Service ->addAction(Doctor::getName(), new Doctor()) ->addAction(Install::getName(), new Install()) ->addAction(Maintenance::getName(), new Maintenance()) + ->addAction(Schedule::getName(), new Schedule()) ->addAction(Migrate::getName(), new Migrate()) ->addAction(SDKs::getName(), new SDKs()) ->addAction(VolumeSync::getName(), new VolumeSync()) diff --git a/src/Appwrite/Platform/Tasks/Maintenance.php b/src/Appwrite/Platform/Tasks/Maintenance.php index fe659c8746..307f502611 100644 --- a/src/Appwrite/Platform/Tasks/Maintenance.php +++ b/src/Appwrite/Platform/Tasks/Maintenance.php @@ -139,6 +139,8 @@ class Maintenance extends Action notifyDeleteExpiredSessions(); renewCertificates($dbForConsole); notifyDeleteCache($cacheRetention); + + // TODO: @Meldiron Every probably 24h, look for schedules with active=false, that doesnt have function anymore. Dlete such schedule }, $interval); } } diff --git a/src/Appwrite/Platform/Tasks/Schedule.php b/src/Appwrite/Platform/Tasks/Schedule.php new file mode 100644 index 0000000000..1d89d85a04 --- /dev/null +++ b/src/Appwrite/Platform/Tasks/Schedule.php @@ -0,0 +1,237 @@ +desc('Execute functions scheduled in Appwrite') + ->inject('pools') + ->inject('dbForConsole') + ->inject('getProjectDB') + ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); + } + + /** + * 1. Load all documents from 'schedules' collection to create local copy + * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute + * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutime sleeps until exact time before sending request to worker. + */ + public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void + { + Console::title('Scheduler V1'); + Console::success(APP_NAME . ' Scheduler v1 has started'); + + /** + * Extract only nessessary attributes to lower memory used. + * + * @var Document $schedule + * @return array + */ + $getSchedule = function (Document $schedule) use ($dbForConsole, $getProjectDB): array { + $project = $dbForConsole->getDocument('projects', $schedule->getAttribute('projectId')); + + $function = $getProjectDB($project)->getDocument('functions', $schedule->getAttribute('resourceId')); + + return [ + 'resourceId' => $schedule->getAttribute('resourceId'), + 'schedule' => $schedule->getAttribute('schedule'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + 'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here + 'function' => $function, // TODO: @Meldiron Send only ID to worker to reduce memory usage here + ]; + }; + + $schedules = []; // Local copy of 'schedules' collection + $lastSyncUpdate = DateTime::now(); + + $limit = 10000; + $sum = $limit; + $total = 0; + $loadStart = \microtime(true); + $latestDocument = null; + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::equal('active', [true]), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $schedules[$document['resourceId']] = $getSchedule($document); + } + + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $pools->reclaim(); + + Console::success("{$total} functions were loaded in " . (microtime(true) - $loadStart) . " seconds"); + + Console::success("Starting timers at " . DateTime::now()); + + run( + function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { + /** + * The timer synchronize $schedules copy with database collection. + */ + Timer::tick(self::FUNCTION_UPDATE_TIMER * 1000, function () use ($dbForConsole, &$schedules, &$lastSyncUpdate, $getSchedule, $pools) { + $time = DateTime::now(); + $timerStart = \microtime(true); + + $limit = 1000; + $sum = $limit; + $total = 0; + $latestDocument = null; + + Console::log("Sync tick: Running at $time"); + + while ($sum === $limit) { + $paginationQueries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $paginationQueries[] = Query::cursorAfter($latestDocument); + } + $results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [ + Query::equal('region', [App::getEnv('_APP_REGION')]), + Query::equal('resourceType', ['function']), + Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate), + ])); + + $sum = count($results); + $total = $total + $sum; + foreach ($results as $document) { + $localDocument = $schedules[$document['resourceId']] ?? null; + + $org = $localDocument !== null ? strtotime($localDocument['resourceUpdatedAt']) : null; + $new = strtotime($document['resourceUpdatedAt']); + + if ($document['active'] === false) { + Console::info("Removing: {$document['resourceId']}"); + unset($schedules[$document['resourceId']]); + } elseif ($new !== $org) { + Console::info("Updating: {$document['resourceId']}"); + $schedules[$document['resourceId']] = $getSchedule($document); + } + } + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + + $lastSyncUpdate = $time; + $timerEnd = \microtime(true); + + $pools->reclaim(); + + Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); + }); + + /** + * The timer to prepare soon-to-execute schedules. + */ + $lastEnqueueUpdate = null; + $enqueueFunctions = function () use (&$schedules, $lastEnqueueUpdate, $pools) { + $timerStart = \microtime(true); + $time = DateTime::now(); + + $enqueueDiff = $lastEnqueueUpdate === null ? 0 : $timerStart - $lastEnqueueUpdate; + $timeFrame = DateTime::addSeconds(new \DateTime(), self::FUNCTION_ENQUEUE_TIMER - $enqueueDiff); + + Console::log("Enqueue tick: started at: $time (with diff $enqueueDiff)"); + + $total = 0; + + $delayedExecutions = []; // Group executions with same delay to share one coroutine + + foreach ($schedules as $key => $schedule) { + $cron = new CronExpression($schedule['schedule']); + $nextDate = $cron->getNextRunDate(); + $next = DateTime::format($nextDate); + + $currentTick = $next < $timeFrame; + + if (!$currentTick) { + continue; + } + + $total++; + + $promiseStart = \time(); // in seconds + $executionStart = $nextDate->getTimestamp(); // in seconds + $delay = $executionStart - $promiseStart; // Time to wait from now until execution needs to be queued + + if (!isset($delayedExecutions[$delay])) { + $delayedExecutions[$delay] = []; + } + + $delayedExecutions[$delay][] = $key; + } + + foreach ($delayedExecutions as $delay => $scheduleKeys) { + \go(function () use ($delay, $schedules, $scheduleKeys, $pools) { + \sleep($delay); // in seconds + + $queue = $pools->get('queue')->pop(); + $connection = $queue->getResource(); + + foreach ($scheduleKeys as $scheduleKey) { + // Ensure schedule was not deleted + if (!isset($schedules[$scheduleKey])) { + return; + } + + $schedule = $schedules[$scheduleKey]; + + $functions = new Func($connection); + + $functions + ->setType('schedule') + ->setFunction($schedule['function']) + ->setProject($schedule['project']) + ->trigger(); + } + + $queue->reclaim(); + }); + } + + $timerEnd = \microtime(true); + $lastEnqueueUpdate = $timerStart; + Console::log("Enqueue tick: {$total} executions were enqueued in " . ($timerEnd - $timerStart) . " seconds"); + }; + + Timer::tick(self::FUNCTION_ENQUEUE_TIMER * 1000, fn() => $enqueueFunctions()); + $enqueueFunctions(); + } + ); + } +} diff --git a/src/Appwrite/Utopia/Response/Model/Func.php b/src/Appwrite/Utopia/Response/Model/Func.php index c7e69fff88..540b143876 100644 --- a/src/Appwrite/Utopia/Response/Model/Func.php +++ b/src/Appwrite/Utopia/Response/Model/Func.php @@ -81,18 +81,6 @@ class Func extends Model 'default' => '', 'example' => '5 4 * * *', ]) - ->addRule('scheduleNext', [ - 'type' => self::TYPE_DATETIME, - 'description' => 'Function\'s next scheduled execution time in ISO 8601 format.', - 'default' => '', - 'example' => self::TYPE_DATETIME_EXAMPLE, - ]) - ->addRule('schedulePrevious', [ - 'type' => self::TYPE_DATETIME, - 'description' => 'Function\'s previous scheduled execution time in ISO 8601 format.', - 'default' => '', - 'example' => self::TYPE_DATETIME_EXAMPLE, - ]) ->addRule('timeout', [ 'type' => self::TYPE_INTEGER, 'description' => 'Function execution timeout in seconds.',