Work on the main worker

This commit is contained in:
Eldad Fux 2024-04-15 22:03:07 +02:00
parent 13eb3bccd0
commit 07a830e9a6
3 changed files with 65 additions and 49 deletions

View file

@ -1,9 +1,4 @@
<?php
ini_set('memory_limit', '512M');
ini_set('display_errors', 1);
ini_set('display_startup_errors', 1);
ini_set('default_socket_timeout', -1);
error_reporting(E_ALL);
require_once __DIR__ . '/../vendor/autoload.php';

View file

@ -6,8 +6,11 @@ require_once __DIR__ . '/init/locale.php';
require_once __DIR__ . '/init/database/filters.php';
require_once __DIR__ . '/init/database/formats.php';
// Unlimited memory limit to handle as many coroutines/requests as possible
ini_set('memory_limit', '-1');
ini_set('display_errors', 1);
ini_set('display_startup_errors', 1);
ini_set('default_socket_timeout', -1);
error_reporting(E_ALL);
global $http, $container;

View file

@ -1,6 +1,7 @@
<?php
require_once __DIR__ . '/init.php';
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/init2.php';
use Appwrite\Event\Audit;
use Appwrite\Event\Build;
@ -18,48 +19,63 @@ use Appwrite\Event\UsageDump;
use Appwrite\Platform\Appwrite;
use Appwrite\Utopia\Queue\Connections;
use Swoole\Runtime;
use Utopia\Cache\Adapter\None;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\MariaDB;
use Utopia\Database\Adapter\MySQL;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\Http\Http;
use Utopia\Logger\Log;
use Utopia\Logger\Logger;
use Utopia\Platform\Service;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
use Utopia\Queue\Connection\Redis;
use Utopia\Queue\Message;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
use Utopia\Storage\Device\Local;
use Utopia\System\System;
global $register;
global $gloabl;
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
Server::setResource('register', fn () => $register);
Server::setResource('register', fn () => $gloabl);
Server::setResource('connections', function () {
return new Connections();
});
Server::setResource('dbForConsole', function (Cache $cache, Registry $register, Authorization $auth, Connections $connections) {
$pools = $register->get('pools');
$connection = $pools->get('console')->pop();
$connections->add($connection);
$database = $connection->getResource();
Server::setResource('pools', function () use ($gloabl) {
return $gloabl->get('pools');
});
$adapter = new Database($database, $cache);
$adapter->setAuthorization($auth);
$adapter->setNamespace('_console');
Server::setResource('dbForConsole', function (Cache $cache, array $pools, Authorization $auth, Connections $connections) {
$pool = $pools['pools-console-main']['pool'];
$dsn = $pools['pools-console-main']['dsn'];
$connection = $pool->get();
$connections->add($connection, $pool);
return $adapter;
}, ['cache', 'register', 'auth', 'connections']);
$adapter = match ($dsn->getScheme()) {
'mariadb' => new MariaDB($connection),
'mysql' => new MySQL($connection),
default => null
};
$adapter->setDatabase($dsn->getPath());
$database = new Database($adapter, $cache);
$database->setAuthorization($auth);
$database->setNamespace('_console');
return $database;
}, ['cache', 'pools', 'auth', 'connections']);
Server::setResource('project', function (Message $message, Database $dbForConsole) {
$payload = $message->getPayload() ?? [];
@ -72,21 +88,31 @@ Server::setResource('project', function (Message $message, Database $dbForConsol
return $dbForConsole->getDocument('projects', $project->getId());
}, ['message', 'dbForConsole']);
Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Document $project, Database $dbForConsole, Authorization $auth, Connections $connections) {
Server::setResource('dbForProject', function (Cache $cache, array $pools, Message $message, Document $project, Database $dbForConsole, Authorization $auth, Connections $connections) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
$pools = $register->get('pools');
$connection = $pools->get($project->getAttribute('database'))->pop();
$connections->add($connection);
$database = $connection->getResource();
$pool = $pools['pools-database-'.$project->getAttribute('database')]['pool'];
$dsn = $pools['pools-database-'.$project->getAttribute('database')]['dsn'];
$adapter = new Database($database, $cache);
$adapter->setAuthorization($auth);
$adapter->setNamespace('_' . $project->getInternalId());
return $adapter;
}, ['cache', 'register', 'message', 'project', 'dbForConsole', 'auth', 'connections']);
$connection = $pool->get();
$connections->add($connection, $pool);
$adapter = match ($dsn->getScheme()) {
'mariadb' => new MariaDB($connection),
'mysql' => new MySQL($connection),
default => null
};
$adapter->setDatabase($dsn->getPath());
$database = new Database($adapter, $cache);
$database = new Database($adapter, $cache);
$database->setAuthorization($auth);
$database->setNamespace('_' . $project->getInternalId());
return $database;
}, ['cache', 'pools', 'message', 'project', 'dbForConsole', 'auth', 'connections']);
Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache, Authorization $auth, Connections $connections) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
@ -131,19 +157,9 @@ Server::setResource('executionRetention', function () {
return DateTime::addSeconds(new \DateTime(), -1 * System::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', 1209600));
});
Server::setResource('cache', function (Registry $register, Connections $connections) {
$pools = $register->get('pools');
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$connection = $pools->get($value)->pop();
$connections->add($connection);
$adapters[] = $connection->getResource();
}
return new Cache(new Sharding($adapters));
}, ['register', 'connections']);
Server::setResource('cache', function () {
return new Cache(new None());
}, []);
Server::setResource('log', fn () => new Log());
@ -155,10 +171,13 @@ Server::setResource('queueForUsageDump', function (Connection $queue) {
return new UsageDump($queue);
}, ['queue']);
Server::setResource('queue', function (Group $pools, Connections $connections) {
$connection = $pools->get('queue')->pop();
$connections->add($connection);
return $connection->getResource();
Server::setResource('queue', function (array $pools, Connections $connections) {
$pool = $pools['pools-queue-main']['pool'];
$dsn = $pools['pools-queue-main']['dsn'];
$connection = $pool->get();
$connections->add($connection, $pool);
return new Redis($dsn->getHost(), $dsn->getPort());
}, ['pools', 'connections']);
Server::setResource('queueForDatabase', function (Connection $queue) {
@ -235,7 +254,6 @@ Server::setResource('deviceForLocalFiles', function (Document $project) {
Server::setResource('auth', fn () => new Authorization());
$pools = $register->get('pools');
$platform = new Appwrite();
$args = $_SERVER['argv'];
@ -267,7 +285,7 @@ try {
*/
$platform->init(Service::TYPE_WORKER, [
'workersNum' => System::getEnv('_APP_WORKERS_NUM', 1),
'connection' => $pools->get('queue')->pop()->getResource(),
'connection' => $global->get('pools')['pools-queue-main']['pool']->get(),
'workerName' => strtolower($workerName) ?? null,
'queueName' => $queueName
]);