Merge pull request #9016 from appwrite/feat-safe-workers

Feat: Safe workers
This commit is contained in:
Steven Nguyen 2024-11-22 11:07:18 -08:00 committed by GitHub
commit 54ec39f513
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -9,6 +9,7 @@ use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Http\Server;
use Swoole\Process;
use Swoole\Table;
use Utopia\Abuse\Adapters\Database\TimeLimit;
use Utopia\App;
use Utopia\Audit\Audit;
@ -16,11 +17,13 @@ use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Logger\Log;
use Utopia\Logger\Log\User;
@ -28,6 +31,12 @@ use Utopia\Pools\Group;
use Utopia\Swoole\Files;
use Utopia\System\System;
// Interval, in seconds, handed to Console::loop() by the 'Task' handler that syncs custom domains.
const DOMAIN_SYNC_TIMER = 30; // 30 seconds
// Swoole table of custom function domains. Rows are keyed by md5(domain): the 'Task' handler
// writes them and dispatch() reads them to decide whether a request is risky.
$domains = new Table(1_000_000); // 1 million rows
// Single integer column used as a presence flag (the sync task stores 1).
// NOTE(review): the third argument is the column size — presumably 1 byte; confirm against the Swoole Table docs.
$domains->column('value', Table::TYPE_INT, 1);
$domains->create();
$http = new Server(
host: "0.0.0.0",
port: System::getEnv('PORT', 80),
@ -35,11 +44,12 @@ $http = new Server(
);
$payloadSize = 12 * (1024 * 1024); // 12MB - adding slight buffer for headers and other data that might be sent with the payload - update later with valid testing
$workerNumber = swoole_cpu_num() * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
$totalWorkers = swoole_cpu_num() * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
$http
->set([
'worker_num' => $workerNumber,
'worker_num' => $totalWorkers,
'dispatch_func' => 'dispatch',
'open_http2_protocol' => true,
'http_compression' => false,
'package_max_length' => $payloadSize,
@ -58,6 +68,93 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) {
Console::success('Reload completed...');
});
/**
 * Chooses the worker that will handle an incoming connection by inspecting the raw request.
 *
 * A request is "risky" when it is a sync execution (`POST` ... `/executions`), when its host ends
 * with `_APP_DOMAIN_FUNCTIONS`, or when its host is a custom function domain present in the shared
 * `$domains` table. Everything else is "safe".
 *
 * Worker IDs are split at a boundary of floor(totalWorkers * `_APP_RISKY_WORKERS_PERCENT` / 100):
 * IDs below the boundary form the safe pool, IDs from the boundary upwards form the risky pool.
 * Risky requests are confined to the risky pool. Safe requests may use any idle worker, but fall
 * back to a random safe-pool worker when everybody is busy.
 *
 * NOTE(review): with the default of 80 the boundary sits at 80% of the IDs, so the risky pool is
 * the top 20% of workers despite the variable name — confirm this split is intended.
 *
 * doc: https://openswoole.com/docs/modules/swoole-server/configuration#dispatch_func
 *
 * @param Server $server Swoole server instance.
 * @param int $fd client ID
 * @param int $type the type of data and its current state
 * @param string|null $data Raw request content used for categorization.
 * @global int $totalWorkers Total number of workers.
 * @global \Swoole\Table $domains Custom function domains keyed by md5(domain).
 * @return int Chosen worker ID for the request.
 */
function dispatch(Server $server, int $fd, int $type, $data = null): int
{
    global $totalWorkers, $domains;

    // With a single worker there is nothing to choose, and the pool split below needs at least two.
    if ($totalWorkers <= 1) {
        return 0;
    }

    // If data is not set we can send request to any worker:
    // first we try to pick an idle worker, otherwise we pick one at random
    if ($data === null) {
        return dispatchFindIdleWorker($server, 0, $totalWorkers) ?? rand(0, $totalWorkers - 1);
    }

    $riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1

    // Each worker has a numeric ID, starting from 0 and incrementing.
    // From 0 to riskyWorkers (exclusive) we consider safe workers,
    // from riskyWorkers to totalWorkers (exclusive) we consider risky workers.
    $riskyWorkers = (int) floor($totalWorkers * $riskyWorkersPercent);

    // Keep at least one worker in each pool. Without the clamp a boundary of 0 or $totalWorkers
    // makes the rand() fallbacks below run with max < min, which can yield an invalid worker ID.
    $riskyWorkers = max(1, min($riskyWorkers, $totalWorkers - 1));

    // max up to 3 as first line has request details and second line has host
    $lines = explode("\n", $data, 3);
    $request = $lines[0];

    // Only trust the second line when it really is the Host header (header names are
    // case-insensitive); otherwise the domain stays empty instead of reading an undefined index.
    $domain = '';
    if (isset($lines[1]) && stripos($lines[1], 'Host:') === 0) {
        $domain = trim(substr($lines[1], strlen('Host:')));
    }

    // An unset/empty functions domain must not match: str_ends_with($x, '') is always true,
    // which would classify every request as risky.
    $functionsDomain = (string) System::getEnv('_APP_DOMAIN_FUNCTIONS');

    // Sync executions, function-domain requests and custom-domain executions are considered risky
    $risky = (str_starts_with($request, 'POST') && str_contains($request, '/executions'))
        || ($functionsDomain !== '' && str_ends_with($domain, $functionsDomain))
        || ($domain !== '' && $domains->get(md5($domain), 'value') === 1);

    if ($risky) {
        // If risky request, only consider risky workers
        /** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */
        $worker = dispatchFindIdleWorker($server, $riskyWorkers, $totalWorkers);
        if ($worker !== null) {
            return $worker;
        }

        // If no idle workers, give to random risky worker
        $worker = rand($riskyWorkers, $totalWorkers - 1);
        Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}");
        return $worker;
    }

    // If safe request, give to any idle worker.
    // It's fine to pick a risky worker here, because it's idle. Idle is never actually risky.
    $worker = dispatchFindIdleWorker($server, 0, $totalWorkers);
    if ($worker !== null) {
        return $worker;
    }

    // If no idle worker found, give to random safe worker.
    // We avoid risky workers here, as they could be busy - that's exactly when they are risky.
    $worker = rand(0, $riskyWorkers - 1);
    Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}");
    return $worker;
}

/**
 * Returns the lowest worker ID in [$from, $to) whose status is idle, or null when none is idle.
 *
 * @param Server $server Swoole server instance.
 * @param int $from First worker ID to inspect (inclusive).
 * @param int $to Last worker ID to inspect (exclusive).
 * @return int|null
 */
function dispatchFindIdleWorker(Server $server, int $from, int $to): ?int
{
    for ($id = $from; $id < $to; $id++) {
        if ($server->getWorkerStatus($id) === SWOOLE_WORKER_IDLE) {
            return $id;
        }
    }

    return null;
}
include __DIR__ . '/controllers/general.php';
$http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $register) {
@ -213,6 +310,9 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
Console::success('Server started successfully (max payload is ' . number_format($payloadSize) . ' bytes)');
Console::info("Master pid {$http->master_pid}, manager pid {$http->manager_pid}");
// Start the task that starts fetching custom domains
$http->task([], 0);
// listen ctrl + c
Process::signal(2, function () use ($http) {
Console::log('Stop by Ctrl+C');
@ -330,4 +430,59 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool
}
});
// Fetch function rule domains every `DOMAIN_SYNC_TIMER` seconds and mirror them into the
// `$domains` table, so dispatch() can recognise requests that target custom function domains.
$http->on('Task', function () use ($register, $domains) {
    // Start time of the last successful sync; null until the first full sync has completed.
    $lastSyncUpdate = null;

    $pools = $register->get('pools');
    App::setResource('pools', fn () => $pools);
    $app = new App('UTC');

    /** @var Utopia\Database\Database $dbForConsole */
    $dbForConsole = $app->getResource('dbForConsole');

    Console::loop(function () use ($dbForConsole, $domains, &$lastSyncUpdate) {
        try {
            $time = DateTime::now();

            // An unset/empty functions domain must not match: str_ends_with($x, '') is always
            // true and would skip every domain.
            $functionsDomain = (string) System::getEnv('_APP_DOMAIN_FUNCTIONS');

            $limit = 1000;
            $total = 0;
            $latestDocument = null;

            // Page through the rules; a page shorter than $limit is the last one.
            do {
                $queries = [Query::limit($limit)];
                if ($latestDocument !== null) {
                    $queries[] = Query::cursorAfter($latestDocument);
                }
                if ($lastSyncUpdate !== null) {
                    // After the first full sync, only fetch rules touched since the previous tick
                    $queries[] = Query::greaterThanEqual('$updatedAt', $lastSyncUpdate);
                }
                $queries[] = Query::equal('resourceType', ['function']);

                // A failing query is deliberately not caught here: it reaches the outer catch, which
                // logs it and leaves $lastSyncUpdate untouched so the next tick retries the same window.
                $results = Authorization::skip(fn () => $dbForConsole->find('rules', $queries));

                $fetched = count($results);
                $total += $fetched;

                foreach ($results as $document) {
                    $domain = $document->getAttribute('domain');

                    // Rules without a usable domain would otherwise be stored under md5('')
                    if (!is_string($domain) || $domain === '') {
                        continue;
                    }

                    // Domains under the functions domain are already detected by suffix in dispatch()
                    if ($functionsDomain !== '' && str_ends_with($domain, $functionsDomain)) {
                        continue;
                    }

                    $domains->set(md5($domain), ['value' => 1]);
                }

                // Cursor for the next page. Test emptiness of the page itself, since empty() on
                // the last key would treat key 0 as "no document".
                $latestDocument = $fetched > 0 ? $results[array_key_last($results)] : null;
            } while ($fetched === $limit);

            $lastSyncUpdate = $time;

            // Report the number of rules fetched across all pages, not just the last one
            if ($total > 0) {
                Console::log("Sync domains tick: {$total} domains were updated");
            }
        } catch (Throwable $th) {
            Console::error($th->getMessage());
        }
    }, DOMAIN_SYNC_TIMER, 0, function ($error) {
        Console::error($error);
    });
});
$http->start();