Merge branch '1.6.x' of https://github.com/appwrite/appwrite into multi-region-support

This commit is contained in:
shimon 2024-11-24 10:30:54 +02:00
commit e91c762953
9 changed files with 352 additions and 58 deletions

View file

@ -328,7 +328,8 @@ App::post('/v1/functions')
if (!empty($functionsDomain)) {
$routeSubdomain = ID::unique();
$domain = "{$routeSubdomain}.{$functionsDomain}";
$ruleId = md5($domain);
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
$ruleId = version_compare(APP_VERSION_STABLE, '1.7.0', '<') ? ID::unique() : md5($domain);
$rule = Authorization::skip(
fn () => $dbForConsole->createDocument('rules', new Document([

View file

@ -11,6 +11,7 @@ use Utopia\App;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Exception\Query as QueryException;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Query;
use Utopia\Database\Validator\Query\Cursor;
use Utopia\Database\Validator\UID;
@ -59,8 +60,16 @@ App::post('/v1/proxy/rules')
throw new Exception(Exception::GENERAL_ARGUMENT_INVALID, 'This domain name is not allowed. Please pick another one.');
}
$ruleId = md5($domain);
$document = $dbForConsole->getDocument('rules', $ruleId);
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (version_compare(APP_VERSION_STABLE, '1.7.0', '<')) {
$document = $dbForConsole->findOne('rules', [
Query::equal('domain', [$domain]),
]);
} else {
$ruleId = md5($domain);
$document = $dbForConsole->getDocument('rules', $ruleId);
}
if (!$document->isEmpty()) {
if ($document->getAttribute('projectId') === $project->getId()) {
@ -101,7 +110,9 @@ App::post('/v1/proxy/rules')
throw new Exception(Exception::GENERAL_ARGUMENT_INVALID, 'Domain may not start with http:// or https://.');
}
$ruleId = md5($domain->get());
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
$ruleId = version_compare(APP_VERSION_STABLE, '1.7.0', '<') ? ID::unique() : md5($domain->get());
$rule = new Document([
'$id' => $ruleId,
'projectId' => $project->getId(),

View file

@ -1060,7 +1060,8 @@ App::patch('/v1/teams/:teamId/memberships/:membershipId/status')
throw new Exception(Exception::TEAM_INVITE_MISMATCH, 'Invite does not belong to current user (' . $user->getAttribute('email') . ')');
}
if ($user->isEmpty()) {
$hasSession = !$user->isEmpty();
if (!$hasSession) {
$user->setAttributes($dbForProject->getDocument('users', $userId)->getArrayCopy()); // Get user
}
@ -1079,39 +1080,64 @@ App::patch('/v1/teams/:teamId/memberships/:membershipId/status')
Authorization::skip(fn () => $dbForProject->updateDocument('users', $user->getId(), $user->setAttribute('emailVerification', true)));
// Log user in
// Create session for the user if not logged in
if (!$hasSession) {
Authorization::setRole(Role::user($user->getId())->toString());
Authorization::setRole(Role::user($user->getId())->toString());
$detector = new Detector($request->getUserAgent('UNKNOWN'));
$record = $geodb->get($request->getIP());
$authDuration = $project->getAttribute('auths', [])['duration'] ?? Auth::TOKEN_EXPIRATION_LOGIN_LONG;
$expire = DateTime::addSeconds(new \DateTime(), $authDuration);
$secret = Auth::tokenGenerator();
$session = new Document(array_merge([
'$id' => ID::unique(),
'$permissions' => [
Permission::read(Role::user($user->getId())),
Permission::update(Role::user($user->getId())),
Permission::delete(Role::user($user->getId())),
],
'userId' => $user->getId(),
'userInternalId' => $user->getInternalId(),
'provider' => Auth::SESSION_PROVIDER_EMAIL,
'providerUid' => $user->getAttribute('email'),
'secret' => Auth::hash($secret), // One way hash encryption to protect DB leak
'userAgent' => $request->getUserAgent('UNKNOWN'),
'ip' => $request->getIP(),
'factors' => ['email'],
'countryCode' => ($record) ? \strtolower($record['country']['iso_code']) : '--',
'expire' => DateTime::addSeconds(new \DateTime(), $authDuration)
], $detector->getOS(), $detector->getClient(), $detector->getDevice()));
$detector = new Detector($request->getUserAgent('UNKNOWN'));
$record = $geodb->get($request->getIP());
$authDuration = $project->getAttribute('auths', [])['duration'] ?? Auth::TOKEN_EXPIRATION_LOGIN_LONG;
$expire = DateTime::addSeconds(new \DateTime(), $authDuration);
$secret = Auth::tokenGenerator();
$session = new Document(array_merge([
'$id' => ID::unique(),
'userId' => $user->getId(),
'userInternalId' => $user->getInternalId(),
'provider' => Auth::SESSION_PROVIDER_EMAIL,
'providerUid' => $user->getAttribute('email'),
'secret' => Auth::hash($secret), // One way hash encryption to protect DB leak
'userAgent' => $request->getUserAgent('UNKNOWN'),
'ip' => $request->getIP(),
'factors' => ['email'],
'countryCode' => ($record) ? \strtolower($record['country']['iso_code']) : '--',
'expire' => DateTime::addSeconds(new \DateTime(), $authDuration)
], $detector->getOS(), $detector->getClient(), $detector->getDevice()));
$session = $dbForProject->createDocument('sessions', $session);
$session = $dbForProject->createDocument('sessions', $session
->setAttribute('$permissions', [
Permission::read(Role::user($user->getId())),
Permission::update(Role::user($user->getId())),
Permission::delete(Role::user($user->getId())),
]));
Authorization::setRole(Role::user($userId)->toString());
$dbForProject->purgeCachedDocument('users', $user->getId());
if (!Config::getParam('domainVerification')) {
$response->addHeader('X-Fallback-Cookies', \json_encode([Auth::$cookieName => Auth::encodeSession($user->getId(), $secret)]));
}
Authorization::setRole(Role::user($userId)->toString());
$response
->addCookie(
name: Auth::$cookieName . '_legacy',
value: Auth::encodeSession($user->getId(), $secret),
expire: (new \DateTime($expire))->getTimestamp(),
path: '/',
domain: Config::getParam('cookieDomain'),
secure: ('https' === $protocol),
httponly: true
)
->addCookie(
name: Auth::$cookieName,
value: Auth::encodeSession($user->getId(), $secret),
expire: (new \DateTime($expire))->getTimestamp(),
path: '/',
domain: Config::getParam('cookieDomain'),
secure: ('https' === $protocol),
httponly: true,
sameSite: Config::getParam('cookieSamesite')
)
;
}
$membership = $dbForProject->updateDocument('memberships', $membership->getId(), $membership);
@ -1125,22 +1151,11 @@ App::patch('/v1/teams/:teamId/memberships/:membershipId/status')
->setParam('membershipId', $membership->getId())
;
if (!Config::getParam('domainVerification')) {
$response
->addHeader('X-Fallback-Cookies', \json_encode([Auth::$cookieName => Auth::encodeSession($user->getId(), $secret)]))
;
}
$response
->addCookie(Auth::$cookieName . '_legacy', Auth::encodeSession($user->getId(), $secret), (new \DateTime($expire))->getTimestamp(), '/', Config::getParam('cookieDomain'), ('https' == $protocol), true, null)
->addCookie(Auth::$cookieName, Auth::encodeSession($user->getId(), $secret), (new \DateTime($expire))->getTimestamp(), '/', Config::getParam('cookieDomain'), ('https' == $protocol), true, Config::getParam('cookieSamesite'))
;
$response->dynamic(
$membership
->setAttribute('teamName', $team->getAttribute('name'))
->setAttribute('userName', $user->getAttribute('name'))
->setAttribute('userEmail', $user->getAttribute('email')),
->setAttribute('teamName', $team->getAttribute('name'))
->setAttribute('userName', $user->getAttribute('name'))
->setAttribute('userEmail', $user->getAttribute('email')),
Response::MODEL_MEMBERSHIP
);
});

View file

@ -1804,6 +1804,7 @@ App::post('/v1/users/:userId/sessions')
'provider' => Auth::SESSION_PROVIDER_SERVER,
'secret' => Auth::hash($secret), // One way hash encryption to protect DB leak
'userAgent' => $request->getUserAgent('UNKNOWN'),
'factors' => ['server'],
'ip' => $request->getIP(),
'countryCode' => ($record) ? \strtolower($record['country']['iso_code']) : '--',
'expire' => $expire,
@ -1816,8 +1817,11 @@ App::post('/v1/users/:userId/sessions')
$countryName = $locale->getText('countries.' . strtolower($session->getAttribute('countryCode')), $locale->getText('locale.country.unknown'));
$session = $dbForProject->createDocument('sessions', $session);
$dbForProject->purgeCachedDocument('users', $user->getId());
$session
->setAttribute('secret', $secret)
->setAttribute('secret', Auth::encodeSession($user->getId(), $secret))
->setAttribute('countryName', $countryName);
$queueForEvents

View file

@ -29,6 +29,7 @@ use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Domains\Domain;
use Utopia\DSN\DSN;
@ -51,7 +52,17 @@ function router(App $utopia, Database $dbForConsole, callable $getProjectDB, Swo
$host = $request->getHostname() ?? '';
$route = Authorization::skip(fn () => $dbForConsole->getDocument('rules', md5($host)));
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (version_compare(APP_VERSION_STABLE, '1.7.0', '<')) {
$route = Authorization::skip(
fn () => $dbForConsole->find('rules', [
Query::equal('domain', [$host]),
Query::limit(1)
])
)[0] ?? new Document();
} else {
$route = Authorization::skip(fn () => $dbForConsole->getDocument('rules', md5($host)));
}
if ($route->isEmpty()) {
if ($host === System::getEnv('_APP_DOMAIN_FUNCTIONS', '')) {
@ -512,18 +523,31 @@ App::init()
if (!empty($envDomain) && $envDomain !== 'localhost') {
$mainDomain = $envDomain;
} else {
$domainDocument = $dbForConsole->getDocument('rules', md5($envDomain));
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (version_compare(APP_VERSION_STABLE, '1.7.0', '<')) {
$domainDocument = $dbForConsole->findOne('rules', [Query::orderAsc('$id')]);
} else {
$domainDocument = $dbForConsole->getDocument('rules', md5($envDomain));
}
$mainDomain = !$domainDocument->isEmpty() ? $domainDocument->getAttribute('domain') : $domain->get();
}
if ($mainDomain !== $domain->get()) {
Console::warning($domain->get() . ' is not a main domain. Skipping SSL certificate generation.');
} else {
$domainDocument = $dbForConsole->getDocument('rules', md5($domain->get()));
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (version_compare(APP_VERSION_STABLE, '1.7.0', '<')) {
$domainDocument = $dbForConsole->findOne('rules', [
Query::equal('domain', [$domain->get()])
]);
} else {
$domainDocument = $dbForConsole->getDocument('rules', md5($domain->get()));
}
if ($domainDocument->isEmpty()) {
$domainDocument = new Document([
'$id' => md5($domain->get()),
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
'$id' => version_compare(APP_VERSION_STABLE, '1.7.0', '<') ? ID::unique() : md5($domain->get()),
'domain' => $domain->get(),
'resourceType' => 'api',
'status' => 'verifying',

View file

@ -9,6 +9,7 @@ use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use Swoole\Http\Server;
use Swoole\Process;
use Swoole\Table;
use Utopia\Abuse\Adapters\Database\TimeLimit;
use Utopia\App;
use Utopia\Audit\Audit;
@ -16,11 +17,13 @@ use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Logger\Log;
use Utopia\Logger\Log\User;
@ -28,6 +31,12 @@ use Utopia\Pools\Group;
use Utopia\Swoole\Files;
use Utopia\System\System;
const DOMAIN_SYNC_TIMER = 30; // 30 seconds
$domains = new Table(1_000_000); // 1 million rows
$domains->column('value', Table::TYPE_INT, 1);
$domains->create();
$http = new Server(
host: "0.0.0.0",
port: System::getEnv('PORT', 80),
@ -35,11 +44,12 @@ $http = new Server(
);
$payloadSize = 12 * (1024 * 1024); // 12MB - adding slight buffer for headers and other data that might be sent with the payload - update later with valid testing
$workerNumber = swoole_cpu_num() * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
$totalWorkers = swoole_cpu_num() * intval(System::getEnv('_APP_WORKER_PER_CORE', 6));
$http
->set([
'worker_num' => $workerNumber,
'worker_num' => $totalWorkers,
'dispatch_func' => 'dispatch',
'open_http2_protocol' => true,
'http_compression' => false,
'package_max_length' => $payloadSize,
@ -58,6 +68,93 @@ $http->on(Constant::EVENT_AFTER_RELOAD, function ($server, $workerId) {
Console::success('Reload completed...');
});
/**
* Assigns HTTP requests to worker threads by analyzing its payload/content.
*
* Routes requests as 'safe' or 'risky' based on specific content patterns (like POST actions or certain domains)
* to optimize load distribution between the workers. Utilizes `$safeThreadsPercent` to manage risk by assigning
* riskier tasks to a dedicated worker subset. Prefers idle workers, with fallback to random selection if necessary.
* doc: https://openswoole.com/docs/modules/swoole-server/configuration#dispatch_func
*
* @param Server $server Swoole server instance.
* @param int $fd client ID
* @param int $type the type of data and its current state
* @param string|null $data Request content for categorization.
* @global int $totalThreads Total number of workers.
* @return int Chosen worker ID for the request.
*/
function dispatch(Server $server, int $fd, int $type, $data = null): int
{
global $totalWorkers, $domains;
// If data is not set we can send request to any worker
// first we try to pick idle worker, if not we randomly pick a worker
if ($data === null) {
for ($i = 0; $i < $totalWorkers; $i++) {
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
return $i;
}
}
return rand(0, $totalWorkers - 1);
}
$riskyWorkersPercent = intval(System::getEnv('_APP_RISKY_WORKERS_PERCENT', 80)) / 100; // Decimal form 0 to 1
// Each worker has numeric ID, starting from 0 and incrementing
// From 0 to riskyWorkers, we consider safe workers
// From riskyWorkers to totalWorkers, we consider risky workers
$riskyWorkers = (int) floor($totalWorkers * $riskyWorkersPercent); // Absolute amount of risky workers
$domain = '';
// max up to 3 as first line has request details and second line has host
$lines = explode("\n", $data, 3);
$request = $lines[0];
if (count($lines) > 1) {
$domain = trim(explode('Host: ', $lines[1])[1]);
}
// Sync executions are considered risky
$risky = false;
if (str_starts_with($request, 'POST') && str_contains($request, '/executions')) {
$risky = true;
} elseif (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) {
$risky = true;
} elseif ($domains->get(md5($domain), 'value') === 1) {
// executions request coming from custom domain
$risky = true;
}
if ($risky) {
// If risky request, only consider risky workers
for ($j = $riskyWorkers; $j < $totalWorkers; $j++) {
/** Reference https://openswoole.com/docs/modules/swoole-server-getWorkerStatus#description */
if ($server->getWorkerStatus($j) === SWOOLE_WORKER_IDLE) {
// If idle worker found, give to him
return $j;
}
}
// If no idle workers, give to random risky worker
$worker = rand($riskyWorkers, $totalWorkers - 1);
Console::warning("swoole_dispatch: Risky branch: did not find a idle worker, picking random worker {$worker}");
return $worker;
}
// If safe request, give to any idle worker
// Its fine to pick risky worker here, because it's idle. Idle is never actually risky
for ($i = 0; $i < $totalWorkers; $i++) {
if ($server->getWorkerStatus($i) === SWOOLE_WORKER_IDLE) {
return $i;
}
}
// If no idle worker found, give to random safe worker
// We avoid risky workers here, as it could be in work - not idle. Thats exactly when they are risky.
$worker = rand(0, $riskyWorkers - 1);
Console::warning("swoole_dispatch: Non-risky branch: did not find a idle worker, picking random worker {$worker}");
return $worker;
}
include __DIR__ . '/controllers/general.php';
$http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $register) {
@ -213,6 +310,9 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
Console::success('Server started successfully (max payload is ' . number_format($payloadSize) . ' bytes)');
Console::info("Master pid {$http->master_pid}, manager pid {$http->manager_pid}");
// Start the task that starts fetching custom domains
$http->task([], 0);
// listen ctrl + c
Process::signal(2, function () use ($http) {
Console::log('Stop by Ctrl+C');
@ -330,4 +430,59 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool
}
});
// Fetch domains every `DOMAIN_SYNC_TIMER` seconds and update in the memory
$http->on('Task', function () use ($register, $domains) {
$lastSyncUpdate = null;
$pools = $register->get('pools');
App::setResource('pools', fn () => $pools);
$app = new App('UTC');
/** @var Utopia\Database\Database $dbForConsole */
$dbForConsole = $app->getResource('dbForConsole');
Console::loop(function () use ($dbForConsole, $domains, &$lastSyncUpdate) {
try {
$time = DateTime::now();
$limit = 1000;
$sum = $limit;
$latestDocument = null;
while ($sum === $limit) {
$queries = [Query::limit($limit)];
if ($latestDocument !== null) {
$queries[] = Query::cursorAfter($latestDocument);
}
if ($lastSyncUpdate != null) {
$queries[] = Query::greaterThanEqual('$updatedAt', $lastSyncUpdate);
}
$queries[] = Query::equal('resourceType', ['function']);
$results = [];
try {
$results = Authorization::skip(fn () => $dbForConsole->find('rules', $queries));
} catch (Throwable $th) {
Console::error($th->getMessage());
}
$sum = count($results);
foreach ($results as $document) {
$domain = $document->getAttribute('domain');
if (str_ends_with($domain, System::getEnv('_APP_DOMAIN_FUNCTIONS'))) {
continue;
}
$domains->set(md5($domain), ['value' => 1]);
}
$latestDocument = !empty(array_key_last($results)) ? $results[array_key_last($results)] : null;
}
$lastSyncUpdate = $time;
if ($sum > 0) {
Console::log("Sync domains tick: {$sum} domains were updated");
}
} catch (Throwable $th) {
Console::error($th->getMessage());
}
}, DOMAIN_SYNC_TIMER, 0, function ($error) {
Console::error($error);
});
});
$http->start();

View file

@ -477,8 +477,14 @@ class Certificates extends Action
*/
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
{
$rule = $dbForConsole->getDocument('rules', md5($domain));
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (version_compare(APP_VERSION_STABLE, '1.7.0', '<')) {
$rule = $dbForConsole->findOne('rules', [
Query::equal('domain', [$domain]),
]);
} else {
$rule = $dbForConsole->getDocument('rules', md5($domain));
}
if (!$rule->isEmpty()) {
$rule->setAttribute('certificateId', $certificateId);

View file

@ -559,6 +559,76 @@ trait TeamsBaseClient
return $data;
}
/**
* @depends testCreateTeam
*/
public function testUpdateMembershipWithSession(array $data): void
{
$teamUid = $data['teamUid'] ?? '';
// create user
$response = $this->client->call(Client::METHOD_POST, '/account', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], [
'userId' => 'unique()',
'email' => uniqid() . 'foe@localhost.test',
'password' => 'password',
'name' => 'test'
]);
$this->assertEquals(201, $response['headers']['status-code']);
$user = $response['body'];
// create session
$response = $this->client->call(Client::METHOD_POST, '/account/sessions', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], [
'email' => $user['email'],
'password' => 'password'
]);
$this->assertEquals(201, $response['headers']['status-code']);
$session = $response['cookies']['a_session_' . $this->getProject()['$id']];
$response = $this->client->call(Client::METHOD_POST, '/teams/' . $teamUid . '/memberships', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'email' => $user['email'],
'roles' => ['developer'],
'url' => 'http://localhost:5000/join-us#title'
]);
$this->assertEquals(201, $response['headers']['status-code']);
$lastEmail = $this->getLastEmail();
$secret = substr($lastEmail['text'], strpos($lastEmail['text'], '&secret=', 0) + 8, 256);
$membershipUid = substr($lastEmail['text'], strpos($lastEmail['text'], '?membershipId=', 0) + 14, 20);
$userUid = substr($lastEmail['text'], strpos($lastEmail['text'], '&userId=', 0) + 8, 20);
$response = $this->client->call(Client::METHOD_PATCH, '/teams/' . $teamUid . '/memberships/' . $membershipUid . '/status', [
'origin' => 'http://localhost',
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'cookie' => 'a_session_' . $this->getProject()['$id'] . '=' . $session,
], [
'secret' => $secret,
'userId' => $userUid,
]);
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertNotEmpty($response['body']['$id']);
$this->assertNotEmpty($response['body']['userId']);
$this->assertNotEmpty($response['body']['teamId']);
$this->assertCount(1, $response['body']['roles']);
$this->assertEmpty($response['cookies']);
}
/**
* @depends testUpdateTeamMembership
*/
@ -648,7 +718,7 @@ trait TeamsBaseClient
], $this->getHeaders()));
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertEquals(3, $response['body']['total']);
$this->assertEquals(4, $response['body']['total']);
$ownerMembershipUid = $response['body']['memberships'][0]['$id'];
@ -703,7 +773,7 @@ trait TeamsBaseClient
], $this->getHeaders()));
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertEquals(2, $response['body']['total']);
$this->assertEquals(3, $response['body']['total']);
/**
* Test for when the owner tries to delete their membership

View file

@ -310,6 +310,14 @@ trait UsersBase
$this->assertNotEmpty($session['secret']);
$this->assertNotEmpty($session['expire']);
$this->assertEquals('server', $session['provider']);
$response = $this->client->call(Client::METHOD_GET, '/account', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-session' => $session['secret']
]);
$this->assertEquals(200, $response['headers']['status-code']);
}