feat(tasks): Coroutine

This commit is contained in:
Binyamin Yawitz 2024-06-03 14:09:58 -04:00
parent 90fd0ba392
commit 6083d8b7a8
No known key found for this signature in database
9 changed files with 415 additions and 361 deletions

View file

@ -9,192 +9,236 @@ use Appwrite\Event\Func;
use Appwrite\Event\Hamster;
use Appwrite\Platform\Appwrite;
use Appwrite\Utopia\Queue\Connections;
use Utopia\Cache\Adapter\None;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\CLI;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Queue\Connection\Redis;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Database\Adapter\MySQL;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\DI\Dependency;
use Utopia\Logger\Log;
use Utopia\Platform\Service;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
use Utopia\Registry\Registry;
use Utopia\System\System;
use Swoole\Runtime;
use Utopia\CLI\Adapters\Swoole as SwooleCLI;
global $register;
global $global, $container;
CLI::setResource('register', fn () => $register);
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
CLI::setResource('connections', function () {
return new Connections();
});
$registry = new Dependency();
$registry
->setName('register')
->setCallback(fn() => $global);
CLI::setResource('cache', function ($pools, Connections $connections) {
$list = Config::getParam('pools-cache', []);
$adapters = [];
$connections = new Dependency();
$connections
->setName('connections')
->setCallback(fn() => new Connections());
foreach ($list as $value) {
$connection = $pools->get($value)->pop();
$connections->add($connection);
$adapters[] = $connection->getResource();
}
$cache = new Dependency();
$cache
->setName('cache')
->setCallback(function () {
return new Cache(new None());
});
$container->set($cache);
return new Cache(new Sharding($adapters));
}, ['pools', 'connections']);
$pools = new Dependency();
$pools
->setName('pools')
->inject('register')
->setCallback(function (Registry $register) {
return $register->get('pools');
});
CLI::setResource('pools', function (Registry $register) {
return $register->get('pools');
}, ['register']);
$dbForConsole = new Dependency();
$dbForConsole
->setName('dbForConsole')
->inject('pools')
->inject('cache')
->inject('auth')
->inject('connections')
->setCallback(function ($pools, $cache, $auth, Connections $connections) {
$pool = $pools['pools-console-main']['pool'];
$dsn = $pools['pools-console-main']['dsn'];
$connection = $pool->get();
$connections->add($connection, $pool);
CLI::setResource('dbForConsole', function ($pools, $cache, $auth, Connections $connections) {
$sleep = 3;
$maxAttempts = 5;
$attempts = 0;
$ready = false;
$adapter = match ($dsn->getScheme()) {
'mariadb' => new MariaDB($connection),
'mysql' => new MySQL($connection),
default => null
};
$connection = null;
$adapter->setDatabase($dsn->getPath());
do {
$attempts++;
try {
// Prepare database connection
$connection = $pools->get('console')->pop();
$dbAdapter = $connection->getResource();
$dbForConsole = new Database($dbAdapter, $cache);
$dbForConsole->setAuthorization($auth);
$dbForConsole
->setNamespace('_console')
->setMetadata('host', \gethostname())
->setMetadata('project', 'console');
// Ensure tables exist
$collections = Config::getParam('collections', [])['console'];
$last = \array_key_last($collections);
if (!($dbForConsole->exists($dbForConsole->getDatabase(), $last))) { /** TODO cache ready variable using registry */
throw new Exception('Tables not ready yet.');
}
$ready = true;
} catch (\Throwable $err) {
if($connection !== null) {
$connection->reclaim();
$connection = null;
}
Console::warning($err->getMessage());
sleep($sleep);
}
} while ($attempts < $maxAttempts && !$ready);
if($connection !== null) {
$connections->add($connection);
}
if (!$ready) {
throw new Exception("Console is not ready yet. Please try again later.");
}
return $dbForConsole;
}, ['pools', 'cache', 'auth', 'connections']);
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, $auth, Connections $connections) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$databaseName = $project->getAttribute('database');
if (isset($databases[$databaseName])) {
$database = $databases[$databaseName];
$database->setNamespace('_' . $project->getInternalId());
return $database;
}
$connection = $pools->get($databaseName)->pop();
$connections->add($connection);
$dbAdapter = $connection->getResource();
$database = new Database($dbAdapter, $cache);
$database = new Database($adapter, $cache);
$database->setAuthorization($auth);
$databases[$databaseName] = $database;
$database
->setNamespace('_' . $project->getInternalId())
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId());
$database->setNamespace('_console');
return $database;
};
}, ['pools', 'dbForConsole', 'cache', 'auth', 'connections']);
});
CLI::setResource('queue', function (Group $pools, Connections $connections) {
$connection = $pools->get('queue')->pop();
$connections->add($connection);
return $connection->getResource();
}, ['pools', 'connections']);
CLI::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);
CLI::setResource('queueForHamster', function (Connection $queue) {
return new Hamster($queue);
}, ['queue']);
CLI::setResource('queueForDeletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
CLI::setResource('queueForCertificates', function (Connection $queue) {
return new Certificate($queue);
}, ['queue']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');
$getProjectDB = new Dependency();
$getProjectDB
->setName('getProjectDB')
->inject('pools')
->inject('dbForConsole')
->inject('cache')
->inject('auth')
->inject('connections')
->setCallback(function (array $pools, Database $dbForConsole, Cache $cache, Authorization $auth, Connections $connections) {
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections): Database {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
if ($logger) {
$version = System::getEnv('_APP_VERSION', 'UNKNOWN');
$databaseName = $project->getAttribute('database');
$log = new Log();
$log->setNamespace($namespace);
$log->setServer(\gethostname());
$log->setVersion($version);
$log->setType(Log::TYPE_ERROR);
$log->setMessage($error->getMessage());
$pool = $pools['pools-database-' . $databaseName]['pool'];
$dsn = $pools['pools-database-' . $databaseName]['dsn'];
$log->addTag('code', $error->getCode());
$log->addTag('verboseType', get_class($error));
$connection = $pool->get();
$connections->add($connection, $pool);
$adapter = match ($dsn->getScheme()) {
'mariadb' => new MariaDB($connection),
'mysql' => new MySQL($connection),
default => null
};
$adapter->setDatabase($dsn->getPath());
$log->addExtra('file', $error->getFile());
$log->addExtra('line', $error->getLine());
$log->addExtra('trace', $error->getTraceAsString());
$log->addExtra('detailedTrace', $error->getTrace());
$database = new Database($adapter, $cache);
$database->setAuthorization($auth);
$database->setNamespace('_' . $project->getInternalId());
$log->setAction($action);
return $database;
};
});
$isProduction = System::getEnv('_APP_ENV', 'development') === 'production';
$queue = new Dependency();
$queue
->setName('queue')
->inject('pools')
->inject('connections')
->setCallback(function (array $pools, Connections $connections) {
$pool = $pools['pools-queue-main']['pool'];
$dsn = $pools['pools-queue-main']['dsn'];
$connection = $pool->get();
$connections->add($connection, $pool);
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
return new Redis($dsn->getHost(), $dsn->getPort());
});
$responseCode = $logger->addLog($log);
Console::info('Usage stats log pushed with status code: ' . $responseCode);
}
$queueForFunctions = new Dependency();
$queueForFunctions
->setName('queueForFunctions')
->inject('queue')
->setCallback(function (Connection $queue) {
return new Func($queue);
});
Console::warning("Failed: {$error->getMessage()}");
Console::warning($error->getTraceAsString());
};
}, ['register']);
$queueForHamster = new Dependency();
$queueForHamster
->setName('queueForHamster')
->inject('queue')
->setCallback(function (Connection $queue) {
return new Hamster($queue);
});
CLI::setResource('auth', fn () => new Authorization());
$queueForDeletes = new Dependency();
$queueForDeletes
->setName('queueForDeletes')
->inject('queue')
->setCallback(function (Connection $queue) {
return new Delete($queue);
});
$queueForCertificates = new Dependency();
$queueForCertificates
->setName('queueForCertificates')
->inject('queue')
->setCallback(function (Connection $queue) {
return new Certificate($queue);
});
$queueForCertificates = new Dependency();
$queueForCertificates
->setName('queueForCertificates')
->inject('queue')
->setCallback(function (Connection $queue) {
return new Certificate($queue);
});
$logError = new Dependency();
$logError
->setName('logError')
->inject('register')
->setCallback(function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');
if ($logger) {
$version = System::getEnv('_APP_VERSION', 'UNKNOWN');
$log = new Log();
$log->setNamespace($namespace);
$log->setServer(\gethostname());
$log->setVersion($version);
$log->setType(Log::TYPE_ERROR);
$log->setMessage($error->getMessage());
$log->addTag('code', $error->getCode());
$log->addTag('verboseType', get_class($error));
$log->addExtra('file', $error->getFile());
$log->addExtra('line', $error->getLine());
$log->addExtra('trace', $error->getTraceAsString());
$log->addExtra('detailedTrace', $error->getTrace());
$log->setAction($action);
$isProduction = System::getEnv('_APP_ENV', 'development') === 'production';
$log->setEnvironment($isProduction ? Log::ENVIRONMENT_PRODUCTION : Log::ENVIRONMENT_STAGING);
$responseCode = $logger->addLog($log);
Console::info('Usage stats log pushed with status code: ' . $responseCode);
}
Console::warning("Failed: {$error->getMessage()}");
Console::warning($error->getTraceAsString());
};
});
$auth = new Dependency();
$auth
->setName('auth')
->setCallback(fn() => new Authorization());
$container->set($registry);
$container->set($connections);
$container->set($cache);
$container->set($pools);
$container->set($dbForConsole);
$container->set($getProjectDB);
$container->set($queue);
$container->set($queueForFunctions);
$container->set($queueForHamster);
$container->set($queueForDeletes);
$container->set($queueForCertificates);
$container->set($logError);
$container->set($auth);
$platform = new Appwrite();
$platform->init(Service::TYPE_CLI);
$platform->init(Service::TYPE_CLI, ['adapter' => new SwooleCLI(1)]);
$cli = $platform->getCli();
@ -212,4 +256,6 @@ $cli
Console::error($error->getMessage());
});
$cli->run();
$cli
->setContainer($container)
->run();

View file

@ -23,6 +23,8 @@ use Utopia\Http\Adapter\Swoole\Server;
use Utopia\Http\Http;
use Utopia\System\System;
global $global, $container;
$workerNumber = swoole_cpu_num() * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
$payloadSize = 6 * (1024 * 1024); // 6MB
@ -48,7 +50,7 @@ $server = new Server('0.0.0.0', '80', [
$http = new Http($server, $container, 'UTC');
// $http->loadFiles(__DIR__ . '/../console');
$http->loadFiles(__DIR__ . '/../console');
$http->setRequestClass(Request::class);
$http->setResponseClass(Response::class);

View file

@ -39,7 +39,7 @@ use Utopia\Registry\Registry;
use Utopia\Storage\Device\Local;
use Utopia\System\System;
global $gloabl, $container;
global $global, $container;
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);

View file

@ -50,7 +50,7 @@
"utopia-php/analytics": "dev-feat-framework-v2 as 0.10.99",
"utopia-php/audit": "dev-feat-framework-v2 as 0.39.99",
"utopia-php/cache": "0.9.*",
"utopia-php/cli": "0.17.*",
"utopia-php/cli": "dev-dev-coroutines as 0.17.99",
"utopia-php/config": "0.2.*",
"utopia-php/database": "dev-feat-framework-v2 as 0.49.99",
"utopia-php/domains": "dev-feat-framework-v2 as 0.5.99",

158
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "bb3213cfb05b043e7d2ea2002069bb98",
"content-hash": "7778579de897a3e077914bd37686a62b",
"packages": [
{
"name": "adhocore/jwt",
@ -1050,12 +1050,12 @@
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-php80.git",
"reference": "7d191eb4022901cd3d91a816ec5464ca3a08a8aa"
"reference": "77fa7995ac1b21ab60769b7323d600a991a90433"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/7d191eb4022901cd3d91a816ec5464ca3a08a8aa",
"reference": "7d191eb4022901cd3d91a816ec5464ca3a08a8aa",
"url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/77fa7995ac1b21ab60769b7323d600a991a90433",
"reference": "77fa7995ac1b21ab60769b7323d600a991a90433",
"shasum": ""
},
"require": {
@ -1123,7 +1123,7 @@
"type": "tidelift"
}
],
"time": "2024-04-19T06:31:17+00:00"
"time": "2024-05-31T15:07:36+00:00"
},
{
"name": "thecodingmachine/safe",
@ -1458,27 +1458,29 @@
},
{
"name": "utopia-php/cli",
"version": "0.17.0",
"version": "dev-dev-coroutines",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/cli.git",
"reference": "0829fd5215afe88f53f3091cedc808da801fd1bb"
"reference": "c9b049160aff2eecea7dd1ff041165ae15f41f76"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/cli/zipball/0829fd5215afe88f53f3091cedc808da801fd1bb",
"reference": "0829fd5215afe88f53f3091cedc808da801fd1bb",
"url": "https://api.github.com/repos/utopia-php/cli/zipball/c9b049160aff2eecea7dd1ff041165ae15f41f76",
"reference": "c9b049160aff2eecea7dd1ff041165ae15f41f76",
"shasum": ""
},
"require": {
"php": ">=7.4",
"utopia-php/framework": "0.34.*"
"utopia-php/di": "dev-main",
"utopia-php/framework": "dev-feat-di-upgrade as 0.34.99"
},
"require-dev": {
"laravel/pint": "1.2.*",
"phpstan/phpstan": "^1.10",
"phpunit/phpunit": "^9.3",
"squizlabs/php_codesniffer": "^3.6",
"vimeo/psalm": "4.0.1"
"swoole/ide-helper": "4.8.8"
},
"type": "library",
"autoload": {
@ -1501,9 +1503,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/cli/issues",
"source": "https://github.com/utopia-php/cli/tree/0.17.0"
"source": "https://github.com/utopia-php/cli/tree/dev-coroutines"
},
"time": "2024-01-24T11:37:29+00:00"
"time": "2024-06-03T17:55:34+00:00"
},
{
"name": "utopia-php/config",
@ -2035,22 +2037,21 @@
},
{
"name": "utopia-php/migration",
"version": "0.4.1",
"version": "0.4.4",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/migration.git",
"reference": "ae3cfe93f6d313105d226aeb68806660c806a925"
"reference": "a8a5d392bebf082faf289f4dfe09d9fd76844c33"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/migration/zipball/ae3cfe93f6d313105d226aeb68806660c806a925",
"reference": "ae3cfe93f6d313105d226aeb68806660c806a925",
"url": "https://api.github.com/repos/utopia-php/migration/zipball/a8a5d392bebf082faf289f4dfe09d9fd76844c33",
"reference": "a8a5d392bebf082faf289f4dfe09d9fd76844c33",
"shasum": ""
},
"require": {
"appwrite/appwrite": "10.1.0",
"php": "8.*",
"utopia-php/cli": "0.*"
"php": "8.*"
},
"require-dev": {
"laravel/pint": "1.*",
@ -2077,9 +2078,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/migration/issues",
"source": "https://github.com/utopia-php/migration/tree/0.4.1"
"source": "https://github.com/utopia-php/migration/tree/0.4.4"
},
"time": "2024-05-01T13:19:18+00:00"
"time": "2024-05-17T05:25:31+00:00"
},
{
"name": "utopia-php/mongo",
@ -2197,20 +2198,20 @@
"source": {
"type": "git",
"url": "https://github.com/utopia-php/platform.git",
"reference": "e6f2f281b2ad962211b1c6f88650d0230f615a3a"
"reference": "55b1d8675a35d8fb269614a1e916a1bff616983c"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/e6f2f281b2ad962211b1c6f88650d0230f615a3a",
"reference": "e6f2f281b2ad962211b1c6f88650d0230f615a3a",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/55b1d8675a35d8fb269614a1e916a1bff616983c",
"reference": "55b1d8675a35d8fb269614a1e916a1bff616983c",
"shasum": ""
},
"require": {
"ext-json": "*",
"ext-redis": "*",
"php": ">=8.0",
"utopia-php/cli": "0.17.*",
"utopia-php/framework": "0.34.*",
"utopia-php/cli": "dev-dev-coroutines as 0.17.99",
"utopia-php/framework": "dev-feat-di-upgrade as 0.34.99",
"utopia-php/queue": "dev-feat-coroutine-and-di as 0.7.99"
},
"require-dev": {
@ -2239,7 +2240,7 @@
"issues": "https://github.com/utopia-php/platform/issues",
"source": "https://github.com/utopia-php/platform/tree/feat-framework-v2"
},
"time": "2024-04-21T18:38:38+00:00"
"time": "2024-06-03T18:01:18+00:00"
},
{
"name": "utopia-php/queue",
@ -2540,16 +2541,16 @@
},
{
"name": "utopia-php/vcs",
"version": "0.6.5",
"version": "0.6.6",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/vcs.git",
"reference": "104e47ea8e38c156ec0e0bd415caa3dcd5046fe2"
"reference": "e538264cfee5e3efdfe1771efba04750cf20b2c4"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/vcs/zipball/104e47ea8e38c156ec0e0bd415caa3dcd5046fe2",
"reference": "104e47ea8e38c156ec0e0bd415caa3dcd5046fe2",
"url": "https://api.github.com/repos/utopia-php/vcs/zipball/e538264cfee5e3efdfe1771efba04750cf20b2c4",
"reference": "e538264cfee5e3efdfe1771efba04750cf20b2c4",
"shasum": ""
},
"require": {
@ -2583,9 +2584,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/vcs/issues",
"source": "https://github.com/utopia-php/vcs/tree/0.6.5"
"source": "https://github.com/utopia-php/vcs/tree/0.6.6"
},
"time": "2024-01-08T17:11:12+00:00"
"time": "2024-05-17T09:36:30+00:00"
},
{
"name": "utopia-php/view",
@ -2815,16 +2816,16 @@
"packages-dev": [
{
"name": "appwrite/sdk-generator",
"version": "0.38.2",
"version": "0.38.6",
"source": {
"type": "git",
"url": "https://github.com/appwrite/sdk-generator.git",
"reference": "51284668529e2b10ed933412a42b603c76cded23"
"reference": "d7016d6d72545e84709892faca972eb4bf5bd699"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/51284668529e2b10ed933412a42b603c76cded23",
"reference": "51284668529e2b10ed933412a42b603c76cded23",
"url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/d7016d6d72545e84709892faca972eb4bf5bd699",
"reference": "d7016d6d72545e84709892faca972eb4bf5bd699",
"shasum": ""
},
"require": {
@ -2860,9 +2861,9 @@
"description": "Appwrite PHP library for generating API SDKs for multiple programming languages and platforms",
"support": {
"issues": "https://github.com/appwrite/sdk-generator/issues",
"source": "https://github.com/appwrite/sdk-generator/tree/0.38.2"
"source": "https://github.com/appwrite/sdk-generator/tree/0.38.6"
},
"time": "2024-04-25T07:49:29+00:00"
"time": "2024-05-20T18:00:16+00:00"
},
{
"name": "doctrine/deprecations",
@ -2984,16 +2985,16 @@
},
{
"name": "laravel/pint",
"version": "v1.15.3",
"version": "v1.16.0",
"source": {
"type": "git",
"url": "https://github.com/laravel/pint.git",
"reference": "3600b5d17aff52f6100ea4921849deacbbeb8656"
"reference": "1b3a3dc5bc6a81ff52828ba7277621f1d49d6d98"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/laravel/pint/zipball/3600b5d17aff52f6100ea4921849deacbbeb8656",
"reference": "3600b5d17aff52f6100ea4921849deacbbeb8656",
"url": "https://api.github.com/repos/laravel/pint/zipball/1b3a3dc5bc6a81ff52828ba7277621f1d49d6d98",
"reference": "1b3a3dc5bc6a81ff52828ba7277621f1d49d6d98",
"shasum": ""
},
"require": {
@ -3004,11 +3005,11 @@
"php": "^8.1.0"
},
"require-dev": {
"friendsofphp/php-cs-fixer": "^3.54.0",
"illuminate/view": "^10.48.8",
"larastan/larastan": "^2.9.5",
"laravel-zero/framework": "^10.3.0",
"mockery/mockery": "^1.6.11",
"friendsofphp/php-cs-fixer": "^3.57.1",
"illuminate/view": "^10.48.10",
"larastan/larastan": "^2.9.6",
"laravel-zero/framework": "^10.4.0",
"mockery/mockery": "^1.6.12",
"nunomaduro/termwind": "^1.15.1",
"pestphp/pest": "^2.34.7"
},
@ -3046,7 +3047,7 @@
"issues": "https://github.com/laravel/pint/issues",
"source": "https://github.com/laravel/pint"
},
"time": "2024-04-30T15:02:26+00:00"
"time": "2024-05-21T18:08:25+00:00"
},
{
"name": "matthiasmullie/minify",
@ -3239,12 +3240,12 @@
"source": {
"type": "git",
"url": "https://github.com/nikic/PHP-Parser.git",
"reference": "c5ee33df86c06b3278c670f64273b1ba768a0744"
"reference": "daaadc3bae458908aa477b90a8932e7da9253f22"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/c5ee33df86c06b3278c670f64273b1ba768a0744",
"reference": "c5ee33df86c06b3278c670f64273b1ba768a0744",
"url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/daaadc3bae458908aa477b90a8932e7da9253f22",
"reference": "daaadc3bae458908aa477b90a8932e7da9253f22",
"shasum": ""
},
"require": {
@ -3290,7 +3291,7 @@
"issues": "https://github.com/nikic/PHP-Parser/issues",
"source": "https://github.com/nikic/PHP-Parser/tree/master"
},
"time": "2024-04-19T12:04:10+00:00"
"time": "2024-06-03T06:24:19+00:00"
},
{
"name": "phar-io/manifest",
@ -3470,12 +3471,12 @@
"source": {
"type": "git",
"url": "https://github.com/phpDocumentor/ReflectionDocBlock.git",
"reference": "88a07d262854c827db22f2eac8b072138e492f65"
"reference": "aa53f8d4374d1f5bd3fc598548d6272cb5d9bf39"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/phpDocumentor/ReflectionDocBlock/zipball/88a07d262854c827db22f2eac8b072138e492f65",
"reference": "88a07d262854c827db22f2eac8b072138e492f65",
"url": "https://api.github.com/repos/phpDocumentor/ReflectionDocBlock/zipball/aa53f8d4374d1f5bd3fc598548d6272cb5d9bf39",
"reference": "aa53f8d4374d1f5bd3fc598548d6272cb5d9bf39",
"shasum": ""
},
"require": {
@ -3527,7 +3528,7 @@
"issues": "https://github.com/phpDocumentor/ReflectionDocBlock/issues",
"source": "https://github.com/phpDocumentor/ReflectionDocBlock/tree/5.x"
},
"time": "2024-05-08T18:52:15+00:00"
"time": "2024-05-21T06:14:15+00:00"
},
{
"name": "phpdocumentor/type-resolver",
@ -3535,12 +3536,12 @@
"source": {
"type": "git",
"url": "https://github.com/phpDocumentor/TypeResolver.git",
"reference": "483fb7fe262607b0a5ec32f99bdc42e2212b22fe"
"reference": "eee054a3d40f09920f5b27c9b09a6483f88d9d24"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/phpDocumentor/TypeResolver/zipball/483fb7fe262607b0a5ec32f99bdc42e2212b22fe",
"reference": "483fb7fe262607b0a5ec32f99bdc42e2212b22fe",
"url": "https://api.github.com/repos/phpDocumentor/TypeResolver/zipball/eee054a3d40f09920f5b27c9b09a6483f88d9d24",
"reference": "eee054a3d40f09920f5b27c9b09a6483f88d9d24",
"shasum": ""
},
"require": {
@ -3586,7 +3587,7 @@
"issues": "https://github.com/phpDocumentor/TypeResolver/issues",
"source": "https://github.com/phpDocumentor/TypeResolver/tree/1.x"
},
"time": "2024-03-29T20:21:22+00:00"
"time": "2024-05-24T14:24:30+00:00"
},
{
"name": "phpspec/prophecy",
@ -3660,16 +3661,16 @@
},
{
"name": "phpstan/phpdoc-parser",
"version": "1.29.0",
"version": "1.29.1",
"source": {
"type": "git",
"url": "https://github.com/phpstan/phpdoc-parser.git",
"reference": "536889f2b340489d328f5ffb7b02bb6b183ddedc"
"reference": "fcaefacf2d5c417e928405b71b400d4ce10daaf4"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/phpstan/phpdoc-parser/zipball/536889f2b340489d328f5ffb7b02bb6b183ddedc",
"reference": "536889f2b340489d328f5ffb7b02bb6b183ddedc",
"url": "https://api.github.com/repos/phpstan/phpdoc-parser/zipball/fcaefacf2d5c417e928405b71b400d4ce10daaf4",
"reference": "fcaefacf2d5c417e928405b71b400d4ce10daaf4",
"shasum": ""
},
"require": {
@ -3701,9 +3702,9 @@
"description": "PHPDoc parser with support for nullable, intersection and generic types",
"support": {
"issues": "https://github.com/phpstan/phpdoc-parser/issues",
"source": "https://github.com/phpstan/phpdoc-parser/tree/1.29.0"
"source": "https://github.com/phpstan/phpdoc-parser/tree/1.29.1"
},
"time": "2024-05-06T12:04:23+00:00"
"time": "2024-05-31T08:52:43+00:00"
},
{
"name": "phpstan/phpstan",
@ -5239,12 +5240,12 @@
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-ctype.git",
"reference": "c9e59dec962d38cf2e0e4c61c4a1a1312f4dd7fe"
"reference": "0424dff1c58f028c451efff2045f5d92410bd540"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/c9e59dec962d38cf2e0e4c61c4a1a1312f4dd7fe",
"reference": "c9e59dec962d38cf2e0e4c61c4a1a1312f4dd7fe",
"url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/0424dff1c58f028c451efff2045f5d92410bd540",
"reference": "0424dff1c58f028c451efff2045f5d92410bd540",
"shasum": ""
},
"require": {
@ -5311,7 +5312,7 @@
"type": "tidelift"
}
],
"time": "2024-04-19T06:31:17+00:00"
"time": "2024-05-31T15:07:36+00:00"
},
{
"name": "symfony/polyfill-mbstring",
@ -5319,12 +5320,12 @@
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-mbstring.git",
"reference": "e642fbe7a7b73cdb05460555289a9057bfd6ead6"
"reference": "098e36a5b73de12beeb5ac17e80abf3696f7ad5f"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/e642fbe7a7b73cdb05460555289a9057bfd6ead6",
"reference": "e642fbe7a7b73cdb05460555289a9057bfd6ead6",
"url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/098e36a5b73de12beeb5ac17e80abf3696f7ad5f",
"reference": "098e36a5b73de12beeb5ac17e80abf3696f7ad5f",
"shasum": ""
},
"require": {
@ -5392,7 +5393,7 @@
"type": "tidelift"
}
],
"time": "2024-04-19T06:31:17+00:00"
"time": "2024-05-31T15:07:36+00:00"
},
{
"name": "textalk/websocket",
@ -5624,6 +5625,12 @@
"alias": "0.39.99",
"alias_normalized": "0.39.99.0"
},
{
"package": "utopia-php/cli",
"version": "dev-dev-coroutines",
"alias": "0.17.99",
"alias_normalized": "0.17.99.0"
},
{
"package": "utopia-php/database",
"version": "dev-feat-framework-v2",
@ -5672,6 +5679,7 @@
"utopia-php/abuse": 20,
"utopia-php/analytics": 20,
"utopia-php/audit": 20,
"utopia-php/cli": 20,
"utopia-php/database": 20,
"utopia-php/domains": 20,
"utopia-php/framework": 20,

View file

@ -10,6 +10,8 @@ x-logging: &x-logging
max-file: "5"
max-size: "10m"
version: "3"
services:
traefik:
image: traefik:2.11
@ -40,6 +42,7 @@ services:
networks:
- gateway
- appwrite
- runtimes
appwrite:
container_name: appwrite
@ -48,7 +51,7 @@ services:
build:
context: .
args:
DEBUG: false
DEBUG: true
TESTING: true
VERSION: dev
ports:
@ -75,14 +78,15 @@ services:
- appwrite-config:/storage/config:rw
- appwrite-certificates:/storage/certificates:rw
- appwrite-functions:/storage/functions:rw
- appwrite-builds:/storage/builds:rw
- ./phpunit.xml:/usr/src/code/phpunit.xml
- ./tests:/usr/src/code/tests
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./docs:/usr/src/code/docs
- ./public:/usr/src/code/public
- ./src:/usr/src/code/src
- ./dev:/usr/src/code/dev
- ./temp-debug:/tmp/xdebug
depends_on:
- mariadb
- redis
@ -92,6 +96,7 @@ services:
- -e
- app/http.php
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_LOCALE
@ -187,7 +192,6 @@ services:
- _APP_MESSAGE_EMAIL_TEST_DSN
- _APP_MESSAGE_PUSH_TEST_DSN
- _APP_CONSOLE_COUNTRIES_DENYLIST
appwrite-realtime:
entrypoint: realtime
<<: *x-logging
@ -214,11 +218,13 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- mariadb
- redis
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPTIONS_ABUSE
@ -246,11 +252,13 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -275,12 +283,14 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
- request-catcher
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -315,8 +325,10 @@ services:
- appwrite-builds:/storage/builds:rw
- appwrite-certificates:/storage/certificates:rw
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -364,11 +376,13 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -397,11 +411,13 @@ services:
- appwrite-functions:/storage/functions:rw
- appwrite-builds:/storage/builds:rw
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -465,8 +481,10 @@ services:
- appwrite-config:/storage/config:rw
- appwrite-certificates:/storage/certificates:rw
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -495,15 +513,19 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
- openruntimes-executor
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DOMAIN
- _APP_OPTIONS_FORCE_HTTPS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
@ -534,12 +556,14 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- maildev
# - smtp
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -569,10 +593,12 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -601,11 +627,13 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
- ./tests:/usr/src/code/tests
depends_on:
- mariadb
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -626,19 +654,21 @@ services:
- _APP_MIGRATIONS_FIREBASE_CLIENT_ID
- _APP_MIGRATIONS_FIREBASE_CLIENT_SECRET
appwrite-maintenance:
appwrite-task-maintenance:
entrypoint: maintenance
<<: *x-logging
container_name: appwrite-maintenance
container_name: appwrite-task-maintenance
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_DOMAIN
@ -672,11 +702,13 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -703,11 +735,13 @@ services:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -725,20 +759,22 @@ services:
- _APP_LOGGING_CONFIG
- _APP_USAGE_AGGREGATION_INTERVAL
appwrite-scheduler-functions:
appwrite-task-scheduler-functions:
entrypoint: schedule-functions
<<: *x-logging
container_name: appwrite-scheduler-functions
container_name: appwrite-task-scheduler-functions
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- mariadb
- redis
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -752,20 +788,22 @@ services:
- _APP_DB_USER
- _APP_DB_PASS
appwrite-scheduler-messages:
appwrite-task-scheduler-messages:
entrypoint: schedule-messages
<<: *x-logging
container_name: appwrite-scheduler-messages
container_name: appwrite-task-scheduler-messages
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./vendor:/usr/src/code/vendor
- ./src:/usr/src/code/src
depends_on:
- mariadb
- redis
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
@ -785,65 +823,9 @@ services:
networks:
- appwrite
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- _APP_ASSISTANT_OPENAI_API_KEY
appwrite-worker-hamster:
entrypoint: worker-hamster
<<: *x-logging
container_name: appwrite-worker-hamster
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_MIXPANEL_TOKEN
appwrite-hamster-scheduler:
entrypoint: hamster
<<: *x-logging
container_name: appwrite-hamster-scheduler
image: appwrite-dev
networks:
- appwrite
volumes:
- ./app:/usr/src/code/app
- ./src:/usr/src/code/src
depends_on:
- redis
- mariadb
environment:
- _APP_ENV
- _APP_WORKER_PER_CORE
- _APP_OPENSSL_KEY_V1
- _APP_REDIS_HOST
- _APP_REDIS_PORT
- _APP_REDIS_USER
- _APP_REDIS_PASS
- _APP_DB_HOST
- _APP_DB_PORT
- _APP_DB_SCHEMA
- _APP_DB_USER
- _APP_DB_PASS
- _APP_HAMSTER_TIME
- _APP_HAMSTER_INTERVAL
openruntimes-executor:
container_name: openruntimes-executor
hostname: appwrite-executor
@ -862,6 +844,7 @@ services:
# It's not possible to share mount file between 2 containers without host mount (copying is too slow)
- /tmp:/tmp:rw
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- OPR_EXECUTOR_INACTIVE_TRESHOLD=$_APP_FUNCTIONS_INACTIVE_THRESHOLD
- OPR_EXECUTOR_MAINTENANCE_INTERVAL=$_APP_FUNCTIONS_MAINTENANCE_INTERVAL
- OPR_EXECUTOR_NETWORK=$_APP_FUNCTIONS_RUNTIMES_NETWORK
@ -905,6 +888,7 @@ services:
- appwrite
- runtimes
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- OPR_PROXY_WORKER_PER_CORE=$_APP_WORKER_PER_CORE
- OPR_PROXY_ENV=$_APP_ENV
- OPR_PROXY_EXECUTOR_SECRET=$_APP_EXECUTOR_SECRET
@ -925,10 +909,10 @@ services:
- appwrite
volumes:
- appwrite-mariadb:/var/lib/mysql:rw
- ./mariadb-config.cnf:/etc/mysql/conf.d/mariadb-config.cnf
ports:
- "3306:3306"
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- MYSQL_ROOT_PASSWORD=${_APP_DB_ROOT_PASS}
- MYSQL_DATABASE=${_APP_DB_SCHEMA}
- MYSQL_USER=${_APP_DB_USER}
@ -944,6 +928,7 @@ services:
# networks:
# - appwrite
# environment:
# PHP_IDE_CONFIG=serverName=Appwrite
# - LOCAL_DOMAINS=@
# - RELAY_FROM_HOSTS=192.168.0.0/16 ; *.yourdomain.com
# - SMARTHOST_HOST=smtp
@ -1019,6 +1004,7 @@ services:
networks:
- appwrite
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- REDIS_HOSTS=redis
ports:
- "8081:5540"
@ -1032,6 +1018,7 @@ services:
ports:
- "9509:3000"
environment:
- PHP_IDE_CONFIG=serverName=Appwrite
- SERVER_URL=http://localhost/v1/graphql
# Dev Tools End ------------------------------------------------------------------------------------------

View file

@ -2,6 +2,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Utopia\Queue\Connections;
use Swoole\Timer;
use Utopia\CLI\Console;
use Utopia\Database\Database;
@ -21,17 +22,19 @@ abstract class ScheduleBase extends Action
protected const ENQUEUE_TIMER = 60; //seconds
protected array $schedules = [];
protected Connections $connections;
abstract public static function getName(): string;
abstract public static function getSupportedResource(): string;
abstract protected function enqueueResources(
Group $pools,
array $pools,
Database $dbForConsole
);
public function __construct()
{
$this->connections = new Connections();
$type = static::getSupportedResource();
$this
@ -39,7 +42,7 @@ abstract class ScheduleBase extends Action
->inject('pools')
->inject('dbForConsole')
->inject('getProjectDB')
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
->callback(fn (array $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
}
/**
@ -47,7 +50,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
public function action(array $pools, Database $dbForConsole, callable $getProjectDB): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -124,76 +127,71 @@ abstract class ScheduleBase extends Action
$latestDocument = \end($results);
}
$pools->reclaim();
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
$limit = 1000;
$sum = $limit;
$total = 0;
$latestDocument = null;
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
Console::log("Sync tick: Running at $time");
$limit = 1000;
$sum = $limit;
$total = 0;
$latestDocument = null;
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
Console::log("Sync tick: Running at $time");
if ($latestDocument) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [System::getEnv('_APP_REGION', 'default')]),
Query::equal('resourceType', [static::getSupportedResource()]),
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
]));
$sum = count($results);
$total = $total + $sum;
foreach ($results as $document) {
$localDocument = $schedules[$document['resourceId']] ?? null;
// Check if resource has been updated since last sync
$org = $localDocument !== null ? \strtotime($localDocument['resourceUpdatedAt']) : null;
$new = \strtotime($document['resourceUpdatedAt']);
if (!$document['active']) {
Console::info("Removing: {$document['resourceId']}");
unset($this->schedules[$document['resourceId']]);
} elseif ($new !== $org) {
Console::info("Updating: {$document['resourceId']}");
$this->schedules[$document['resourceId']] = $getSchedule($document);
}
}
$latestDocument = \end($results);
if ($latestDocument) {
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$results = $dbForConsole->find('schedules', \array_merge($paginationQueries, [
Query::equal('region', [System::getEnv('_APP_REGION', 'default')]),
Query::equal('resourceType', [static::getSupportedResource()]),
Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
]));
$pools->reclaim();
$sum = count($results);
$total = $total + $sum;
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
foreach ($results as $document) {
$localDocument = $schedules[$document['resourceId']] ?? null;
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($pools, $dbForConsole)
);
// Check if resource has been updated since last sync
$org = $localDocument !== null ? \strtotime($localDocument['resourceUpdatedAt']) : null;
$new = \strtotime($document['resourceUpdatedAt']);
$this->enqueueResources($pools, $dbForConsole);
if (!$document['active']) {
Console::info("Removing: {$document['resourceId']}");
unset($this->schedules[$document['resourceId']]);
} elseif ($new !== $org) {
Console::info("Updating: {$document['resourceId']}");
$this->schedules[$document['resourceId']] = $getSchedule($document);
}
}
$latestDocument = \end($results);
}
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn() => $this->enqueueResources($pools, $dbForConsole)
);
$this->enqueueResources($pools, $dbForConsole);
}
}

View file

@ -8,6 +8,7 @@ use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
use Utopia\Queue\Connection\Redis;
class ScheduleFunctions extends ScheduleBase
{
@ -26,7 +27,7 @@ class ScheduleFunctions extends ScheduleBase
return 'function';
}
protected function enqueueResources(Group $pools, Database $dbForConsole): void
protected function enqueueResources(array $pools, Database $dbForConsole): void
{
$timerStart = \microtime(true);
$time = DateTime::now();
@ -68,8 +69,12 @@ class ScheduleFunctions extends ScheduleBase
\go(function () use ($delay, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$pool = $pools['pools-queue-main']['pool'];
$dsn = $pools['pools-queue-main']['dsn'];
$connection = $pool->get();
$this->connections->add($connection, $pool);
$queueConnection = new Redis($dsn->getHost(), $dsn->getPort());
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
@ -79,7 +84,7 @@ class ScheduleFunctions extends ScheduleBase
$schedule = $this->schedules[$scheduleKey];
$queueForFunctions = new Func($connection);
$queueForFunctions = new Func($queueConnection);
$queueForFunctions
->setType('schedule')
@ -90,7 +95,8 @@ class ScheduleFunctions extends ScheduleBase
->trigger();
}
$queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource
$this->connections->reclaim();
// $queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource
});
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Connection\Redis;
class ScheduleMessages extends ScheduleBase
{
@ -21,7 +22,7 @@ class ScheduleMessages extends ScheduleBase
return 'message';
}
protected function enqueueResources(Group $pools, Database $dbForConsole): void
protected function enqueueResources(array $pools, Database $dbForConsole): void
{
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@ -36,9 +37,14 @@ class ScheduleMessages extends ScheduleBase
}
\go(function () use ($now, $schedule, $pools, $dbForConsole) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);
$pool = $pools['pools-queue-main']['pool'];
$dsn = $pools['pools-queue-main']['dsn'];
$connection = $pool->get();
$this->connections->add($connection, $pool);
$queueConnection = new Redis($dsn->getHost(), $dsn->getPort());
$queueForMessaging = new Messaging($queueConnection);
$queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
@ -51,7 +57,8 @@ class ScheduleMessages extends ScheduleBase
$schedule['$id'],
);
$queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource
$this->connections->reclaim();
// $queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource
unset($this->schedules[$schedule['resourceId']]);
});