Merge branch '1.6.x' into feat-custom-cf-hostnames

This commit is contained in:
Fabian Gruber 2024-11-06 12:40:56 +01:00
commit b4c986839e
11 changed files with 89 additions and 62 deletions

View file

@ -816,22 +816,21 @@ App::post('/v1/databases/:databaseId/collections')
$collectionId = $collectionId == 'unique()' ? ID::unique() : $collectionId;
// Map aggregate permissions into the multiple permissions they represent.
$permissions = Permission::aggregate($permissions);
$permissions = Permission::aggregate($permissions) ?? [];
try {
$dbForProject->createDocument('database_' . $database->getInternalId(), new Document([
$collection = $dbForProject->createDocument('database_' . $database->getInternalId(), new Document([
'$id' => $collectionId,
'databaseInternalId' => $database->getInternalId(),
'databaseId' => $databaseId,
'$permissions' => $permissions ?? [],
'$permissions' => $permissions,
'documentSecurity' => $documentSecurity,
'enabled' => $enabled,
'name' => $name,
'search' => implode(' ', [$collectionId, $name]),
]));
$collection = $dbForProject->getDocument('database_' . $database->getInternalId(), $collectionId);
$dbForProject->createCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), permissions: $permissions ?? [], documentSecurity: $documentSecurity);
$dbForProject->createCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), permissions: $permissions, documentSecurity: $documentSecurity);
} catch (DuplicateException) {
throw new Exception(Exception::COLLECTION_ALREADY_EXISTS);
} catch (LimitException) {

View file

@ -135,6 +135,7 @@ App::get('/v1/health/cache')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
/** @var \Utopia\Cache\Adapter $adapter */
$adapter = $pools->get($database)->pop()->getResource();
$checkStart = \microtime(true);
@ -191,11 +192,11 @@ App::get('/v1/health/queue')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
$checkStart = \microtime(true);
try {
/** @var Connection $adapter */
$adapter = $pools->get($database)->pop()->getResource();
$checkStart = \microtime(true);
if ($adapter->ping()) {
$output[] = new Document([
'name' => $key . " ($database)",
@ -249,6 +250,7 @@ App::get('/v1/health/pubsub')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
/** @var \Appwrite\PubSub\Adapter $adapter */
$adapter = $pools->get($database)->pop()->getResource();
$checkStart = \microtime(true);

View file

@ -42,6 +42,7 @@ use Appwrite\Hooks\Hooks;
use Appwrite\Network\Validator\Email;
use Appwrite\Network\Validator\Origin;
use Appwrite\OpenSSL\OpenSSL;
use Appwrite\PubSub\Adapter\Redis as PubSub;
use Appwrite\URL\URL as AppwriteURL;
use Appwrite\Utopia\Request;
use MaxMind\Db\Reader;
@ -973,7 +974,10 @@ $register->set('pools', function () {
$adapter->setDatabase($dsn->getPath());
break;
case 'pubsub':
$adapter = $resource();
$adapter = match ($dsn->getScheme()) {
'redis' => new PubSub($resource()),
default => null
};
break;
case 'queue':
$adapter = match ($dsn->getScheme()) {

View file

@ -365,17 +365,16 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
$start = time();
$redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {
/** @var \Appwrite\PubSub\Adapter $pubsub */
$pubsub = $register->get('pools')->get('pubsub')->pop()->getResource();
if ($pubsub->ping(true)) {
$attempts = 0;
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
} else {
Console::error('Pub/sub failed (worker: ' . $workerId . ')');
}
$redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
$pubsub->subscribe(['realtime'], function (mixed $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) {
$event = json_decode($payload, true);
if ($event['permissionsChanged'] && isset($event['userId'])) {

View file

@ -1053,4 +1053,4 @@ volumes:
appwrite-certificates:
appwrite-functions:
appwrite-builds:
appwrite-config:
appwrite-config:

View file

@ -7,7 +7,6 @@ use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
use Utopia\System\System;
class Realtime extends Adapter
{
@ -139,20 +138,26 @@ class Realtime extends Adapter
$permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged'];
$userId = array_key_exists('userId', $options) ? $options['userId'] : null;
$redis = new \Redis(); //TODO: make this part of the constructor
$redis->connect(System::getEnv('_APP_REDIS_HOST', ''), System::getEnv('_APP_REDIS_PORT', ''));
$redis->publish('realtime', json_encode([
'project' => $projectId,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged,
'userId' => $userId,
'data' => [
'events' => $events,
'channels' => $channels,
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => $payload
]
]));
global $register;
$pubsub = $register->get('pools')->get('pubsub')->pop();
try {
/** @var \Appwrite\PubSub\Adapter $redis */
$redis = $pubsub->getResource();
$redis->publish('realtime', json_encode([
'project' => $projectId,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged,
'userId' => $userId,
'data' => [
'events' => $events,
'channels' => $channels,
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => $payload
]
]));
} finally {
$pubsub->reclaim();
}
}
/**

View file

@ -3,6 +3,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\ClamAV\Network;
use Appwrite\PubSub\Adapter;
use Utopia\App;
use Utopia\CLI\Console;
use Utopia\Config\Config;
@ -158,6 +159,7 @@ class Doctor extends Action
foreach ($configs as $key => $config) {
foreach ($config as $pool) {
try {
/** @var Adapter $adapter */
$adapter = $pools->get($pool)->pop()->getResource();
if ($adapter->ping()) {

View file

@ -0,0 +1,13 @@
<?php
namespace Appwrite\PubSub;
interface Adapter
{
public function ping($message = null): bool;
public function subscribe($channels, $callback);
public function publish($channel, $message);
}

View file

@ -0,0 +1,31 @@
<?php
namespace Appwrite\PubSub\Adapter;
use Appwrite\PubSub\Adapter;
class Redis implements Adapter
{
private \Redis $client;
public function __construct(\Redis $client)
{
$this->client = $client;
}
public function ping($message = null): bool
{
return $this->client->ping($message);
}
public function subscribe($channels, $callback)
{
return $this->client->subscribe($channels, $callback);
}
public function publish($channel, $message)
{
return $this->client->publish($channel, $message);
}
}

View file

@ -3,13 +3,9 @@
namespace Tests\Unit\Event;
use Appwrite\Event\Event;
use Appwrite\URL\URL;
use InvalidArgumentException;
use PHPUnit\Framework\TestCase;
use Utopia\DSN\DSN;
use Utopia\Queue;
use Utopia\Queue\Client;
use Utopia\System\System;
require_once __DIR__ . '/../../../app/init.php';
@ -20,19 +16,8 @@ class EventTest extends TestCase
public function setUp(): void
{
$fallbackForRedis = 'redis_main=' . URL::unparse([
'scheme' => 'redis',
'host' => System::getEnv('_APP_REDIS_HOST', 'redis'),
'port' => System::getEnv('_APP_REDIS_PORT', '6379'),
'user' => System::getEnv('_APP_REDIS_USER', ''),
'pass' => System::getEnv('_APP_REDIS_PASS', ''),
]);
$dsn = System::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis);
$dsn = explode('=', $dsn);
$dsn = $dsn[1] ?? '';
$dsn = new DSN($dsn);
$connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort());
global $register;
$connection = $register->get('pools')->get('queue')->pop()->getResource();
$this->queue = 'v1-tests' . uniqid();
$this->object = new Event($connection);
$this->object->setClass('TestsV1');

View file

@ -2,13 +2,9 @@
namespace Tests\Unit\Usage;
use Appwrite\URL\URL as AppwriteURL;
use PHPUnit\Framework\TestCase;
use Utopia\DSN\DSN;
use Utopia\Queue;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
use Utopia\System\System;
class StatsTest extends TestCase
{
@ -19,18 +15,9 @@ class StatsTest extends TestCase
public function setUp(): void
{
$env = System::getEnv('_APP_CONNECTIONS_QUEUE', 'redis_main=' . AppwriteURL::unparse([
'scheme' => 'redis',
'host' => System::getEnv('_APP_REDIS_HOST', 'redis'),
'port' => System::getEnv('_APP_REDIS_PORT', '6379'),
'user' => System::getEnv('_APP_REDIS_USER', ''),
'pass' => System::getEnv('_APP_REDIS_PASS', ''),
]));
$dsn = explode('=', $env);
$dsn = count($dsn) > 1 ? $dsn[1] : $dsn[0];
$dsn = new DSN($dsn);
$this->connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort());
global $register;
$connection = $register->get('pools')->get('queue')->pop()->getResource();
$this->connection = $connection;
$this->client = new Client(self::QUEUE_NAME, $this->connection);
}