fix: realtime auth

This commit is contained in:
Binyamin Yawitz 2024-06-05 09:08:27 -04:00
parent 72d5e94c97
commit 6a7ee3df8a
No known key found for this signature in database

View file

@ -49,23 +49,19 @@ require_once __DIR__ . '/init2.php';
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
$auth = new Dependency();
$cache = new Dependency();
$getProjectDB = new Dependency();
$auth
->setName('auth')
->setCallback(fn() => new Authorization());
$getProjectDB
->setName('getProjectDB')
->inject('pools')
->inject('dbForConsole')
->inject('cache')
->inject('auth')
->inject('authorization')
->inject('connections')
->setCallback(function (array $pools, Database $dbForConsole, Cache $cache, Authorization $auth, Connections $connections) {
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $auth, $connections): Database {
->setCallback(function (array $pools, Database $dbForConsole, Cache $cache, Authorization $authorization, Connections $connections) {
return function (Document $project) use ($pools, $dbForConsole, $cache, &$databases, $authorization, $connections): Database {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForConsole;
}
@ -85,7 +81,7 @@ $getProjectDB
$adapter->setDatabase($dsn->getPath());
$database = new Database($adapter, $cache);
$database->setAuthorization($auth);
$database->setAuthorization($authorization);
$database->setNamespace('_' . $project->getInternalId());
return $database;
@ -98,7 +94,6 @@ $cache
return new Cache(new None());
});
$container->set($auth);
$container->set($cache);
$container->set($getProjectDB);
@ -167,7 +162,7 @@ $server->error($logError);
$server->onStart(function () use ($stats, $container, $containerId, &$statsDocument, $logError) {
sleep(5); // wait for the initial database schema to be ready
Console::success('Server started successfully');
$auth = $container->get('auth');
$authorization = $container->get('authorization');
/**
* Create document for this worker to share stats across Containers.
*/
@ -187,8 +182,8 @@ $server->onStart(function () use ($stats, $container, $containerId, &$statsDocum
'value' => '{}'
]);
$auth = $container->get('auth');
$statsDocument = $auth->skip(fn() => $database->createDocument('realtime', $document));
$authorization = $container->get('authorization');
$statsDocument = $authorization->skip(fn () => $database->createDocument('realtime', $document));
break;
} catch (Throwable) {
Console::warning("Collection not ready. Retrying connection ({$attempts})...");
@ -201,7 +196,7 @@ $server->onStart(function () use ($stats, $container, $containerId, &$statsDocum
/**
* Save current connections to the Database every 5 seconds.
*/
Timer::tick(5000, function () use ($container, $stats, &$statsDocument, $logError, $auth) {
Timer::tick(5000, function () use ($container, $stats, &$statsDocument, $logError, $authorization) {
$payload = [];
foreach ($stats as $projectId => $value) {
$payload[$projectId] = $stats->get($projectId, 'connectionsTotal');
@ -217,7 +212,7 @@ $server->onStart(function () use ($stats, $container, $containerId, &$statsDocum
->setAttribute('timestamp', DateTime::now())
->setAttribute('value', json_encode($payload));
$auth->skip(fn() => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
$authorization->skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
} catch (Throwable $th) {
call_user_func($logError, $th, "updateWorkerDocument");
} finally {
@ -232,9 +227,9 @@ $server->onWorkerStart(function (int $workerId) use ($server, $container, $stats
$attempts = 0;
$start = time();
$auth = $container->get('auth');
$authorization = $container->get('authorization');
Timer::tick(5000, function () use ($server, $container, $realtime, $stats, $logError, $auth) {
Timer::tick(5000, function () use ($server, $container, $realtime, $stats, $logError, $authorization) {
/**
* Sending current connections to project channels on the console project every 5 seconds.
*/
@ -243,7 +238,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $container, $stats
$payload = [];
$list = $auth->skip(fn() => $database->find('realtime', [
$list = $authorization->skip(fn () => $database->find('realtime', [
Query::greaterThan('timestamp', DateTime::addSeconds(new \DateTime(), -15)),
]));
@ -337,7 +332,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $container, $stats
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $realtime, $auth, $container) {
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $realtime, $authorization, $container) {
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {
@ -348,12 +343,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $container, $stats
$connection = array_key_first(reset($realtime->subscriptions[$projectId]['user:' . $userId]));
$consoleDatabase = $container->get('dbForConsole');
$project = $auth->skip(fn() => $consoleDatabase->getDocument('projects', $projectId));
$project = $authorization->skip(fn () => $consoleDatabase->getDocument('projects', $projectId));
$database = $container->get('getProjectDB')($project);
$user = $database->getDocument('users', $userId);
$roles = Auth::getRoles($user, $auth);
$roles = Auth::getRoles($user, $authorization);
$realtime->subscribe($projectId, $connection, $roles, $realtime->connections[$connection]['channels']);
//TODO NOW $global->get('pools')->reclaim();
@ -397,7 +392,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $container, $stats
});
$server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $container, $stats, &$realtime, $logError) {
$auth = $container->get('auth');
$authorization = $container->get('authorization');
$request = new Request(new UtopiaRequest($request));
$response = new Response(new UtopiaResponse(new SwooleResponse()));
@ -405,8 +400,8 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$requestInjection = new Dependency();
$responseInjection = new Dependency();
$requestInjection->setName('request')->setCallback(fn() => $request);
$responseInjection->setName('response')->setCallback(fn() => $response);
$requestInjection->setName('request')->setCallback(fn () => $request);
$responseInjection->setName('response')->setCallback(fn () => $response);
$container->set($requestInjection);
$container->set($responseInjection);
@ -428,7 +423,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
if (
array_key_exists('realtime', $project->getAttribute('apis', []))
&& !$project->getAttribute('apis', [])['realtime']
&& !(Auth::isPrivilegedUser($auth->getRoles()) || Auth::isAppUser($auth->getRoles()))
&& !(Auth::isPrivilegedUser($authorization->getRoles()) || Auth::isAppUser($authorization->getRoles()))
) {
throw new AppwriteException(AppwriteException::GENERAL_API_DISABLED);
}
@ -467,8 +462,8 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $originValidator->getDescription());
}
$auth = $container->get('auth');
$roles = Auth::getRoles($user, $auth);
$authorization = $container->get('authorization');
$roles = Auth::getRoles($user, $authorization);
$channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());
@ -528,8 +523,8 @@ $server->onMessage(function (int $connection, string $message) use ($server, $co
$database = $container->get('dbForConsole');
if ($projectId !== 'console') {
$auth = $container->get('auth');
$project = $auth->skip(fn() => $database->getDocument('projects', $projectId));
$authorization = $container->get('authorization');
$project = $authorization->skip(fn () => $database->getDocument('projects', $projectId));
$database = $container->get('getProjectDB')($project);
} else {
$project = null;
@ -581,7 +576,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $co
throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Session is not valid.');
}
$roles = Auth::getRoles($user, $auth);
$roles = Auth::getRoles($user, $authorization);
$channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId());
$realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels);