diff --git a/app/http.php b/app/http.php index 4bfacd4cac..c80fccee54 100644 --- a/app/http.php +++ b/app/http.php @@ -9,6 +9,7 @@ use Swoole\Http\Request as SwooleRequest; use Swoole\Http\Response as SwooleResponse; use Swoole\Http\Server; use Swoole\Process; +use Swoole\Table; use Utopia\Abuse\Adapters\Database\TimeLimit; use Utopia\App; use Utopia\Audit\Audit; @@ -16,11 +17,13 @@ use Utopia\Cache\Cache; use Utopia\CLI\Console; use Utopia\Config\Config; use Utopia\Database\Database; +use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Exception\Duplicate; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Permission; use Utopia\Database\Helpers\Role; +use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Logger\Log; use Utopia\Logger\Log\User; @@ -28,6 +31,12 @@ use Utopia\Pools\Group; use Utopia\Swoole\Files; use Utopia\System\System; +const DOMAIN_SYNC_TIMER = 30; // 30 seconds + +$domains = new Table(1_000_000); // 1 million rows +$domains->column('value', Table::TYPE_INT, 1); +$domains->create(); + $http = new Server( host: "0.0.0.0", port: System::getEnv('PORT', 80), @@ -35,11 +44,12 @@ $http = new Server( ); $payloadSize = 12 * (1024 * 1024); // 12MB - adding slight buffer for headers and other data that might be sent with the payload - update later with valid testing -$workerNumber = swoole_cpu_num() * intval(System::getEnv('_APP_WORKER_PER_CORE', 6)); +$totalWorkers = swoole_cpu_num() * intval(System::getEnv('_APP_WORKER_PER_CORE', 6)); $http ->set([ - 'worker_num' => $workerNumber, + 'worker_num' => $totalWorkers, + 'dispatch_func' => 'dispatch', 'open_http2_protocol' => true, 'http_compression' => false, 'package_max_length' => $payloadSize, @@ -58,6 +68,93 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) { Console::success('Reload completed...'); }); +/** + * Assigns HTTP requests to worker threads by analyzing its payload/content. + * + * Routes requests as 'safe' or 'risky' based on specific content patterns (like POST actions or certain domains) + * to optimize load distribution between the workers. Utilizes `$safeThreadsPercent` to manage risk by assigning + * riskier tasks to a dedicated worker subset. Prefers idle workers, with fallback to random selection if necessary. + * doc: https://openswoole.com/docs/modules/swoole-server/configuration#dispatch_func + * + * @param Server $server Swoole server instance. + * @param int $fd client ID + * @param int $type the type of data and its current state + * @param string|null $data Request content for categorization. + * @global int $totalThreads Total number of workers. + * @return int Chosen worker ID for the request. + */ +function dispatch(Server $server, int $fd, int $type, $data = null): int +{ + global $totalWorkers, $domains; + + // If data is not set we can send request to any worker + // first we try to pick idle worker, if not we randomly pick a worker + if ($data === null) { + for ($i = 0; $i < $totalWorkers; $i++) { + if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) { + return $i; + } + } + return rand(0, $totalWorkers - 1); + } + + $riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1 + + // Each worker has numeric ID, starting from 0 and incrementing + // From 0 to riskyWorkers, we consider safe workers + // From riskyWorkers to totalWorkers, we consider risky workers + $riskyWorkers = (int) floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers + + $domain = ''; + // max up to 3 as first line has request details and second line has host + $lines = explode("\n", $data, 3); + $request = $lines[0]; + if (count($lines) > 1) { + $domain = trim(explode('Host: ', $lines[1])[1]); + } + + // Sync executions are considered risky + $risky = false; + if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) { + $risky = true; + } elseif (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) { + $risky = true; + } elseif ($domains->get(md5($domain), 'value') === 1) { + // executions request coming from custom domain + $risky = true; + } + + if ($risky) { + // If risky request, only consider risky workers + for ($j = $riskyWorkers; $j < $totalWorkers; $j++) { + /** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */ + if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) { + // If idle worker found, give to him + return $j; + } + } + + // If no idle workers, give to random risky worker + $worker = rand($riskyWorkers, $totalWorkers - 1); + Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}"); + return $worker; + } + + // If safe request, give to any idle worker + // Its fine to pick risky worker here, because it's idle. Idle is never actually risky + for ($i = 0; $i < $totalWorkers; $i++) { + if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) { + return $i; + } + } + + // If no idle worker found, give to random safe worker + // We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky. + $worker = rand(0, $riskyWorkers - 1); + Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}"); + return $worker; +} + include __DIR__ . '/controllers/general.php'; $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $register) { @@ -213,6 +310,9 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg Console::success('Server started successfully (max payload is ' . number_format($payloadSize) . ' bytes)'); Console::info("Master pid {$http->master_pid}, manager pid {$http->manager_pid}"); + // Start the task that starts fetching custom domains + $http->task([], 0); + // listen ctrl + c Process::signal(2, function () use ($http) { Console::log('Stop by Ctrl+C'); @@ -330,4 +430,59 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool } }); +// Fetch domains every `DOMAIN_SYNC_TIMER` seconds and update in the memory +$http->on('Task', function () use ($register, $domains) { + $lastSyncUpdate = null; + $pools = $register->get('pools'); + App::setResource('pools', fn () => $pools); + $app = new App('UTC'); + + /** @var Utopia\Database\Database $dbForConsole */ + $dbForConsole = $app->getResource('dbForConsole'); + + Console::loop(function () use ($dbForConsole, $domains, &$lastSyncUpdate) { + try { + $time = DateTime::now(); + $limit = 1000; + $sum = $limit; + $latestDocument = null; + + while ($sum === $limit) { + $queries = [Query::limit($limit)]; + if ($latestDocument !== null) { + $queries[] = Query::cursorAfter($latestDocument); + } + if ($lastSyncUpdate != null) { + $queries[] = Query::greaterThanEqual('$updatedAt', $lastSyncUpdate); + } + $queries[] = Query::equal('resourceType', ['function']); + $results = []; + try { + $results = Authorization::skip(fn () => $dbForConsole->find('rules', $queries)); + } catch (Throwable $th) { + Console::error($th->getMessage()); + } + + $sum = count($results); + foreach ($results as $document) { + $domain = $document->getAttribute('domain'); + if (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) { + continue; + } + $domains->set(md5($domain), ['value' => 1]); + } + $latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null; + } + $lastSyncUpdate = $time; + if ($sum > 0) { + Console::log("Sync domains tick: {$sum} domains were updated"); + } + } catch (Throwable $th) { + Console::error($th->getMessage()); + } + }, DOMAIN_SYNC_TIMER, 0, function ($error) { + Console::error($error); + }); +}); + $http->start();