mirror of
https://github.com/appwrite/appwrite
synced 2026-05-22 00:18:25 +00:00
Merge pull request #9687 from appwrite/PLA-2807
feat(dispatch): add contextual dispatch logic
This commit is contained in:
commit
09ca143231
1 changed files with 89 additions and 83 deletions
172
app/http.php
172
app/http.php
|
|
@ -43,29 +43,6 @@ $http = new Server(
|
|||
$payloadSize = 12 * (1024 * 1024); // 12MB - adding slight buffer for headers and other data that might be sent with the payload - update later with valid testing
|
||||
$totalWorkers = intval(System::getEnv('_APP_CPU_NUM', swoole_cpu_num())) * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
|
||||
|
||||
$http
|
||||
->set([
|
||||
'worker_num' => $totalWorkers,
|
||||
'dispatch_func' => 'dispatch',
|
||||
'open_http2_protocol' => true,
|
||||
'http_compression' => false,
|
||||
'package_max_length' => $payloadSize,
|
||||
'buffer_output_size' => $payloadSize,
|
||||
'task_worker_num' => 1, // required for the task to fetch domains background
|
||||
]);
|
||||
|
||||
$http->on(Constant::EVENT_WORKER_START, function ($server, $workerId) {
|
||||
Console::success('Worker ' . ++$workerId . ' started successfully');
|
||||
});
|
||||
|
||||
$http->on(Constant::EVENT_BEFORE_RELOAD, function ($server, $workerId) {
|
||||
Console::success('Starting reload...');
|
||||
});
|
||||
|
||||
$http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) {
|
||||
Console::success('Reload completed...');
|
||||
});
|
||||
|
||||
/**
|
||||
* Assigns HTTP requests to worker threads by analyzing its payload/content.
|
||||
*
|
||||
|
|
@ -83,76 +60,105 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) {
|
|||
*/
|
||||
function dispatch(Server $server, int $fd, int $type, $data = null): int
|
||||
{
|
||||
global $totalWorkers, $domains;
|
||||
$resolveWorkerId = function (Server $server, $data = null) {
|
||||
global $totalWorkers, $domains;
|
||||
|
||||
// If data is not set we can send request to any worker
|
||||
// first we try to pick idle worker, if not we randomly pick a worker
|
||||
if ($data === null) {
|
||||
// If data is not set we can send request to any worker
|
||||
// first we try to pick idle worker, if not we randomly pick a worker
|
||||
if ($data === null) {
|
||||
for ($i = 0; $i < $totalWorkers; $i++) {
|
||||
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
|
||||
return $i;
|
||||
}
|
||||
}
|
||||
return rand(0, $totalWorkers - 1);
|
||||
}
|
||||
|
||||
$riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1
|
||||
|
||||
// Each worker has numeric ID, starting from 0 and incrementing
|
||||
// From 0 to riskyWorkers, we consider safe workers
|
||||
// From riskyWorkers to totalWorkers, we consider risky workers
|
||||
$riskyWorkers = (int)floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers
|
||||
|
||||
$domain = '';
|
||||
// max up to 3 as first line has request details and second line has host
|
||||
$lines = explode("\n", $data, 3);
|
||||
$request = $lines[0];
|
||||
if (count($lines) > 1) {
|
||||
$domain = trim(explode('Host: ', $lines[1])[1]);
|
||||
}
|
||||
|
||||
// Sync executions are considered risky
|
||||
$risky = false;
|
||||
if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) {
|
||||
$risky = true;
|
||||
} elseif (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) {
|
||||
$risky = true;
|
||||
} elseif ($domains->get(md5($domain), 'value') === 1) {
|
||||
// executions request coming from custom domain
|
||||
$risky = true;
|
||||
}
|
||||
|
||||
if ($risky) {
|
||||
// If risky request, only consider risky workers
|
||||
for ($j = $riskyWorkers; $j < $totalWorkers; $j++) {
|
||||
/** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */
|
||||
if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) {
|
||||
// If idle worker found, give to him
|
||||
return $j;
|
||||
}
|
||||
}
|
||||
|
||||
// If no idle workers, give to random risky worker
|
||||
$worker = rand($riskyWorkers, $totalWorkers - 1);
|
||||
Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}");
|
||||
return $worker;
|
||||
}
|
||||
|
||||
// If safe request, give to any idle worker
|
||||
// Its fine to pick risky worker here, because it's idle. Idle is never actually risky
|
||||
for ($i = 0; $i < $totalWorkers; $i++) {
|
||||
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
|
||||
return $i;
|
||||
}
|
||||
}
|
||||
return rand(0, $totalWorkers - 1);
|
||||
}
|
||||
|
||||
$riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1
|
||||
|
||||
// Each worker has numeric ID, starting from 0 and incrementing
|
||||
// From 0 to riskyWorkers, we consider safe workers
|
||||
// From riskyWorkers to totalWorkers, we consider risky workers
|
||||
$riskyWorkers = (int) floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers
|
||||
|
||||
$domain = '';
|
||||
// max up to 3 as first line has request details and second line has host
|
||||
$lines = explode("\n", $data, 3);
|
||||
$request = $lines[0];
|
||||
if (count($lines) > 1) {
|
||||
$domain = trim(explode('Host: ', $lines[1])[1]);
|
||||
}
|
||||
|
||||
// Sync executions are considered risky
|
||||
$risky = false;
|
||||
if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) {
|
||||
$risky = true;
|
||||
} elseif (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) {
|
||||
$risky = true;
|
||||
} elseif ($domains->get(md5($domain), 'value') === 1) {
|
||||
// executions request coming from custom domain
|
||||
$risky = true;
|
||||
}
|
||||
|
||||
if ($risky) {
|
||||
// If risky request, only consider risky workers
|
||||
for ($j = $riskyWorkers; $j < $totalWorkers; $j++) {
|
||||
/** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */
|
||||
if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) {
|
||||
// If idle worker found, give to him
|
||||
return $j;
|
||||
}
|
||||
}
|
||||
|
||||
// If no idle workers, give to random risky worker
|
||||
$worker = rand($riskyWorkers, $totalWorkers - 1);
|
||||
Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}");
|
||||
// If no idle worker found, give to random safe worker
|
||||
// We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky.
|
||||
$worker = rand(0, $riskyWorkers - 1);
|
||||
Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}");
|
||||
return $worker;
|
||||
}
|
||||
|
||||
// If safe request, give to any idle worker
|
||||
// Its fine to pick risky worker here, because it's idle. Idle is never actually risky
|
||||
for ($i = 0; $i < $totalWorkers; $i++) {
|
||||
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
|
||||
return $i;
|
||||
}
|
||||
}
|
||||
|
||||
// If no idle worker found, give to random safe worker
|
||||
// We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky.
|
||||
$worker = rand(0, $riskyWorkers - 1);
|
||||
Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}");
|
||||
return $worker;
|
||||
};
|
||||
$workerId = $resolveWorkerId($server, $data);
|
||||
$server->bind($fd, $workerId);
|
||||
return $workerId;
|
||||
}
|
||||
|
||||
|
||||
$http
|
||||
->set([
|
||||
Constant::OPTION_WORKER_NUM => $totalWorkers,
|
||||
Constant::OPTION_DISPATCH_FUNC => dispatch(...),
|
||||
Constant::OPTION_DISPATCH_MODE => SWOOLE_DISPATCH_UIDMOD,
|
||||
Constant::OPTION_HTTP_COMPRESSION => false,
|
||||
Constant::OPTION_PACKAGE_MAX_LENGTH => $payloadSize,
|
||||
Constant::OPTION_OUTPUT_BUFFER_SIZE => $payloadSize,
|
||||
Constant::OPTION_TASK_WORKER_NUM => 1, // required for the task to fetch domains background
|
||||
]);
|
||||
|
||||
$http->on(Constant::EVENT_WORKER_START, function ($server, $workerId) {
|
||||
Console::success('Worker ' . ++$workerId . ' started successfully');
|
||||
});
|
||||
|
||||
$http->on(Constant::EVENT_BEFORE_RELOAD, function ($server, $workerId) {
|
||||
Console::success('Starting reload...');
|
||||
});
|
||||
|
||||
$http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) {
|
||||
Console::success('Reload completed...');
|
||||
});
|
||||
|
||||
include __DIR__ . '/controllers/general.php';
|
||||
|
||||
function createDatabase(App $app, string $resourceKey, string $dbName, array $collections, mixed $pools, callable $extraSetup = null): void
|
||||
|
|
|
|||
Loading…
Reference in a new issue