From 391e045775051f41f3e6c5c6835b92901387853c Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Thu, 24 Apr 2025 23:11:52 +0200 Subject: [PATCH] feat(dispatch): add contextual dispatch logic --- app/http.php | 172 ++++++++++++++++++++++++++------------------------- 1 file changed, 89 insertions(+), 83 deletions(-) diff --git a/app/http.php b/app/http.php index 451a25a601..4fc6ea6825 100644 --- a/app/http.php +++ b/app/http.php @@ -43,29 +43,6 @@ $http = new Server( $payloadSize = 12 * (1024 * 1024); // 12MB - adding slight buffer for headers and other data that might be sent with the payload - update later with valid testing $totalWorkers = intval(System::getEnv('_APP_CPU_NUM', swoole_cpu_num())) * intval(System::getEnv('_APP_WORKER_PER_CORE', 6)); -$http - ->set([ - 'worker_num' => $totalWorkers, - 'dispatch_func' => 'dispatch', - 'open_http2_protocol' => true, - 'http_compression' => false, - 'package_max_length' => $payloadSize, - 'buffer_output_size' => $payloadSize, - 'task_worker_num' => 1, // required for the task to fetch domains background - ]); - -$http->on(Constant::EVENT_WORKER_START, function ($server, $workerId) { - Console::success('Worker ' . ++$workerId . ' started successfully'); -}); - -$http->on(Constant::EVENT_BEFORE_RELOAD, function ($server, $workerId) { - Console::success('Starting reload...'); -}); - -$http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) { - Console::success('Reload completed...'); -}); - /** * Assigns HTTP requests to worker threads by analyzing its payload/content. * @@ -83,76 +60,105 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) { */ function dispatch(Server $server, int $fd, int $type, $data = null): int { - global $totalWorkers, $domains; + $resolveWorkerId = function (Server $server, $data = null) { + global $totalWorkers, $domains; - // If data is not set we can send request to any worker - // first we try to pick idle worker, if not we randomly pick a worker - if ($data === null) { + // If data is not set we can send request to any worker + // first we try to pick idle worker, if not we randomly pick a worker + if ($data === null) { + for ($i = 0; $i < $totalWorkers; $i++) { + if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) { + return $i; + } + } + return rand(0, $totalWorkers - 1); + } + + $riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1 + + // Each worker has numeric ID, starting from 0 and incrementing + // From 0 to riskyWorkers, we consider safe workers + // From riskyWorkers to totalWorkers, we consider risky workers + $riskyWorkers = (int)floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers + + $domain = ''; + // max up to 3 as first line has request details and second line has host + $lines = explode("\n", $data, 3); + $request = $lines[0]; + if (count($lines) > 1) { + $domain = trim(explode('Host: ', $lines[1])[1]); + } + + // Sync executions are considered risky + $risky = false; + if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) { + $risky = true; + } elseif (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) { + $risky = true; + } elseif ($domains->get(md5($domain), 'value') === 1) { + // executions request coming from custom domain + $risky = true; + } + + if ($risky) { + // If risky request, only consider risky workers + for ($j = $riskyWorkers; $j < $totalWorkers; $j++) { + /** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */ + if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) { + // If idle worker found, give to him + return $j; + } + } + + // If no idle workers, give to random risky worker + $worker = rand($riskyWorkers, $totalWorkers - 1); + Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}"); + return $worker; + } + + // If safe request, give to any idle worker + // Its fine to pick risky worker here, because it's idle. Idle is never actually risky for ($i = 0; $i < $totalWorkers; $i++) { if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) { return $i; } } - return rand(0, $totalWorkers - 1); - } - $riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1 - - // Each worker has numeric ID, starting from 0 and incrementing - // From 0 to riskyWorkers, we consider safe workers - // From riskyWorkers to totalWorkers, we consider risky workers - $riskyWorkers = (int) floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers - - $domain = ''; - // max up to 3 as first line has request details and second line has host - $lines = explode("\n", $data, 3); - $request = $lines[0]; - if (count($lines) > 1) { - $domain = trim(explode('Host: ', $lines[1])[1]); - } - - // Sync executions are considered risky - $risky = false; - if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) { - $risky = true; - } elseif (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) { - $risky = true; - } elseif ($domains->get(md5($domain), 'value') === 1) { - // executions request coming from custom domain - $risky = true; - } - - if ($risky) { - // If risky request, only consider risky workers - for ($j = $riskyWorkers; $j < $totalWorkers; $j++) { - /** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */ - if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) { - // If idle worker found, give to him - return $j; - } - } - - // If no idle workers, give to random risky worker - $worker = rand($riskyWorkers, $totalWorkers - 1); - Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}"); + // If no idle worker found, give to random safe worker + // We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky. + $worker = rand(0, $riskyWorkers - 1); + Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}"); return $worker; - } - - // If safe request, give to any idle worker - // Its fine to pick risky worker here, because it's idle. Idle is never actually risky - for ($i = 0; $i < $totalWorkers; $i++) { - if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) { - return $i; - } - } - - // If no idle worker found, give to random safe worker - // We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky. - $worker = rand(0, $riskyWorkers - 1); - Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}"); - return $worker; + }; + $workerId = $resolveWorkerId($server, $data); + $server->bind($fd, $workerId); + return $workerId; } + +$http + ->set([ + Constant::OPTION_WORKER_NUM => $totalWorkers, + Constant::OPTION_DISPATCH_FUNC => dispatch(...), + Constant::OPTION_DISPATCH_MODE => SWOOLE_DISPATCH_UIDMOD, + Constant::OPTION_HTTP_COMPRESSION => false, + Constant::OPTION_PACKAGE_MAX_LENGTH => $payloadSize, + Constant::OPTION_OUTPUT_BUFFER_SIZE => $payloadSize, + Constant::OPTION_TASK_WORKER_NUM => 1, // required for the task to fetch domains background + ]); + +$http->on(Constant::EVENT_WORKER_START, function ($server, $workerId) { + Console::success('Worker ' . ++$workerId . ' started successfully'); +}); + +$http->on(Constant::EVENT_BEFORE_RELOAD, function ($server, $workerId) { + Console::success('Starting reload...'); +}); + +$http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) { + Console::success('Reload completed...'); +}); + include __DIR__ . '/controllers/general.php'; function createDatabase(App $app, string $resourceKey, string $dbName, array $collections, mixed $pools, callable $extraSetup = null): void