diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index f764c4c3ea..76a3ef8b61 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -274,7 +274,6 @@ $createSession = function (string $userId, string $secret, Request $request, Res App::post('/v1/account') ->desc('Create account') ->groups(['api', 'account', 'auth']) - ->label('event', 'users.[userId].create') ->label('scope', 'sessions.write') ->label('auth.type', 'emailPassword') ->label('audits.event', 'user.create') @@ -297,9 +296,8 @@ App::post('/v1/account') ->inject('user') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { + ->action(function (string $userId, string $email, string $password, string $name, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Hooks $hooks) { $email = \strtolower($email); if ('console' === $project->getId()) { @@ -409,8 +407,6 @@ App::post('/v1/account') Authorization::setRole(Role::user($user->getId())->toString()); Authorization::setRole(Role::users()->toString()); - $queueForEvents->setParam('userId', $user->getId()); - $response ->setStatusCode(Response::STATUS_CODE_CREATED) ->dynamic($user, Response::MODEL_ACCOUNT); @@ -442,7 +438,6 @@ App::get('/v1/account') App::delete('/v1/account') ->desc('Delete account') ->groups(['api', 'account']) - ->label('event', 'users.[userId].delete') ->label('scope', 'account') ->label('audits.event', 'user.delete') ->label('audits.resource', 'user/{response.$id}') @@ -1499,6 +1494,7 @@ App::get('/v1/account/sessions/oauth2/:provider/redirect') 'providerType' => MESSAGE_TYPE_EMAIL, 'identifier' => $email, ])); + } catch (Duplicate) { $failureRedirect(Exception::USER_ALREADY_EXISTS); } diff --git a/app/controllers/api/databases.php b/app/controllers/api/databases.php index 84a311e342..0114fd343c 100644 --- a/app/controllers/api/databases.php +++ b/app/controllers/api/databases.php @@ -816,22 +816,21 @@ App::post('/v1/databases/:databaseId/collections') $collectionId = $collectionId == 'unique()' ? ID::unique() : $collectionId; // Map aggregate permissions into the multiple permissions they represent. - $permissions = Permission::aggregate($permissions); + $permissions = Permission::aggregate($permissions) ?? []; try { - $dbForProject->createDocument('database_' . $database->getInternalId(), new Document([ + $collection = $dbForProject->createDocument('database_' . $database->getInternalId(), new Document([ '$id' => $collectionId, 'databaseInternalId' => $database->getInternalId(), 'databaseId' => $databaseId, - '$permissions' => $permissions ?? [], + '$permissions' => $permissions, 'documentSecurity' => $documentSecurity, 'enabled' => $enabled, 'name' => $name, 'search' => implode(' ', [$collectionId, $name]), ])); - $collection = $dbForProject->getDocument('database_' . $database->getInternalId(), $collectionId); - $dbForProject->createCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), permissions: $permissions ?? [], documentSecurity: $documentSecurity); + $dbForProject->createCollection('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), permissions: $permissions, documentSecurity: $documentSecurity); } catch (DuplicateException) { throw new Exception(Exception::COLLECTION_ALREADY_EXISTS); } catch (LimitException) { diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index f4581df8e4..60a8c0ca97 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -135,6 +135,7 @@ App::get('/v1/health/cache') foreach ($configs as $key => $config) { foreach ($config as $database) { try { + /** @var \Utopia\Cache\Adapter $adapter */ $adapter = $pools->get($database)->pop()->getResource(); $checkStart = \microtime(true); @@ -191,11 +192,11 @@ App::get('/v1/health/queue') foreach ($configs as $key => $config) { foreach ($config as $database) { + $checkStart = \microtime(true); try { + /** @var Connection $adapter */ $adapter = $pools->get($database)->pop()->getResource(); - $checkStart = \microtime(true); - if ($adapter->ping()) { $output[] = new Document([ 'name' => $key . " ($database)", @@ -249,6 +250,7 @@ App::get('/v1/health/pubsub') foreach ($configs as $key => $config) { foreach ($config as $database) { try { + /** @var \Appwrite\PubSub\Adapter $adapter */ $adapter = $pools->get($database)->pop()->getResource(); $checkStart = \microtime(true); diff --git a/app/controllers/api/users.php b/app/controllers/api/users.php index 314e298d6b..42f7a59f54 100644 --- a/app/controllers/api/users.php +++ b/app/controllers/api/users.php @@ -51,7 +51,7 @@ use Utopia\Validator\Text; use Utopia\Validator\WhiteList; /** TODO: Remove function when we move to using utopia/platform */ -function createUser(string $hash, mixed $hashOptions, string $userId, ?string $email, ?string $password, ?string $phone, string $name, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks): Document +function createUser(string $hash, mixed $hashOptions, string $userId, ?string $email, ?string $password, ?string $phone, string $name, Document $project, Database $dbForProject, Hooks $hooks): Document { $plaintextPassword = $password; $hashOptionsObject = (\is_string($hashOptions)) ? \json_decode($hashOptions, true) : $hashOptions; // Cast to JSON array @@ -176,15 +176,12 @@ function createUser(string $hash, mixed $hashOptions, string $userId, ?string $e throw new Exception(Exception::USER_ALREADY_EXISTS); } - $queueForEvents->setParam('userId', $user->getId()); - return $user; } App::post('/v1/users') ->desc('Create user') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -203,10 +200,9 @@ App::post('/v1/users') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) ->dynamic($user, Response::MODEL_USER); @@ -215,7 +211,6 @@ App::post('/v1/users') App::post('/v1/users/bcrypt') ->desc('Create user with bcrypt password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -233,10 +228,9 @@ App::post('/v1/users/bcrypt') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('bcrypt', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('bcrypt', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -246,7 +240,6 @@ App::post('/v1/users/bcrypt') App::post('/v1/users/md5') ->desc('Create user with MD5 password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -264,10 +257,9 @@ App::post('/v1/users/md5') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('md5', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('md5', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -277,7 +269,6 @@ App::post('/v1/users/md5') App::post('/v1/users/argon2') ->desc('Create user with Argon2 password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -295,10 +286,9 @@ App::post('/v1/users/argon2') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('argon2', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('argon2', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -308,7 +298,6 @@ App::post('/v1/users/argon2') App::post('/v1/users/sha') ->desc('Create user with SHA password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -327,16 +316,15 @@ App::post('/v1/users/sha') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $passwordVersion, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { + ->action(function (string $userId, string $email, string $password, string $passwordVersion, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { $options = '{}'; if (!empty($passwordVersion)) { $options = '{"version":"' . $passwordVersion . '"}'; } - $user = createUser('sha', $options, $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + $user = createUser('sha', $options, $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -346,7 +334,6 @@ App::post('/v1/users/sha') App::post('/v1/users/phpass') ->desc('Create user with PHPass password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -364,10 +351,9 @@ App::post('/v1/users/phpass') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('phpass', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('phpass', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -377,7 +363,6 @@ App::post('/v1/users/phpass') App::post('/v1/users/scrypt') ->desc('Create user with Scrypt password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -400,9 +385,8 @@ App::post('/v1/users/scrypt') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $passwordSalt, int $passwordCpu, int $passwordMemory, int $passwordParallel, int $passwordLength, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { + ->action(function (string $userId, string $email, string $password, string $passwordSalt, int $passwordCpu, int $passwordMemory, int $passwordParallel, int $passwordLength, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { $options = [ 'salt' => $passwordSalt, 'costCpu' => $passwordCpu, @@ -411,7 +395,7 @@ App::post('/v1/users/scrypt') 'length' => $passwordLength ]; - $user = createUser('scrypt', \json_encode($options), $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + $user = createUser('scrypt', \json_encode($options), $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -421,7 +405,6 @@ App::post('/v1/users/scrypt') App::post('/v1/users/scrypt-modified') ->desc('Create user with Scrypt modified password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -442,10 +425,9 @@ App::post('/v1/users/scrypt-modified') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $passwordSalt, string $passwordSaltSeparator, string $passwordSignerKey, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('scryptMod', '{"signerKey":"' . $passwordSignerKey . '","saltSeparator":"' . $passwordSaltSeparator . '","salt":"' . $passwordSalt . '"}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $passwordSalt, string $passwordSaltSeparator, string $passwordSignerKey, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('scryptMod', '{"signerKey":"' . $passwordSignerKey . '","saltSeparator":"' . $passwordSaltSeparator . '","salt":"' . $passwordSalt . '"}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 7a5de8af19..f5921bf6e8 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -11,10 +11,11 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Messaging; +use Appwrite\Event\Realtime; use Appwrite\Event\Usage; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Utopia\Abuse\Abuse; @@ -28,6 +29,7 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; +use Utopia\Queue\Connection; use Utopia\System\System; use Utopia\Validator\WhiteList; @@ -57,8 +59,36 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$databaseListener = function (string $event, Document $document, Document $project, Usage $queueForUsage, Database $dbForProject) { +$eventDatabaseListener = function (Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { + // Only trigger events for user creation with the database listener. + if ($document->getCollection() !== 'users') { + return; + } + $queueForEvents + ->setEvent('users.[userId].create') + ->setParam('userId', $document->getId()) + ->setPayload($response->output($document, Response::MODEL_USER)); + + // Trigger functions, webhooks, and realtime events + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + + if ($queueForEvents->getProject()->getId() === 'console') { + return; + } + + $queueForRealtime + ->from($queueForEvents) + ->trigger(); +}; + +$usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) { $value = 1; if ($event === Database::EVENT_DOCUMENT_DELETE) { $value = -1; @@ -353,6 +383,7 @@ App::init() ->inject('response') ->inject('project') ->inject('user') + ->inject('queue') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -362,7 +393,7 @@ App::init() ->inject('queueForUsage') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($databaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -456,9 +487,24 @@ App::init() $queueForBuilds->setProject($project); $queueForMessaging->setProject($project); + // Clone the queues, to prevent events triggered by the database listener + // from overwriting the events that are supposed to be triggered in the shutdown hook. + $queueForEventsClone = new Event($queue); + $queueForFunctions = new Func($queue); + $queueForWebhooks = new Webhook($queue); + $queueForRealtime = new Realtime(); + $dbForProject - ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) + ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( + $document, + $response, + $queueForEventsClone->from($queueForEvents), + $queueForFunctions->from($queueForEvents), + $queueForWebhooks->from($queueForEvents), + $queueForRealtime->from($queueForEvents) + )); $useCache = $route->getLabel('cache', false); if ($useCache) { @@ -591,11 +637,13 @@ App::shutdown() ->inject('queueForDatabase') ->inject('queueForBuilds') ->inject('queueForMessaging') - ->inject('dbForProject') ->inject('queueForFunctions') + ->inject('queueForWebhooks') + ->inject('queueForRealtime') + ->inject('dbForProject') ->inject('mode') ->inject('dbForConsole') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode, Database $dbForConsole) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -604,54 +652,18 @@ App::shutdown() $queueForEvents->setPayload($responsePayload); } - /** - * Trigger functions. - */ - if (!$queueForEvents->isPaused()) { - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - } - /** - * Trigger webhooks. - */ - $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME) + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + + $queueForFunctions + ->from($queueForEvents) ->trigger(); - /** - * Trigger realtime. - */ if ($project->getId() !== 'console') { - $allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams()); - $payload = new Document($queueForEvents->getPayload()); - - $db = $queueForEvents->getContext('database'); - $collection = $queueForEvents->getContext('collection'); - $bucket = $queueForEvents->getContext('bucket'); - - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $payload, - project: $project, - database: $db, - collection: $collection, - bucket: $bucket, - ); - - Realtime::send( - projectId: $target['projectId'] ?? $project->getId(), - payload: $queueForEvents->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $queueForEvents->getParam('userId') - ] - ); + $queueForRealtime + ->from($queueForEvents) + ->trigger(); } } diff --git a/app/init.php b/app/init.php index e962c58b67..c9ec2e0061 100644 --- a/app/init.php +++ b/app/init.php @@ -31,7 +31,9 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +use Appwrite\Event\Realtime; use Appwrite\Event\Usage; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Functions\Specification; use Appwrite\GraphQL\Promises\Adapter\Swoole; @@ -40,6 +42,7 @@ use Appwrite\Hooks\Hooks; use Appwrite\Network\Validator\Email; use Appwrite\Network\Validator\Origin; use Appwrite\OpenSSL\OpenSSL; +use Appwrite\PubSub\Adapter\Redis as PubSub; use Appwrite\URL\URL as AppwriteURL; use Appwrite\Utopia\Request; use MaxMind\Db\Reader; @@ -971,7 +974,10 @@ $register->set('pools', function () { $adapter->setDatabase($dsn->getPath()); break; case 'pubsub': - $adapter = $resource(); + $adapter = match ($dsn->getScheme()) { + 'redis' => new PubSub($resource()), + default => null + }; break; case 'queue': $adapter = match ($dsn->getScheme()) { @@ -1134,6 +1140,12 @@ App::setResource('queueForDeletes', function (Connection $queue) { App::setResource('queueForEvents', function (Connection $queue) { return new Event($queue); }, ['queue']); +App::setResource('queueForWebhooks', function (Connection $queue) { + return new Webhook($queue); +}, ['queue']); +App::setResource('queueForRealtime', function () { + return new Realtime(); +}, []); App::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); }, ['queue']); diff --git a/app/realtime.php b/app/realtime.php index 1b59eb3bc7..d38192b83c 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -365,17 +365,16 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } $start = time(); - $redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ - $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); - - if ($redis->ping(true)) { + /** @var \Appwrite\PubSub\Adapter $pubsub */ + $pubsub = $register->get('pools')->get('pubsub')->pop()->getResource(); + if ($pubsub->ping(true)) { $attempts = 0; Console::success('Pub/sub connection established (worker: ' . $workerId . ')'); } else { Console::error('Pub/sub failed (worker: ' . $workerId . ')'); } - $redis->subscribe(['realtime'], function (Redis $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) { + $pubsub->subscribe(['realtime'], function (mixed $redis, string $channel, string $payload) use ($server, $workerId, $stats, $register, $realtime) { $event = json_decode($payload, true); if ($event['permissionsChanged'] && isset($event['userId'])) { diff --git a/composer.json b/composer.json index 5a2df912fd..a04ca51d43 100644 --- a/composer.json +++ b/composer.json @@ -48,10 +48,10 @@ "utopia-php/abuse": "0.43.0", "utopia-php/analytics": "0.10.*", "utopia-php/audit": "0.43.0", - "utopia-php/cache": "0.10.*", + "utopia-php/cache": "0.11.*", "utopia-php/cli": "0.15.*", "utopia-php/config": "0.2.*", - "utopia-php/database": "0.53.13", + "utopia-php/database": "0.53.16", "utopia-php/domains": "0.5.*", "utopia-php/dsn": "0.2.1", "utopia-php/framework": "0.33.*", diff --git a/composer.lock b/composer.lock index 452aefd26f..6dce436601 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "f6eb364e8504ebc2f6c9fe38d75f7e86", + "content-hash": "b358198535c1867eabed7c0f99135a57", "packages": [ { "name": "adhocore/jwt", @@ -1574,16 +1574,16 @@ }, { "name": "utopia-php/cache", - "version": "0.10.2", + "version": "0.11.0", "source": { "type": "git", "url": "https://github.com/utopia-php/cache.git", - "reference": "b22c6eb6d308de246b023efd0fc9758aee8b8247" + "reference": "8ebcab5aac7606331cef69b0081f6c9eff2e58bc" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/cache/zipball/b22c6eb6d308de246b023efd0fc9758aee8b8247", - "reference": "b22c6eb6d308de246b023efd0fc9758aee8b8247", + "url": "https://api.github.com/repos/utopia-php/cache/zipball/8ebcab5aac7606331cef69b0081f6c9eff2e58bc", + "reference": "8ebcab5aac7606331cef69b0081f6c9eff2e58bc", "shasum": "" }, "require": { @@ -1594,7 +1594,7 @@ }, "require-dev": { "laravel/pint": "1.2.*", - "phpstan/phpstan": "1.9.x-dev", + "phpstan/phpstan": "^1.12", "phpunit/phpunit": "^9.3", "vimeo/psalm": "4.13.1" }, @@ -1618,9 +1618,9 @@ ], "support": { "issues": "https://github.com/utopia-php/cache/issues", - "source": "https://github.com/utopia-php/cache/tree/0.10.2" + "source": "https://github.com/utopia-php/cache/tree/0.11.0" }, - "time": "2024-06-25T20:36:35+00:00" + "time": "2024-11-05T16:53:58+00:00" }, { "name": "utopia-php/cli", @@ -1724,23 +1724,23 @@ }, { "name": "utopia-php/database", - "version": "0.53.13", + "version": "0.53.16", "source": { "type": "git", "url": "https://github.com/utopia-php/database.git", - "reference": "a7e5de257e36e1b804d35b307865dd4036baa33e" + "reference": "6661edffeef05b59e16d102b989a72f7f78cf7de" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/database/zipball/a7e5de257e36e1b804d35b307865dd4036baa33e", - "reference": "a7e5de257e36e1b804d35b307865dd4036baa33e", + "url": "https://api.github.com/repos/utopia-php/database/zipball/6661edffeef05b59e16d102b989a72f7f78cf7de", + "reference": "6661edffeef05b59e16d102b989a72f7f78cf7de", "shasum": "" }, "require": { "ext-mbstring": "*", "ext-pdo": "*", "php": ">=8.3", - "utopia-php/cache": "0.10.*", + "utopia-php/cache": "0.11.*", "utopia-php/framework": "0.33.*", "utopia-php/mongo": "0.3.*" }, @@ -1774,9 +1774,9 @@ ], "support": { "issues": "https://github.com/utopia-php/database/issues", - "source": "https://github.com/utopia-php/database/tree/0.53.13" + "source": "https://github.com/utopia-php/database/tree/0.53.16" }, - "time": "2024-11-05T10:08:05+00:00" + "time": "2024-11-06T03:07:16+00:00" }, { "name": "utopia-php/domains", @@ -2495,16 +2495,16 @@ }, { "name": "utopia-php/queue", - "version": "0.7.0", + "version": "0.7.1", "source": { "type": "git", "url": "https://github.com/utopia-php/queue.git", - "reference": "917565256eb94bcab7246f7a746b1a486813761b" + "reference": "94c240d9f6383829807ce7b2d737f04b159fd3e8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/queue/zipball/917565256eb94bcab7246f7a746b1a486813761b", - "reference": "917565256eb94bcab7246f7a746b1a486813761b", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/94c240d9f6383829807ce7b2d737f04b159fd3e8", + "reference": "94c240d9f6383829807ce7b2d737f04b159fd3e8", "shasum": "" }, "require": { @@ -2550,9 +2550,9 @@ ], "support": { "issues": "https://github.com/utopia-php/queue/issues", - "source": "https://github.com/utopia-php/queue/tree/0.7.0" + "source": "https://github.com/utopia-php/queue/tree/0.7.1" }, - "time": "2024-01-17T19:00:43+00:00" + "time": "2024-11-05T17:00:38+00:00" }, { "name": "utopia-php/registry", @@ -2770,22 +2770,22 @@ }, { "name": "utopia-php/vcs", - "version": "0.8.2", + "version": "0.8.3", "source": { "type": "git", "url": "https://github.com/utopia-php/vcs.git", - "reference": "eb9b7eade1a46a4f660e0d5a6304f7fa26ec9d18" + "reference": "a032ed0611a8f4467aeaa9484f73223074457337" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/vcs/zipball/eb9b7eade1a46a4f660e0d5a6304f7fa26ec9d18", - "reference": "eb9b7eade1a46a4f660e0d5a6304f7fa26ec9d18", + "url": "https://api.github.com/repos/utopia-php/vcs/zipball/a032ed0611a8f4467aeaa9484f73223074457337", + "reference": "a032ed0611a8f4467aeaa9484f73223074457337", "shasum": "" }, "require": { "adhocore/jwt": "^1.1", "php": ">=8.0", - "utopia-php/cache": "^0.10.0", + "utopia-php/cache": "^0.11.0", "utopia-php/framework": "0.*.*" }, "require-dev": { @@ -2813,9 +2813,9 @@ ], "support": { "issues": "https://github.com/utopia-php/vcs/issues", - "source": "https://github.com/utopia-php/vcs/tree/0.8.2" + "source": "https://github.com/utopia-php/vcs/tree/0.8.3" }, - "time": "2024-08-13T14:36:30+00:00" + "time": "2024-11-05T17:10:09+00:00" }, { "name": "utopia-php/websocket", @@ -4004,16 +4004,16 @@ }, { "name": "phpdocumentor/reflection-docblock", - "version": "5.4.1", + "version": "5.5.0", "source": { "type": "git", "url": "https://github.com/phpDocumentor/ReflectionDocBlock.git", - "reference": "9d07b3f7fdcf5efec5d1609cba3c19c5ea2bdc9c" + "reference": "54e10d44fc1a84e2598d26f70d4f6f1f233e228a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpDocumentor/ReflectionDocBlock/zipball/9d07b3f7fdcf5efec5d1609cba3c19c5ea2bdc9c", - "reference": "9d07b3f7fdcf5efec5d1609cba3c19c5ea2bdc9c", + "url": "https://api.github.com/repos/phpDocumentor/ReflectionDocBlock/zipball/54e10d44fc1a84e2598d26f70d4f6f1f233e228a", + "reference": "54e10d44fc1a84e2598d26f70d4f6f1f233e228a", "shasum": "" }, "require": { @@ -4026,13 +4026,13 @@ "webmozart/assert": "^1.9.1" }, "require-dev": { - "mockery/mockery": "~1.3.5", + "mockery/mockery": "~1.3.5 || ~1.6.0", "phpstan/extension-installer": "^1.1", "phpstan/phpstan": "^1.8", "phpstan/phpstan-mockery": "^1.1", "phpstan/phpstan-webmozart-assert": "^1.2", "phpunit/phpunit": "^9.5", - "vimeo/psalm": "^5.13" + "psalm/phar": "^5.26" }, "type": "library", "extra": { @@ -4062,9 +4062,9 @@ "description": "With this component, a library can provide support for annotations via DocBlocks or otherwise retrieve information that is embedded in a DocBlock.", "support": { "issues": "https://github.com/phpDocumentor/ReflectionDocBlock/issues", - "source": "https://github.com/phpDocumentor/ReflectionDocBlock/tree/5.4.1" + "source": "https://github.com/phpDocumentor/ReflectionDocBlock/tree/5.5.0" }, - "time": "2024-05-21T05:55:05+00:00" + "time": "2024-11-04T21:26:31+00:00" }, { "name": "phpdocumentor/type-resolver", diff --git a/docker-compose.yml b/docker-compose.yml index 479ca38b8f..f2845dc137 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1053,4 +1053,4 @@ volumes: appwrite-certificates: appwrite-functions: appwrite-builds: - appwrite-config: + appwrite-config: \ No newline at end of file diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 43eda511df..3f166ad7a4 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -204,19 +204,6 @@ class Event return $this->payload; } - public function getRealtimePayload(): array - { - $payload = []; - - foreach ($this->payload as $key => $value) { - if (!isset($this->sensitive[$key])) { - $payload[$key] = $value; - } - } - - return $payload; - } - /** * Set context for this event. * @@ -315,10 +302,6 @@ class Event */ public function trigger(): string|bool { - if ($this->paused) { - return false; - } - $client = new Client($this->queue, $this->connection); return $client->enqueue([ @@ -530,20 +513,21 @@ class Event } /** - * Get the value of paused + * Generate a function event from a base event + * + * @param Event $event + * + * @return self + * */ - public function isPaused(): bool + public function from(Event $event): self { - return $this->paused; - } - - /** - * Set the value of paused - */ - public function setPaused(bool $paused): self - { - $this->paused = $paused; - + $this->project = $event->getProject(); + $this->user = $event->getUser(); + $this->payload = $event->getPayload(); + $this->event = $event->getEvent(); + $this->params = $event->getParams(); + $this->context = $event->context; return $this; } } diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 4dad5802f7..11a445d8ed 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -213,10 +213,6 @@ class Func extends Event */ public function trigger(): string|bool { - if ($this->paused) { - return false; - } - $client = new Client($this->queue, $this->connection); $events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null; @@ -238,22 +234,4 @@ class Func extends Event 'method' => $this->method, ]); } - - /** - * Generate a function event from a base event - * - * @param Event $event - * - * @return self - * - */ - public function from(Event $event): self - { - $this->project = $event->getProject(); - $this->user = $event->getUser(); - $this->payload = $event->getPayload(); - $this->event = $event->getEvent(); - $this->params = $event->getParams(); - return $this; - } } diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php new file mode 100644 index 0000000000..e158076f9b --- /dev/null +++ b/src/Appwrite/Event/Realtime.php @@ -0,0 +1,70 @@ +payload as $key => $value) { + if (!isset($this->sensitive[$key])) { + $payload[$key] = $value; + } + } + + return $payload; + } + + /** + * Execute Event. + * + * @return string|bool + * @throws InvalidArgumentException + */ + public function trigger(): string|bool + { + if (empty($this->event)) { + return false; + } + + $allEvents = Event::generateEvents($this->getEvent(), $this->getParams()); + $payload = new Document($this->getPayload()); + + $db = $this->getContext('database'); + $collection = $this->getContext('collection'); + $bucket = $this->getContext('bucket'); + + $target = RealtimeAdapter::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $payload, + project: $this->getProject(), + database: $db, + collection: $collection, + bucket: $bucket, + ); + + RealtimeAdapter::send( + projectId: $target['projectId'] ?? $this->getProject()->getId(), + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + + return true; + } +} diff --git a/src/Appwrite/Event/Webhook.php b/src/Appwrite/Event/Webhook.php new file mode 100644 index 0000000000..125c9a78d5 --- /dev/null +++ b/src/Appwrite/Event/Webhook.php @@ -0,0 +1,17 @@ +setQueue(Event::WEBHOOK_QUEUE_NAME) + ->setClass(Event::WEBHOOK_CLASS_NAME); + } +} diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index c437d4d487..dceafacf6e 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -7,7 +7,6 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\ID; use Utopia\Database\Helpers\Role; -use Utopia\System\System; class Realtime extends Adapter { @@ -139,20 +138,26 @@ class Realtime extends Adapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - $redis = new \Redis(); //TODO: make this part of the constructor - $redis->connect(System::getEnv('_APP_REDIS_HOST', ''), System::getEnv('_APP_REDIS_PORT', '')); - $redis->publish('realtime', json_encode([ - 'project' => $projectId, - 'roles' => $roles, - 'permissionsChanged' => $permissionsChanged, - 'userId' => $userId, - 'data' => [ - 'events' => $events, - 'channels' => $channels, - 'timestamp' => DateTime::formatTz(DateTime::now()), - 'payload' => $payload - ] - ])); + global $register; + $pubsub = $register->get('pools')->get('pubsub')->pop(); + try { + /** @var \Appwrite\PubSub\Adapter $redis */ + $redis = $pubsub->getResource(); + $redis->publish('realtime', json_encode([ + 'project' => $projectId, + 'roles' => $roles, + 'permissionsChanged' => $permissionsChanged, + 'userId' => $userId, + 'data' => [ + 'events' => $events, + 'channels' => $channels, + 'timestamp' => DateTime::formatTz(DateTime::now()), + 'payload' => $payload + ] + ])); + } finally { + $pubsub->reclaim(); + } } /** diff --git a/src/Appwrite/Platform/Tasks/Doctor.php b/src/Appwrite/Platform/Tasks/Doctor.php index 82d1ca2d59..c43afea527 100644 --- a/src/Appwrite/Platform/Tasks/Doctor.php +++ b/src/Appwrite/Platform/Tasks/Doctor.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\ClamAV\Network; +use Appwrite\PubSub\Adapter; use Utopia\App; use Utopia\CLI\Console; use Utopia\Config\Config; @@ -158,6 +159,7 @@ class Doctor extends Action foreach ($configs as $key => $config) { foreach ($config as $pool) { try { + /** @var Adapter $adapter */ $adapter = $pools->get($pool)->pop()->getResource(); if ($adapter->ping()) { diff --git a/src/Appwrite/PubSub/Adapter.php b/src/Appwrite/PubSub/Adapter.php new file mode 100644 index 0000000000..e5ddbe5e62 --- /dev/null +++ b/src/Appwrite/PubSub/Adapter.php @@ -0,0 +1,13 @@ +client = $client; + + } + + public function ping($message = null): bool + { + return $this->client->ping($message); + } + + public function subscribe($channels, $callback) + { + return $this->client->subscribe($channels, $callback); + } + + public function publish($channel, $message) + { + return $this->client->publish($channel, $message); + } +} diff --git a/tests/e2e/Services/Webhooks/WebhooksBase.php b/tests/e2e/Services/Webhooks/WebhooksBase.php index 6be3e16c1f..2ef41003ee 100644 --- a/tests/e2e/Services/Webhooks/WebhooksBase.php +++ b/tests/e2e/Services/Webhooks/WebhooksBase.php @@ -901,6 +901,17 @@ trait WebhooksBase $teamId = $data['teamId'] ?? ''; $email = uniqid() . 'friend@localhost.test'; + // Create user to ensure team event is triggered after user event + $user = $this->client->call(Client::METHOD_POST, '/account', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'userId' => ID::unique(), + 'email' => $email, + 'password' => 'password', + 'name' => 'Friend User', + ]); + /** * Test for SUCCESS */ @@ -909,7 +920,6 @@ trait WebhooksBase 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders()), [ 'email' => $email, - 'name' => 'Friend User', 'roles' => ['admin', 'editor'], 'url' => 'http://localhost:5000/join-us#title' ]); diff --git a/tests/unit/Event/EventTest.php b/tests/unit/Event/EventTest.php index dd9833378f..079bb47b65 100644 --- a/tests/unit/Event/EventTest.php +++ b/tests/unit/Event/EventTest.php @@ -3,13 +3,9 @@ namespace Tests\Unit\Event; use Appwrite\Event\Event; -use Appwrite\URL\URL; use InvalidArgumentException; use PHPUnit\Framework\TestCase; -use Utopia\DSN\DSN; -use Utopia\Queue; use Utopia\Queue\Client; -use Utopia\System\System; require_once __DIR__ . '/../../../app/init.php'; @@ -20,19 +16,8 @@ class EventTest extends TestCase public function setUp(): void { - $fallbackForRedis = 'redis_main=' . URL::unparse([ - 'scheme' => 'redis', - 'host' => System::getEnv('_APP_REDIS_HOST', 'redis'), - 'port' => System::getEnv('_APP_REDIS_PORT', '6379'), - 'user' => System::getEnv('_APP_REDIS_USER', ''), - 'pass' => System::getEnv('_APP_REDIS_PASS', ''), - ]); - - $dsn = System::getEnv('_APP_CONNECTIONS_QUEUE', $fallbackForRedis); - $dsn = explode('=', $dsn); - $dsn = $dsn[1] ?? ''; - $dsn = new DSN($dsn); - $connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()); + global $register; + $connection = $register->get('pools')->get('queue')->pop()->getResource(); $this->queue = 'v1-tests' . uniqid(); $this->object = new Event($connection); $this->object->setClass('TestsV1'); diff --git a/tests/unit/Usage/StatsTest.php b/tests/unit/Usage/StatsTest.php index 67e39d8974..79fa1f58ec 100644 --- a/tests/unit/Usage/StatsTest.php +++ b/tests/unit/Usage/StatsTest.php @@ -2,13 +2,9 @@ namespace Tests\Unit\Usage; -use Appwrite\URL\URL as AppwriteURL; use PHPUnit\Framework\TestCase; -use Utopia\DSN\DSN; -use Utopia\Queue; use Utopia\Queue\Client; use Utopia\Queue\Connection; -use Utopia\System\System; class StatsTest extends TestCase { @@ -19,18 +15,9 @@ class StatsTest extends TestCase public function setUp(): void { - $env = System::getEnv('_APP_CONNECTIONS_QUEUE', 'redis_main=' . AppwriteURL::unparse([ - 'scheme' => 'redis', - 'host' => System::getEnv('_APP_REDIS_HOST', 'redis'), - 'port' => System::getEnv('_APP_REDIS_PORT', '6379'), - 'user' => System::getEnv('_APP_REDIS_USER', ''), - 'pass' => System::getEnv('_APP_REDIS_PASS', ''), - ])); - - $dsn = explode('=', $env); - $dsn = count($dsn) > 1 ? $dsn[1] : $dsn[0]; - $dsn = new DSN($dsn); - $this->connection = new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()); + global $register; + $connection = $register->get('pools')->get('queue')->pop()->getResource(); + $this->connection = $connection; $this->client = new Client(self::QUEUE_NAME, $this->connection); }