diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index 84a311e342..0114fd343c 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -816,22 +816,21 @@ App::post('/v1/databases/:databaseId/collections') $collectionId = $collectionId == 'unique()' ? ID::unique() : $collectionId; // Map aggregate permissions into the multiple permissions they represent. - $permissions = Permission::aggregate($permissions); + $permissions = Permission::aggregate($permissions) ?? []; try { - $dbForProject->createDocument('database_' . $database->getInternalId(), new Document([ + $collection = $dbForProject->createDocument('database_' . $database->getInternalId(), new Document([ '$id' => $collectionId, 'databaseInternalId' => $database->getInternalId(), 'databaseId' => $databaseId, - '$permissions' => $permissions ?? [], + '$permissions' => $permissions, 'documentSecurity' => $documentSecurity, 'enabled' => $enabled, 'name' => $name, 'search' => implode(' ', [$collectionId, $name]), ])); - $collection = $dbForProject->getDocument('database_' . $database->getInternalId(), $collectionId); - $dbForProject->createCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), permissions: $permissions ?? [], documentSecurity: $documentSecurity); + $dbForProject->createCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), permissions: $permissions, documentSecurity: $documentSecurity); } catch (DuplicateException) { throw new Exception(Exception::COLLECTION_ALREADY_EXISTS); } catch (LimitException) { diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index f4581df8e4..60a8c0ca97 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -135,6 +135,7 @@ App::get('/v1/health/cache') foreach ($configs as $key => $config) { foreach ($config as $database) { try { + /** @var \Utopia\Cache\Adapter $adapter */ $adapter = $pools->get($database)->pop()->getResource(); $checkStart = \microtime(true); @@ -191,11 +192,11 @@ App::get('/v1/health/queue') foreach ($configs as $key => $config) { foreach ($config as $database) { + $checkStart = \microtime(true); try { + /** @var Connection $adapter */ $adapter = $pools->get($database)->pop()->getResource(); - $checkStart = \microtime(true); - if ($adapter->ping()) { $output[] = new Document([ 'name' => $key . " ($database)", @@ -249,6 +250,7 @@ App::get('/v1/health/pubsub') foreach ($configs as $key => $config) { foreach ($config as $database) { try { + /** @var \Appwrite\PubSub\Adapter $adapter */ $adapter = $pools->get($database)->pop()->getResource(); $checkStart = \microtime(true); diff --git a/app/init.php b/app/init.php index d062e218e9..c9ec2e0061 100644 --- a/app/init.php +++ b/app/init.php @@ -42,6 +42,7 @@ use Appwrite\Hooks\Hooks; use Appwrite\Network\Validator\Email; use Appwrite\Network\Validator\Origin; use Appwrite\OpenSSL\OpenSSL; +use Appwrite\PubSub\Adapter\Redis as PubSub; use Appwrite\URL\URL as AppwriteURL; use Appwrite\Utopia\Request; use MaxMind\Db\Reader; @@ -973,7 +974,10 @@ $register->set('pools', function () { $adapter->setDatabase($dsn->getPath()); break; case 'pubsub': - $adapter = $resource(); + $adapter = match ($dsn->getScheme()) { + 'redis' => new PubSub($resource()), + default => null + }; break; case 'queue': $adapter = match ($dsn->getScheme()) { diff --git a/app/realtime.php b/app/realtime.php index 1b59eb3bc7..d38192b83c 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -365,17 +365,16 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } $start = time(); - $redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ - $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); - - if ($redis->ping(true)) { + /** @var \Appwrite\PubSub\Adapter $pubsub */ + $pubsub = $register->get('pools')->get('pubsub')->pop()->getResource(); + if ($pubsub->ping(true)) { $attempts = 0; Console::success('Pub/sub connection established (worker: ' . $workerId . ')'); } else { Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) { + $pubsub->subscribe(['realtime'], function (mixed $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { diff --git a/docker-compose.yml b/docker-compose.yml index 479ca38b8f..f2845dc137 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1053,4 +1053,4 @@ volumes: appwrite-certificates: appwrite-functions: appwrite-builds: - appwrite-config: + appwrite-config: \ No newline at end of file diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index c437d4d487..dceafacf6e 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -7,7 +7,6 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Role; -use Utopia\System\System; class Realtime extends Adapter { @@ -139,20 +138,26 @@ class Realtime extends Adapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - $redis = new \Redis(); //TODO: make this part of the constructor - $redis->connect(System::getEnv('_APP_REDIS_HOST', ''), System::getEnv('_APP_REDIS_PORT', '')); - $redis->publish('realtime', json_encode([ - 'project' => $projectId, - 'roles' => $roles, - 'permissionsChanged' => $permissionsChanged, - 'userId' => $userId, - 'data' => [ - 'events' => $events, - 'channels' => $channels, - 'timestamp' => DateTime::formatTz(DateTime::now()), - 'payload' => $payload - ] - ])); + global $register; + $pubsub = $register->get('pools')->get('pubsub')->pop(); + try { + /** @var \Appwrite\PubSub\Adapter $redis */ + $redis = $pubsub->getResource(); + $redis->publish('realtime', json_encode([ + 'project' => $projectId, + 'roles' => $roles, + 'permissionsChanged' => $permissionsChanged, + 'userId' => $userId, + 'data' => [ + 'events' => $events, + 'channels' => $channels, + 'timestamp' => DateTime::formatTz(DateTime::now()), + 'payload' => $payload + ] + ])); + } finally { + $pubsub->reclaim(); + } } /** diff --git a/src/Appwrite/Platform/Tasks/Doctor.php b/src/Appwrite/Platform/Tasks/Doctor.php index 82d1ca2d59..c43afea527 100644 --- a/src/Appwrite/Platform/Tasks/Doctor.php +++ b/src/Appwrite/Platform/Tasks/Doctor.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\ClamAV\Network; +use Appwrite\PubSub\Adapter; use Utopia\App; use Utopia\CLI\Console; use Utopia\Config\Config; @@ -158,6 +159,7 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $pool) { try { + /** @var Adapter $adapter */ $adapter = $pools->get($pool)->pop()->getResource(); if ($adapter->ping()) { diff --git a/src/Appwrite/PubSub/Adapter.php b/src/Appwrite/PubSub/Adapter.php new file mode 100644 index 0000000000..e5ddbe5e62 --- /dev/null +++ b/src/Appwrite/PubSub/Adapter.php @@ -0,0 +1,13 @@ +client = $client; + + } + + public function ping($message = null): bool + { + return $this->client->ping($message); + } + + public function subscribe($channels, $callback) + { + return $this->client->subscribe($channels, $callback); + } + + public function publish($channel, $message) + { + return $this->client->publish($channel, $message); + } +} diff --git a/tests/unit/Event/EventTest.php b/tests/unit/Event/EventTest.php index dd9833378f..079bb47b65 100644 --- a/tests/unit/Event/EventTest.php +++ b/tests/unit/Event/EventTest.php @@ -3,13 +3,9 @@ namespace Tests\Unit\Event; use Appwrite\Event\Event; -use Appwrite\URL\URL; use InvalidArgumentException; use PHPUnit\Framework\TestCase; -use Utopia\DSN\DSN; -use Utopia\Queue; use Utopia\Queue\Client; -use Utopia\System\System; require_once __DIR__ . '/../../../app/init.php'; @@ -20,19 +16,8 @@ class EventTest extends TestCase public function setUp(): void { - $fallbackForRedis = 'redis_main=' . URL::unparse([ - 'scheme' => 'redis', - 'host' => System::getEnv('_APP_REDIS_HOST', 'redis'), - 'port' => System::getEnv('_APP_REDIS_PORT', '6379'), - 'user' => System::getEnv('_APP_REDIS_USER', ''), - 'pass' => System::getEnv('_APP_REDIS_PASS', ''), - ]); - - $dsn = System::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis); - $dsn = explode('=', $dsn); - $dsn = $dsn[1] ?? ''; - $dsn = new DSN($dsn); - $connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()); + global $register; + $connection = $register->get('pools')->get('queue')->pop()->getResource(); $this->queue = 'v1-tests' . uniqid(); $this->object = new Event($connection); $this->object->setClass('TestsV1'); diff --git a/tests/unit/Usage/StatsTest.php b/tests/unit/Usage/StatsTest.php index 67e39d8974..79fa1f58ec 100644 --- a/tests/unit/Usage/StatsTest.php +++ b/tests/unit/Usage/StatsTest.php @@ -2,13 +2,9 @@ namespace Tests\Unit\Usage; -use Appwrite\URL\URL as AppwriteURL; use PHPUnit\Framework\TestCase; -use Utopia\DSN\DSN; -use Utopia\Queue; use Utopia\Queue\Client; use Utopia\Queue\Connection; -use Utopia\System\System; class StatsTest extends TestCase { @@ -19,18 +15,9 @@ class StatsTest extends TestCase public function setUp(): void { - $env = System::getEnv('_APP_CONNECTIONS_QUEUE', 'redis_main=' . AppwriteURL::unparse([ - 'scheme' => 'redis', - 'host' => System::getEnv('_APP_REDIS_HOST', 'redis'), - 'port' => System::getEnv('_APP_REDIS_PORT', '6379'), - 'user' => System::getEnv('_APP_REDIS_USER', ''), - 'pass' => System::getEnv('_APP_REDIS_PASS', ''), - ])); - - $dsn = explode('=', $env); - $dsn = count($dsn) > 1 ? $dsn[1] : $dsn[0]; - $dsn = new DSN($dsn); - $this->connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()); + global $register; + $connection = $register->get('pools')->get('queue')->pop()->getResource(); + $this->connection = $connection; $this->client = new Client(self::QUEUE_NAME, $this->connection); }