From 4a92db4dc393fa3e382c308f7f4ff5857d5565aa Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Wed, 16 Nov 2022 09:47:46 +0530 Subject: [PATCH] feat: check async execution --- app/controllers/api/functions.php | 8 +- app/worker.php | 33 ++-- app/workers/functions.php | 99 ++++++------ composer.lock | 2 +- temp/appwrite.json | 21 +++ temp/functions/My Awesome Function/.gitignore | 149 ++++++++++++++++++ temp/functions/My Awesome Function/README.md | 47 ++++++ .../My Awesome Function/package.json | 15 ++ .../My Awesome Function/src/index.js | 46 ++++++ 9 files changed, 352 insertions(+), 68 deletions(-) create mode 100644 temp/appwrite.json create mode 100644 temp/functions/My Awesome Function/.gitignore create mode 100644 temp/functions/My Awesome Function/README.md create mode 100644 temp/functions/My Awesome Function/package.json create mode 100644 temp/functions/My Awesome Function/src/index.js diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 7204271d0f..04da1234a1 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -1217,10 +1217,10 @@ App::post('/v1/functions/:functionId/executions') // TODO revise this later using route label $usage - ->setParam('functionId', $function->getId()) - ->setParam('executions.{scope}.compute', 1) - ->setParam('executionStatus', $execution->getAttribute('status', '')) - ->setParam('executionTime', $execution->getAttribute('duration')); // ms + ->setParam('functionId', $function->getId()) + ->setParam('executions.{scope}.compute', 1) + ->setParam('executionStatus', $execution->getAttribute('status', '')) + ->setParam('executionTime', $execution->getAttribute('duration')); // ms $roles = Authorization::getRoles(); $isPrivilegedUser = Auth::isPrivilegedUser($roles); diff --git a/app/worker.php b/app/worker.php index 564a577c71..110a9d81da 100644 --- a/app/worker.php +++ b/app/worker.php @@ -20,37 +20,37 @@ Server::setResource('register', fn() => $register); Server::setResource('dbForConsole', function (Cache $cache, Registry $register) { $pools = $register->get('pools'); - $dbAdapter = $pools + $database = $pools ->get('console') ->pop() ->getResource() ; - $database = new Database($dbAdapter, $cache); - $database->setNamespace('console'); + $adapter = new Database($database, $cache); + $adapter->setNamespace('console'); - return $database; + return $adapter; }, ['cache', 'register']); Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) { - $args = $message->getPayload() ?? []; - $project = new Document($args['project'] ?? []); + $payload = $message->getPayload() ?? []; + $project = new Document($payload['project'] ?? []); if ($project->isEmpty() || $project->getId() === 'console') { return $dbForConsole; } $pools = $register->get('pools'); - $dbAdapter = $pools + $database = $pools ->get($project->getAttribute('database')) ->pop() ->getResource() ; - $database = new Database($dbAdapter, $cache); - $database->setNamespace('_' . $project->getInternalId()); + $adapter = new Database($database, $cache); + $adapter->setNamespace('_' . $project->getInternalId()); - return $database; + return $adapter; }, ['cache', 'register', 'message', 'dbForConsole']); Server::setResource('cache', function (Registry $register) { @@ -71,19 +71,26 @@ Server::setResource('cache', function (Registry $register) { Server::setResource('functions', function (Registry $register) { $pools = $register->get('pools'); - return new Func($pools->get('queue')->pop()->getResource()); + return new Func( + $pools + ->get('queue') + ->pop() + ->getResource() + ); }, ['register']); Server::setResource('logger', function ($register) { return $register->get('logger'); }, ['register']); +Server::setResource('statsd', function ($register) { + return $register->get('statsd'); +}, ['register']); $pools = $register->get('pools'); $connection = $pools->get('queue')->pop()->getResource(); - $workerNumber = swoole_cpu_num() * intval(App::getEnv('_APP_WORKER_PER_CORE', 6)); $workerNumber = 1; -Runtime::enableCoroutine(SWOOLE_HOOK_ALL); +Runtime::enableCoroutine(SWOOLE_HOOK_ALL); \ No newline at end of file diff --git a/app/workers/functions.php b/app/workers/functions.php index e0c9e64961..02f3932c12 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -2,13 +2,13 @@ require_once __DIR__ . '/../worker.php'; -use Utopia\Queue; use Utopia\Queue\Message; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Usage\Stats; use Appwrite\Utopia\Response\Model\Execution; +use Domnikl\Statsd\Client; use Executor\Executor; use Utopia\App; use Utopia\CLI\Console; @@ -21,14 +21,16 @@ use Utopia\Database\Query; use Utopia\Database\Role; use Utopia\Database\Validator\Authorization; use Utopia\Logger\Log; +use Utopia\Queue\Adapter\Swoole; +use Utopia\Queue\Server; Authorization::disable(); Authorization::setDefaultStatus(false); global $connection; global $workerNumber; - -$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); +$adapter = new Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME); +$server = new Server($adapter); $execute = function ( Document $project, @@ -41,12 +43,14 @@ $execute = function ( string $eventData = null, string $data = null, ?Document $user = null, - string $jwt = null -) use ($executor, $register) { + string $jwt = null, + Client $statsd +) { + $user ??= new Document(); $functionId = $function->getId(); $deploymentId = $function->getAttribute('deployment', ''); - + var_dump("Deployment ID : ", $deploymentId); /** Check if deployment exists */ $deployment = $dbForProject->getDocument('deployments', $deploymentId); @@ -163,6 +167,7 @@ $execute = function ( ]); /** Execute function */ + $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); try { $executionResponse = $executor->createExecution( projectId: $project->getId(), @@ -223,7 +228,7 @@ $execute = function ( 'executionId' => $execution->getId() ]); $target = Realtime::fromPayload( - // Pass first, most verbose event pattern + // Pass first, most verbose event pattern event: $allEvents[0], payload: $execution ); @@ -243,12 +248,11 @@ $execute = function ( ); /** Update usage stats */ - global $register; if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { - $statsd = $register->get('statsd'); $usage = new Stats($statsd); $usage ->setParam('projectId', $project->getId()) + ->setParam('projectInternalId', $project->getInternalId()) ->setParam('functionId', $function->getId()) ->setParam('executions.{scope}.compute', 1) ->setParam('executionStatus', $execution->getAttribute('status', '')) @@ -259,21 +263,25 @@ $execute = function ( } }; -$adapter = new Queue\Adapter\Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME); -$server = new Queue\Server($adapter); - $server->job() ->inject('message') ->inject('dbForProject') ->inject('functions') - ->action(function (Message $message, Database $dbForProject, Func $functions) use ($execute) { - $args = $message->getPayload() ?? []; - $type = $args['type'] ?? ''; - $events = $args['events'] ?? []; - $project = new Document($args['project'] ?? []); - $user = new Document($args['user'] ?? []); - // Where $payload comes from - $payload = json_encode($args['payload'] ?? []); + ->inject('statsd') + ->action(function (Message $message, Database $dbForProject, Func $functions, Client $statsd) use ($execute) { + $payload = $message->getPayload() ?? []; + + if (empty($payload)) { + throw new Exception('Missing payload'); + } + + var_dump(json_encode($payload)); + + $type = $payload['type'] ?? ''; + $events = $payload['events'] ?? []; + $project = new Document($payload['project'] ?? []); + $data = $payload['data'] ?? ''; + $user = new Document($payload['user'] ?? []); if ($project->getId() === 'console') { return; @@ -284,38 +292,31 @@ $server->job() */ if (!empty($events)) { $limit = 30; - $sum = $limit; - $total = 0; - $latestDocument = null; + $sum = 30; + $offset = 0; + $functions = []; + /** @var Document[] $functions */ - while ($sum === $limit) { - $paginationQueries = [Query::limit($limit)]; - if ($latestDocument !== null) { - $paginationQueries[] = Query::cursorAfter($latestDocument); - } - $results = $dbForProject->find('functions', \array_merge($paginationQueries, [ - Query::orderAsc('name') - ])); + while ($sum >= $limit) { + $functions = $dbForProject->find('functions', [ + Query::limit($limit), + Query::offset($offset), + Query::orderAsc('name'), + ]); - $sum = count($results); - $total = $total + $sum; + $sum = \count($functions); + $offset = $offset + $limit; Console::log('Fetched ' . $sum . ' functions...'); - foreach ($results as $function) { + foreach ($functions as $function) { if (!array_intersect($events, $function->getAttribute('events', []))) { continue; } - Console::success('Iterating function: ' . $function->getAttribute('name')); - - // As event, pass first, most verbose event pattern - call_user_func($execute, $project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null); - + call_user_func($execute, $project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null, $statsd); Console::success('Triggered function: ' . $events[0]); } - - $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; } return; @@ -324,20 +325,18 @@ $server->job() /** * Handle Schedule and HTTP execution. */ - $project = new Document($args['project'] ?? []); - $function = new Document($args['function'] ?? []); + $function = new Document($payload['function'] ?? []); + var_dump($function); switch ($type) { case 'http': - $jwt = $args['jwt'] ?? ''; - $data = $args['data'] ?? ''; - $execution = new Document($args['execution'] ?? []); - $user = new Document($args['user'] ?? []); - // $function = $dbForProject->getDocument('functions', $execution->getAttribute('functionId')); - call_user_func($execute, $project, $function, $dbForProject, $functions, 'http', $execution->getId(), null, null, $data, $user, $jwt); + $jwt = $payload['jwt'] ?? ''; + $execution = new Document($payload['execution'] ?? []); + $user = new Document($payload['user'] ?? []); + call_user_func($execute, $project, $function, $dbForProject, $functions, 'http', $execution->getId(), null, null, $data, $user, $jwt, $statsd); break; case 'schedule': - call_user_func($execute, $project, $function, $dbForProject, $functions, 'schedule', null, null, null, null, null, null); + call_user_func($execute, $project, $function, $dbForProject, $functions, 'schedule', null, null, null, null, null, null, $statsd); break; } }); diff --git a/composer.lock b/composer.lock index 674e42a5ca..d41a4a1cf9 100644 --- a/composer.lock +++ b/composer.lock @@ -5311,5 +5311,5 @@ "platform-overrides": { "php": "8.0" }, - "plugin-api-version": "2.1.0" + "plugin-api-version": "2.3.0" } diff --git a/temp/appwrite.json b/temp/appwrite.json new file mode 100644 index 0000000000..643a821415 --- /dev/null +++ b/temp/appwrite.json @@ -0,0 +1,21 @@ +{ + "projectId": "6374484424b42c83155c", + "projectName": "Default", + "functions": [ + { + "$id": "637449ac7066fa0d8cde", + "name": "My Awesome Function", + "runtime": "node-14.5", + "path": "functions/My Awesome Function", + "entrypoint": "src/index.js", + "ignore": [ + "node_modules", + ".npm" + ], + "execute": [], + "events": [], + "schedule": "", + "timeout": 15 + } + ] +} \ No newline at end of file diff --git a/temp/functions/My Awesome Function/.gitignore b/temp/functions/My Awesome Function/.gitignore new file mode 100644 index 0000000000..2551987d6d --- /dev/null +++ b/temp/functions/My Awesome Function/.gitignore @@ -0,0 +1,149 @@ + +# Created by https://www.toptal.com/developers/gitignore/api/node +# Edit at https://www.toptal.com/developers/gitignore?templates=node + +### Node ### +# Logs +logs +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Diagnostic reports (https://nodejs.org/api/report.html) +report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json + +# Runtime data +pids +*.pid +*.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover +lib-cov + +# Coverage directory used by tools like istanbul +coverage +*.lcov + +# nyc test coverage +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) +.grunt + +# Bower dependency directory (https://bower.io/) +bower_components + +# node-waf configuration +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) +build/Release + +# Dependency directories +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) +web_modules/ + +# TypeScript cache +*.tsbuildinfo + +# Optional npm cache directory +.npm + +# Optional eslint cache +.eslintcache + +# Optional stylelint cache +.stylelintcache + +# Microbundle cache +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history +.node_repl_history + +# Output of 'npm pack' +*.tgz + +# Yarn Integrity file +.yarn-integrity + +# dotenv environment variable files +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) +.cache +.parcel-cache + +# Next.js build output +.next +out + +# Nuxt.js build / generate output +.nuxt +dist + +# Gatsby files +.cache/ +# Comment in the public line in if your project uses Gatsby and not Next.js +# https://nextjs.org/blog/next-9-1#public-directory-support +# public + +# vuepress build output +.vuepress/dist + +# vuepress v2.x temp and cache directory +.temp + +# Docusaurus cache and generated files +.docusaurus + +# Serverless directories +.serverless/ + +# FuseBox cache +.fusebox/ + +# DynamoDB Local files +.dynamodb/ + +# TernJS port file +.tern-port + +# Stores VSCode versions used for testing VSCode extensions +.vscode-test + +# yarn v2 +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +### Node Patch ### +# Serverless Webpack directories +.webpack/ + +# Optional stylelint cache + +# SvelteKit build / generate output +.svelte-kit + +# End of https://www.toptal.com/developers/gitignore/api/node + +# OS +## Mac +.DS_Store diff --git a/temp/functions/My Awesome Function/README.md b/temp/functions/My Awesome Function/README.md new file mode 100644 index 0000000000..0eb0d2ad04 --- /dev/null +++ b/temp/functions/My Awesome Function/README.md @@ -0,0 +1,47 @@ +# My Awesome Function + +Welcome to the documentation of this function 👋 We strongly recommend keeping this file in sync with your function's logic to make sure anyone can easily understand your function in the future. If you don't need documentation, you can remove this file. + +## 🤖 Documentation + +Simple function similar to typical "hello world" example, but instead, we return a simple JSON that tells everyone how awesome developers are. + + + +_Example input:_ + +This function expects no input + + + +_Example output:_ + + + +```json +{ + "areDevelopersAwesome": true +} +``` + +## 📝 Environment Variables + +List of environment variables used by this cloud function: + +- **APPWRITE_FUNCTION_ENDPOINT** - Endpoint of Appwrite project +- **APPWRITE_FUNCTION_API_KEY** - Appwrite API Key + + +## 🚀 Deployment + +There are two ways of deploying the Appwrite function, both having the same results, but each using a different process. We highly recommend using CLI deployment to achieve the best experience. + +### Using CLI + +Make sure you have [Appwrite CLI](https://appwrite.io/docs/command-line#installation) installed, and you have successfully logged into your Appwrite server. To make sure Appwrite CLI is ready, you can use the command `appwrite client --debug` and it should respond with green text `✓ Success`. + +Make sure you are in the same folder as your `appwrite.json` file and run `appwrite deploy function` to deploy your function. You will be prompted to select which functions you want to deploy. + +### Manual using tar.gz + +Manual deployment has no requirements and uses Appwrite Console to deploy the tag. First, enter the folder of your function. Then, create a tarball of the whole folder and gzip it. After creating `.tar.gz` file, visit Appwrite Console, click on the `Deploy Tag` button and switch to the `Manual` tab. There, set the `entrypoint` to `src/index.js`, and upload the file we just generated. diff --git a/temp/functions/My Awesome Function/package.json b/temp/functions/My Awesome Function/package.json new file mode 100644 index 0000000000..fa252199b9 --- /dev/null +++ b/temp/functions/My Awesome Function/package.json @@ -0,0 +1,15 @@ +{ + "name": "appwrite-function", + "version": "1.0.0", + "description": "", + "main": "src/index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "node-appwrite": "^8.0.0" + } +} diff --git a/temp/functions/My Awesome Function/src/index.js b/temp/functions/My Awesome Function/src/index.js new file mode 100644 index 0000000000..4ed0028cac --- /dev/null +++ b/temp/functions/My Awesome Function/src/index.js @@ -0,0 +1,46 @@ +const sdk = require("node-appwrite"); + +/* + 'req' variable has: + 'headers' - object with request headers + 'payload' - request body data as a string + 'variables' - object with function variables + + 'res' variable has: + 'send(text, status)' - function to return text response. Status code defaults to 200 + 'json(obj, status)' - function to return JSON response. Status code defaults to 200 + + If an error is thrown, a response with code 500 will be returned. +*/ + +module.exports = async function (req, res) { + const client = new sdk.Client(); + + // You can remove services you don't use + const account = new sdk.Account(client); + const avatars = new sdk.Avatars(client); + const database = new sdk.Databases(client); + const functions = new sdk.Functions(client); + const health = new sdk.Health(client); + const locale = new sdk.Locale(client); + const storage = new sdk.Storage(client); + const teams = new sdk.Teams(client); + const users = new sdk.Users(client); + + if ( + !req.variables['APPWRITE_FUNCTION_ENDPOINT'] || + !req.variables['APPWRITE_FUNCTION_API_KEY'] + ) { + console.warn("Environment variables are not set. Function cannot use Appwrite SDK."); + } else { + client + .setEndpoint(req.variables['APPWRITE_FUNCTION_ENDPOINT']) + .setProject(req.variables['APPWRITE_FUNCTION_PROJECT_ID']) + .setKey(req.variables['APPWRITE_FUNCTION_API_KEY']) + .setSelfSigned(true); + } + + res.json({ + areDevelopersAwesome: true, + }); +};