From ab5ef9776709448243c145ea7ce087be26110c52 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Wed, 25 Sep 2024 10:39:11 +0100 Subject: [PATCH 01/18] fix: oauth trigger create user event --- app/controllers/api/account.php | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index 737bd3e09d..4dd89dbde3 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -1498,6 +1498,12 @@ App::get('/v1/account/sessions/oauth2/:provider/redirect') 'providerType' => MESSAGE_TYPE_EMAIL, 'identifier' => $email, ])); + + $queueForEvents + ->setEvent('users.[userId].create') + ->setParam('userId', $user->getId()) + ->trigger(); + } catch (Duplicate) { $failureRedirect(Exception::USER_ALREADY_EXISTS); } From 3ba7d7c7deda23e49ef6d4d7f5b8174c6b3bb710 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Wed, 25 Sep 2024 10:48:39 +0100 Subject: [PATCH 02/18] fix: add payload --- app/controllers/api/account.php | 1 + 1 file changed, 1 insertion(+) diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index 4dd89dbde3..9222ef5546 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -1502,6 +1502,7 @@ App::get('/v1/account/sessions/oauth2/:provider/redirect') $queueForEvents ->setEvent('users.[userId].create') ->setParam('userId', $user->getId()) + ->setPayload($response->output($user, Response::MODEL_ACCOUNT)) ->trigger(); } catch (Duplicate) { From 5afa8c6158131f7e799688cbe8a3fa2b1f2c0ffe Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 29 Oct 2024 10:51:40 +0000 Subject: [PATCH 03/18] feat: usage db listener --- app/controllers/api/account.php | 7 ------- app/controllers/shared/api.php | 18 ++++++++++++++---- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index 10f44e94ea..92d9123840 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -441,7 +441,6 @@ App::get('/v1/account') App::delete('/v1/account') ->desc('Delete account') ->groups(['api', 'account']) - ->label('event', 'users.[userId].delete') ->label('scope', 'account') ->label('audits.event', 'user.delete') ->label('audits.resource', 'user/{response.$id}') @@ -1499,12 +1498,6 @@ App::get('/v1/account/sessions/oauth2/:provider/redirect') 'identifier' => $email, ])); - $queueForEvents - ->setEvent('users.[userId].create') - ->setParam('userId', $user->getId()) - ->setPayload($response->output($user, Response::MODEL_ACCOUNT)) - ->trigger(); - } catch (Duplicate) { $failureRedirect(Exception::USER_ALREADY_EXISTS); } diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 6d87940ff7..ceb3ee2f18 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -57,8 +57,17 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$databaseListener = function (string $event, Document $document, Document $project, Usage $queueForUsage, Database $dbForProject) { +$eventDatabaseListener = function (string $event, Document $document, EventDatabase $queueForEvents, Response $response) { + if ($document->getCollection() === 'users' && $event === Database::EVENT_DOCUMENT_CREATE) { + $queueForEvents + ->setEvent('users.[userId].create') + ->setParam('userId', $document->getId()) + ->setPayload($response->output($document, Response::MODEL_USER)) + ->trigger(); + } +}; +$usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) { $value = 1; if ($event === Database::EVENT_DOCUMENT_DELETE) { $value = -1; @@ -357,7 +366,7 @@ App::init() ->inject('queueForUsage') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($databaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -452,8 +461,9 @@ App::init() $queueForMessaging->setProject($project); $dbForProject - ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) + ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($event, $document, $queueForEvents, $response)); $useCache = $route->getLabel('cache', false); if ($useCache) { From d16251d261dbf1ee271eeea042f85b9b741b842e Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 29 Oct 2024 10:58:57 +0000 Subject: [PATCH 04/18] fix: remove old create user events --- app/controllers/api/account.php | 6 +--- app/controllers/api/users.php | 53 +++++++++++---------------------- 2 files changed, 18 insertions(+), 41 deletions(-) diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index 92d9123840..d32836d0ec 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -273,7 +273,6 @@ $createSession = function (string $userId, string $secret, Request $request, Res App::post('/v1/account') ->desc('Create account') ->groups(['api', 'account', 'auth']) - ->label('event', 'users.[userId].create') ->label('scope', 'sessions.write') ->label('auth.type', 'emailPassword') ->label('audits.event', 'user.create') @@ -296,9 +295,8 @@ App::post('/v1/account') ->inject('user') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { + ->action(function (string $userId, string $email, string $password, string $name, Request $request, Response $response, Document $user, Document $project, Database $dbForProject, Hooks $hooks) { $email = \strtolower($email); if ('console' === $project->getId()) { @@ -408,8 +406,6 @@ App::post('/v1/account') Authorization::setRole(Role::user($user->getId())->toString()); Authorization::setRole(Role::users()->toString()); - $queueForEvents->setParam('userId', $user->getId()); - $response ->setStatusCode(Response::STATUS_CODE_CREATED) ->dynamic($user, Response::MODEL_ACCOUNT); diff --git a/app/controllers/api/users.php b/app/controllers/api/users.php index 571df4fdb2..42d0875720 100644 --- a/app/controllers/api/users.php +++ b/app/controllers/api/users.php @@ -50,7 +50,7 @@ use Utopia\Validator\Text; use Utopia\Validator\WhiteList; /** TODO: Remove function when we move to using utopia/platform */ -function createUser(string $hash, mixed $hashOptions, string $userId, ?string $email, ?string $password, ?string $phone, string $name, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks): Document +function createUser(string $hash, mixed $hashOptions, string $userId, ?string $email, ?string $password, ?string $phone, string $name, Document $project, Database $dbForProject, Hooks $hooks): Document { $plaintextPassword = $password; $hashOptionsObject = (\is_string($hashOptions)) ? \json_decode($hashOptions, true) : $hashOptions; // Cast to JSON array @@ -175,15 +175,12 @@ function createUser(string $hash, mixed $hashOptions, string $userId, ?string $e throw new Exception(Exception::USER_ALREADY_EXISTS); } - $queueForEvents->setParam('userId', $user->getId()); - return $user; } App::post('/v1/users') ->desc('Create user') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -202,10 +199,9 @@ App::post('/v1/users') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, ?string $email, ?string $phone, ?string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('plaintext', '{}', $userId, $email, $password, $phone, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) ->dynamic($user, Response::MODEL_USER); @@ -214,7 +210,6 @@ App::post('/v1/users') App::post('/v1/users/bcrypt') ->desc('Create user with bcrypt password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -232,10 +227,9 @@ App::post('/v1/users/bcrypt') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('bcrypt', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('bcrypt', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -245,7 +239,6 @@ App::post('/v1/users/bcrypt') App::post('/v1/users/md5') ->desc('Create user with MD5 password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -263,10 +256,9 @@ App::post('/v1/users/md5') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('md5', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('md5', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -276,7 +268,6 @@ App::post('/v1/users/md5') App::post('/v1/users/argon2') ->desc('Create user with Argon2 password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -294,10 +285,9 @@ App::post('/v1/users/argon2') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('argon2', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('argon2', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -307,7 +297,6 @@ App::post('/v1/users/argon2') App::post('/v1/users/sha') ->desc('Create user with SHA password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -326,16 +315,15 @@ App::post('/v1/users/sha') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $passwordVersion, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { + ->action(function (string $userId, string $email, string $password, string $passwordVersion, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { $options = '{}'; if (!empty($passwordVersion)) { $options = '{"version":"' . $passwordVersion . '"}'; } - $user = createUser('sha', $options, $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + $user = createUser('sha', $options, $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -345,7 +333,6 @@ App::post('/v1/users/sha') App::post('/v1/users/phpass') ->desc('Create user with PHPass password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -363,10 +350,9 @@ App::post('/v1/users/phpass') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('phpass', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('phpass', '{}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -376,7 +362,6 @@ App::post('/v1/users/phpass') App::post('/v1/users/scrypt') ->desc('Create user with Scrypt password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -399,9 +384,8 @@ App::post('/v1/users/scrypt') ->inject('response') ->inject('project') ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $passwordSalt, int $passwordCpu, int $passwordMemory, int $passwordParallel, int $passwordLength, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { + ->action(function (string $userId, string $email, string $password, string $passwordSalt, int $passwordCpu, int $passwordMemory, int $passwordParallel, int $passwordLength, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { $options = [ 'salt' => $passwordSalt, 'costCpu' => $passwordCpu, @@ -410,7 +394,7 @@ App::post('/v1/users/scrypt') 'length' => $passwordLength ]; - $user = createUser('scrypt', \json_encode($options), $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + $user = createUser('scrypt', \json_encode($options), $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) @@ -420,7 +404,6 @@ App::post('/v1/users/scrypt') App::post('/v1/users/scrypt-modified') ->desc('Create user with Scrypt modified password') ->groups(['api', 'users']) - ->label('event', 'users.[userId].create') ->label('scope', 'users.write') ->label('audits.event', 'user.create') ->label('audits.resource', 'user/{response.$id}') @@ -440,11 +423,9 @@ App::post('/v1/users/scrypt-modified') ->param('name', '', new Text(128), 'User name. Max length: 128 chars.', true) ->inject('response') ->inject('project') - ->inject('dbForProject') - ->inject('queueForEvents') ->inject('hooks') - ->action(function (string $userId, string $email, string $password, string $passwordSalt, string $passwordSaltSeparator, string $passwordSignerKey, string $name, Response $response, Document $project, Database $dbForProject, Event $queueForEvents, Hooks $hooks) { - $user = createUser('scryptMod', '{"signerKey":"' . $passwordSignerKey . '","saltSeparator":"' . $passwordSaltSeparator . '","salt":"' . $passwordSalt . '"}', $userId, $email, $password, null, $name, $project, $dbForProject, $queueForEvents, $hooks); + ->action(function (string $userId, string $email, string $password, string $passwordSalt, string $passwordSaltSeparator, string $passwordSignerKey, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { + $user = createUser('scryptMod', '{"signerKey":"' . $passwordSignerKey . '","saltSeparator":"' . $passwordSaltSeparator . '","salt":"' . $passwordSalt . '"}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); $response ->setStatusCode(Response::STATUS_CODE_CREATED) From 3020c0beea46700cc3d280ab73eb6805f8c5cd0c Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 29 Oct 2024 11:01:29 +0000 Subject: [PATCH 05/18] fix: missing dep --- app/controllers/api/users.php | 1 + 1 file changed, 1 insertion(+) diff --git a/app/controllers/api/users.php b/app/controllers/api/users.php index 42d0875720..3beb1526b0 100644 --- a/app/controllers/api/users.php +++ b/app/controllers/api/users.php @@ -423,6 +423,7 @@ App::post('/v1/users/scrypt-modified') ->param('name', '', new Text(128), 'User name. Max length: 128 chars.', true) ->inject('response') ->inject('project') + ->inject('dbForProject') ->inject('hooks') ->action(function (string $userId, string $email, string $password, string $passwordSalt, string $passwordSaltSeparator, string $passwordSignerKey, string $name, Response $response, Document $project, Database $dbForProject, Hooks $hooks) { $user = createUser('scryptMod', '{"signerKey":"' . $passwordSignerKey . '","saltSeparator":"' . $passwordSaltSeparator . '","salt":"' . $passwordSalt . '"}', $userId, $email, $password, null, $name, $project, $dbForProject, $hooks); From 13d1376e282fb592ad4f598d5bb42b288825e01b Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 29 Oct 2024 11:20:52 +0000 Subject: [PATCH 06/18] fix: type --- app/controllers/shared/api.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index ceb3ee2f18..a0151ed9ed 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -57,7 +57,7 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$eventDatabaseListener = function (string $event, Document $document, EventDatabase $queueForEvents, Response $response) { +$eventDatabaseListener = function (string $event, Document $document, Event $queueForEvents, Response $response) { if ($document->getCollection() === 'users' && $event === Database::EVENT_DOCUMENT_CREATE) { $queueForEvents ->setEvent('users.[userId].create') From 9d4fd4170158dbd16715f489a48f12220d6c2f51 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 29 Oct 2024 11:56:41 +0000 Subject: [PATCH 07/18] fix: webhook test --- tests/e2e/Services/Webhooks/WebhooksBase.php | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/e2e/Services/Webhooks/WebhooksBase.php b/tests/e2e/Services/Webhooks/WebhooksBase.php index 6be3e16c1f..2ef41003ee 100644 --- a/tests/e2e/Services/Webhooks/WebhooksBase.php +++ b/tests/e2e/Services/Webhooks/WebhooksBase.php @@ -901,6 +901,17 @@ trait WebhooksBase $teamId = $data['teamId'] ?? ''; $email = uniqid() . 'friend@localhost.test'; + // Create user to ensure team event is triggered after user event + $user = $this->client->call(Client::METHOD_POST, '/account', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ], $this->getHeaders()), [ + 'userId' => ID::unique(), + 'email' => $email, + 'password' => 'password', + 'name' => 'Friend User', + ]); + /** * Test for SUCCESS */ @@ -909,7 +920,6 @@ trait WebhooksBase 'x-appwrite-project' => $this->getProject()['$id'], ], $this->getHeaders()), [ 'email' => $email, - 'name' => 'Friend User', 'roles' => ['admin', 'editor'], 'url' => 'http://localhost:5000/join-us#title' ]); From 26c53bcf9cda31f1c2385b49c2c9d4d5011a1b92 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Wed, 30 Oct 2024 18:01:12 +0000 Subject: [PATCH 08/18] feat: clone queue --- app/controllers/shared/api.php | 2 +- src/Appwrite/Event/Event.php | 36 ++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index c5f0f3ac85..c65f6356a3 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -468,7 +468,7 @@ App::init() $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($event, $document, $queueForEvents, $response)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($event, $document, clone $queueForEvents, $response)); $useCache = $route->getLabel('cache', false); if ($useCache) { diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 43eda511df..8e5325545c 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -319,6 +319,10 @@ class Event return false; } + if (empty($this->event)) { + return false; + } + $client = new Client($this->queue, $this->connection); return $client->enqueue([ @@ -546,4 +550,36 @@ class Event return $this; } + + /** + * Clone event. + */ + public function __clone() + { + if ($this->project instanceof Document) { + $this->project = clone $this->project; + } + + if ($this->user instanceof Document) { + $this->user = clone $this->user; + } + + foreach ($this->context as $key => $value) { + if ($value instanceof Document) { + $this->context[$key] = clone $value; + } + } + + foreach ($this->params as $key => $value) { + if (is_object($value)) { + $this->params[$key] = clone $value; + } + } + + foreach ($this->payload as $key => $value) { + if (is_object($value)) { + $this->payload[$key] = clone $value; + } + } + } } From 2736f6009a651e63d90795f9de85019d70efe91a Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Wed, 30 Oct 2024 18:18:37 +0000 Subject: [PATCH 09/18] feat: dirty solution --- app/controllers/shared/api.php | 59 ++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 3 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index c65f6356a3..e9a404de48 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -57,7 +57,7 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$eventDatabaseListener = function (string $event, Document $document, Event $queueForEvents, Response $response) { +$eventDatabaseListener = function (string $event, Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) { if ($document->getCollection() === 'users' && $event === Database::EVENT_DOCUMENT_CREATE) { $queueForEvents ->setEvent('users.[userId].create') @@ -65,6 +65,58 @@ $eventDatabaseListener = function (string $event, Document $document, Event $que ->setPayload($response->output($document, Response::MODEL_USER)) ->trigger(); } + + // FIXME: This is a temporary solution, this should be moved to the shutdown hook + /** + * Trigger functions. + */ + if (!$queueForEvents->isPaused()) { + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + } + + /** + * Trigger webhooks. + */ + $queueForEvents + ->setClass(Event::WEBHOOK_CLASS_NAME) + ->setQueue(Event::WEBHOOK_QUEUE_NAME) + ->trigger(); + + /** + * Trigger realtime. + */ + if ($project->getId() !== 'console') { + $allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams()); + $payload = new Document($queueForEvents->getPayload()); + + $db = $queueForEvents->getContext('database'); + $collection = $queueForEvents->getContext('collection'); + $bucket = $queueForEvents->getContext('bucket'); + + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $payload, + project: $project, + database: $db, + collection: $collection, + bucket: $bucket, + ); + + Realtime::send( + projectId: $target['projectId'] ?? $project->getId(), + payload: $queueForEvents->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $queueForEvents->getParam('userId') + ] + ); + } }; $usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) { @@ -369,9 +421,10 @@ App::init() ->inject('queueForDatabase') ->inject('queueForBuilds') ->inject('queueForUsage') + ->inject('queueForFunctions') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -468,7 +521,7 @@ App::init() $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($event, $document, clone $queueForEvents, $response)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($event, $document, clone $queueForEvents, $response, $queueForFunctions, $project)); $useCache = $route->getLabel('cache', false); if ($useCache) { From 51ae3a6475c090715f724d090e309792971f5a42 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Wed, 30 Oct 2024 18:31:14 +0000 Subject: [PATCH 10/18] fix: tests --- app/controllers/shared/api.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index e9a404de48..ea2d03ccf2 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -58,6 +58,9 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar }; $eventDatabaseListener = function (string $event, Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) { + // Set event empty to avoid triggering events by default + $queueForEvents->setEvent(''); + if ($document->getCollection() === 'users' && $event === Database::EVENT_DOCUMENT_CREATE) { $queueForEvents ->setEvent('users.[userId].create') From e87ed0e13a3df31ddec1703811309898dad34e87 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Fri, 1 Nov 2024 15:24:29 +0000 Subject: [PATCH 11/18] chore: refactor use `from` over clonse --- app/controllers/shared/api.php | 46 ++++++++++++++++++---------------- src/Appwrite/Event/Event.php | 40 ++++++++++------------------- src/Appwrite/Event/Func.php | 18 ------------- 3 files changed, 37 insertions(+), 67 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index ea2d03ccf2..392fc85cf5 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -28,6 +28,7 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; +use Utopia\Queue\Connection; use Utopia\System\System; use Utopia\Validator\WhiteList; @@ -57,31 +58,28 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$eventDatabaseListener = function (string $event, Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) { - // Set event empty to avoid triggering events by default - $queueForEvents->setEvent(''); - - if ($document->getCollection() === 'users' && $event === Database::EVENT_DOCUMENT_CREATE) { - $queueForEvents - ->setEvent('users.[userId].create') - ->setParam('userId', $document->getId()) - ->setPayload($response->output($document, Response::MODEL_USER)) - ->trigger(); +$eventDatabaseListener = function (Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) { + switch ($document->getCollection()) { + case 'users': + $queueForEvents + ->setEvent('users.[userId].update') + ->setParam('userId', $document->getId()) + ->setPayload($response->output($document, Response::MODEL_USER)) + ->trigger(); + break; } // FIXME: This is a temporary solution, this should be moved to the shutdown hook - /** - * Trigger functions. - */ - if (!$queueForEvents->isPaused()) { - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + if (empty($queueForEvents->getEvent())) { + return; } - /** - * Trigger webhooks. - */ + // Trigger functions + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + // Trigger webhooks $queueForEvents ->setClass(Event::WEBHOOK_CLASS_NAME) ->setQueue(Event::WEBHOOK_QUEUE_NAME) @@ -417,6 +415,7 @@ App::init() ->inject('response') ->inject('project') ->inject('user') + ->inject('queue') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -427,7 +426,7 @@ App::init() ->inject('queueForFunctions') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -521,10 +520,13 @@ App::init() $queueForBuilds->setProject($project); $queueForMessaging->setProject($project); + $queueForEventsClone = new Event($queue); + $queueForEventsClone->from($queueForEvents); + $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($event, $document, clone $queueForEvents, $response, $queueForFunctions, $project)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($document, clone $queueForEvents, $response, $queueForFunctions, $project)); $useCache = $route->getLabel('cache', false); if ($useCache) { diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 8e5325545c..14cb1ef4b7 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -552,34 +552,20 @@ class Event } /** - * Clone event. + * Generate a function event from a base event + * + * @param Event $event + * + * @return self + * */ - public function __clone() + public function from(Event $event): self { - if ($this->project instanceof Document) { - $this->project = clone $this->project; - } - - if ($this->user instanceof Document) { - $this->user = clone $this->user; - } - - foreach ($this->context as $key => $value) { - if ($value instanceof Document) { - $this->context[$key] = clone $value; - } - } - - foreach ($this->params as $key => $value) { - if (is_object($value)) { - $this->params[$key] = clone $value; - } - } - - foreach ($this->payload as $key => $value) { - if (is_object($value)) { - $this->payload[$key] = clone $value; - } - } + $this->project = $event->getProject(); + $this->user = $event->getUser(); + $this->payload = $event->getPayload(); + $this->event = $event->getEvent(); + $this->params = $event->getParams(); + return $this; } } diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 4dad5802f7..0ad639a9f5 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -238,22 +238,4 @@ class Func extends Event 'method' => $this->method, ]); } - - /** - * Generate a function event from a base event - * - * @param Event $event - * - * @return self - * - */ - public function from(Event $event): self - { - $this->project = $event->getProject(); - $this->user = $event->getUser(); - $this->payload = $event->getPayload(); - $this->event = $event->getEvent(); - $this->params = $event->getParams(); - return $this; - } } From ff95532ccb7369fc182af8f26c35dcb62906d498 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Fri, 1 Nov 2024 16:29:34 +0000 Subject: [PATCH 12/18] chore: refactor --- app/controllers/shared/api.php | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 392fc85cf5..043c346587 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -59,21 +59,15 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar }; $eventDatabaseListener = function (Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) { - switch ($document->getCollection()) { - case 'users': - $queueForEvents - ->setEvent('users.[userId].update') - ->setParam('userId', $document->getId()) - ->setPayload($response->output($document, Response::MODEL_USER)) - ->trigger(); - break; - } - - // FIXME: This is a temporary solution, this should be moved to the shutdown hook - if (empty($queueForEvents->getEvent())) { + if (!$document->getCollection() === 'users') { return; } + $queueForEvents + ->setEvent('users.[userId].update') + ->setParam('userId', $document->getId()) + ->setPayload($response->output($document, Response::MODEL_USER)); + // Trigger functions $queueForFunctions ->from($queueForEvents) @@ -85,9 +79,7 @@ $eventDatabaseListener = function (Document $document, Event $queueForEvents, Re ->setQueue(Event::WEBHOOK_QUEUE_NAME) ->trigger(); - /** - * Trigger realtime. - */ + // Trigger realtime if ($project->getId() !== 'console') { $allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams()); $payload = new Document($queueForEvents->getPayload()); From d2deca7f1faeb2549ee2c9067454d7e60b0b1ff2 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Mon, 4 Nov 2024 15:05:54 +0000 Subject: [PATCH 13/18] feat: refactor events --- app/controllers/shared/api.php | 154 +++++++++++--------------------- app/init.php | 8 ++ src/Appwrite/Event/Event.php | 13 --- src/Appwrite/Event/Realtime.php | 74 +++++++++++++++ src/Appwrite/Event/Webhook.php | 17 ++++ 5 files changed, 151 insertions(+), 115 deletions(-) create mode 100644 src/Appwrite/Event/Realtime.php create mode 100644 src/Appwrite/Event/Webhook.php diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 043c346587..b2a037d33a 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -12,6 +12,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Messaging; use Appwrite\Event\Usage; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Messaging\Adapter\Realtime; @@ -58,7 +59,8 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$eventDatabaseListener = function (Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) { +$eventDatabaseListener = function (Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) use ($triggerEventQueues) { + // For now, we only use user creation events with the database listener. if (!$document->getCollection() === 'users') { return; } @@ -68,48 +70,13 @@ $eventDatabaseListener = function (Document $document, Event $queueForEvents, Re ->setParam('userId', $document->getId()) ->setPayload($response->output($document, Response::MODEL_USER)); - // Trigger functions - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - - // Trigger webhooks - $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->trigger(); - - // Trigger realtime - if ($project->getId() !== 'console') { - $allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams()); - $payload = new Document($queueForEvents->getPayload()); - - $db = $queueForEvents->getContext('database'); - $collection = $queueForEvents->getContext('collection'); - $bucket = $queueForEvents->getContext('bucket'); - - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $payload, - project: $project, - database: $db, - collection: $collection, - bucket: $bucket, - ); - - Realtime::send( - projectId: $target['projectId'] ?? $project->getId(), - payload: $queueForEvents->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $queueForEvents->getParam('userId') - ] - ); - } + // Trigger functions, webhooks, and realtime events + $triggerEventQueues( + $queueForEvents, + $queueForFunctions, + $queueForWebhooks, + $queueForRealtime + ); }; $usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) { @@ -400,6 +367,29 @@ App::init() } }); +$triggerEventQueues = function ($queueForEvents, $queueForFunctions, $queueForWebhooks, $queueForRealtime) { + if (empty($queueForEvents->getEvent())) { + return; + } + + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + + // Console can listen to events from other projects, but it should not trigger events for them. + if ($queueForEvents->getProject()->getId() === 'console') { + return; + } + + $queueForRealtime + ->from($queueForEvents) + ->trigger(); +}; + App::init() ->groups(['api']) ->inject('utopia') @@ -416,9 +406,11 @@ App::init() ->inject('queueForBuilds') ->inject('queueForUsage') ->inject('queueForFunctions') + ->inject('queueForWebhooks') + ->inject('queueForRealtime') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -512,13 +504,14 @@ App::init() $queueForBuilds->setProject($project); $queueForMessaging->setProject($project); + // Clone the queueForEvents, to prevent events triggered by the database listener + // from overwriting the events that are supposed to be triggered in the shutdown hook. $queueForEventsClone = new Event($queue); - $queueForEventsClone->from($queueForEvents); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($document, clone $queueForEvents, $response, $queueForFunctions, $project)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener($document, $response, $queueForEventsClone->from($queueForEvents), $queueForFunctions, $queueForWebhooks, $queueForRealtime)); $useCache = $route->getLabel('cache', false); if ($useCache) { @@ -651,70 +644,27 @@ App::shutdown() ->inject('queueForDatabase') ->inject('queueForBuilds') ->inject('queueForMessaging') - ->inject('dbForProject') ->inject('queueForFunctions') + ->inject('queueForWebhooks') + ->inject('queueForRealtime') + ->inject('dbForProject') ->inject('mode') ->inject('dbForConsole') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode, Database $dbForConsole) use ($parseLabel, $triggerEventQueues) { $responsePayload = $response->getPayload(); - if (!empty($queueForEvents->getEvent())) { - if (empty($queueForEvents->getPayload())) { - $queueForEvents->setPayload($responsePayload); - } - - /** - * Trigger functions. - */ - if (!$queueForEvents->isPaused()) { - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - } - /** - * Trigger webhooks. - */ - $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->trigger(); - - /** - * Trigger realtime. - */ - if ($project->getId() !== 'console') { - $allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams()); - $payload = new Document($queueForEvents->getPayload()); - - $db = $queueForEvents->getContext('database'); - $collection = $queueForEvents->getContext('collection'); - $bucket = $queueForEvents->getContext('bucket'); - - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $payload, - project: $project, - database: $db, - collection: $collection, - bucket: $bucket, - ); - - Realtime::send( - projectId: $target['projectId'] ?? $project->getId(), - payload: $queueForEvents->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $queueForEvents->getParam('userId') - ] - ); - } + if (empty($queueForEvents->getPayload())) { + $queueForEvents->setPayload($responsePayload); } + $triggerEventQueues( + $queueForEvents, + $queueForFunctions, + $queueForWebhooks, + $queueForRealtime + ); + $route = $utopia->getRoute(); $requestParams = $route->getParamsValues(); diff --git a/app/init.php b/app/init.php index e962c58b67..d062e218e9 100644 --- a/app/init.php +++ b/app/init.php @@ -31,7 +31,9 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +use Appwrite\Event\Realtime; use Appwrite\Event\Usage; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Functions\Specification; use Appwrite\GraphQL\Promises\Adapter\Swoole; @@ -1134,6 +1136,12 @@ App::setResource('queueForDeletes', function (Connection $queue) { App::setResource('queueForEvents', function (Connection $queue) { return new Event($queue); }, ['queue']); +App::setResource('queueForWebhooks', function (Connection $queue) { + return new Webhook($queue); +}, ['queue']); +App::setResource('queueForRealtime', function () { + return new Realtime(); +}, []); App::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); }, ['queue']); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 14cb1ef4b7..187203b04c 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -204,19 +204,6 @@ class Event return $this->payload; } - public function getRealtimePayload(): array - { - $payload = []; - - foreach ($this->payload as $key => $value) { - if (!isset($this->sensitive[$key])) { - $payload[$key] = $value; - } - } - - return $payload; - } - /** * Set context for this event. * diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php new file mode 100644 index 0000000000..394de9a612 --- /dev/null +++ b/src/Appwrite/Event/Realtime.php @@ -0,0 +1,74 @@ +payload as $key => $value) { + if (!isset($this->sensitive[$key])) { + $payload[$key] = $value; + } + } + + return $payload; + } + + /** + * Execute Event. + * + * @return string|bool + * @throws InvalidArgumentException + */ + public function trigger(): string|bool + { + if ($this->paused) { + return false; + } + + if (empty($this->event)) { + return false; + } + + $allEvents = Event::generateEvents($this->getEvent(), $this->getParams()); + $payload = new Document($this->getPayload()); + + $db = $this->getContext('database'); + $collection = $this->getContext('collection'); + $bucket = $this->getContext('bucket'); + + $target = RealtimeAdapter::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $payload, + project: $this->getProject(), + database: $db, + collection: $collection, + bucket: $bucket, + ); + + RealtimeAdapter::send( + projectId: $target['projectId'] ?? $this->getProject()->getId(), + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + + return true; + } +} diff --git a/src/Appwrite/Event/Webhook.php b/src/Appwrite/Event/Webhook.php new file mode 100644 index 0000000000..125c9a78d5 --- /dev/null +++ b/src/Appwrite/Event/Webhook.php @@ -0,0 +1,17 @@ +setQueue(Event::WEBHOOK_QUEUE_NAME) + ->setClass(Event::WEBHOOK_CLASS_NAME); + } +} From 20d8a702a126b31eaaab18598864632a14bab56c Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Mon, 4 Nov 2024 15:20:43 +0000 Subject: [PATCH 14/18] chore: refactor --- app/controllers/shared/api.php | 81 ++++++++++++++++----------------- src/Appwrite/Event/Event.php | 22 --------- src/Appwrite/Event/Func.php | 4 -- src/Appwrite/Event/Realtime.php | 4 -- 4 files changed, 38 insertions(+), 73 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index b2a037d33a..56137631e8 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -11,11 +11,11 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Messaging; +use Appwrite\Event\Realtime; use Appwrite\Event\Usage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Utopia\Abuse\Abuse; @@ -59,9 +59,9 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$eventDatabaseListener = function (Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) use ($triggerEventQueues) { - // For now, we only use user creation events with the database listener. - if (!$document->getCollection() === 'users') { +$eventDatabaseListener = function (Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { + // Only trigger events for user creation with the database listener. + if ($document->getCollection() !== 'users') { return; } @@ -71,12 +71,21 @@ $eventDatabaseListener = function (Document $document, Response $response, Event ->setPayload($response->output($document, Response::MODEL_USER)); // Trigger functions, webhooks, and realtime events - $triggerEventQueues( - $queueForEvents, - $queueForFunctions, - $queueForWebhooks, - $queueForRealtime - ); + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + + if ($queueForEvents->getProject()->getId() === 'console') { + return; + } + + $queueForRealtime + ->from($queueForEvents) + ->trigger(); }; $usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) { @@ -367,29 +376,6 @@ App::init() } }); -$triggerEventQueues = function ($queueForEvents, $queueForFunctions, $queueForWebhooks, $queueForRealtime) { - if (empty($queueForEvents->getEvent())) { - return; - } - - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); - - // Console can listen to events from other projects, but it should not trigger events for them. - if ($queueForEvents->getProject()->getId() === 'console') { - return; - } - - $queueForRealtime - ->from($queueForEvents) - ->trigger(); -}; - App::init() ->groups(['api']) ->inject('utopia') @@ -650,20 +636,29 @@ App::shutdown() ->inject('dbForProject') ->inject('mode') ->inject('dbForConsole') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode, Database $dbForConsole) use ($parseLabel, $triggerEventQueues) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode, Database $dbForConsole) use ($parseLabel) { $responsePayload = $response->getPayload(); - if (empty($queueForEvents->getPayload())) { - $queueForEvents->setPayload($responsePayload); - } + if (!empty($queueForEvents->getEvent())) { + if (empty($queueForEvents->getPayload())) { + $queueForEvents->setPayload($responsePayload); + } - $triggerEventQueues( - $queueForEvents, - $queueForFunctions, - $queueForWebhooks, - $queueForRealtime - ); + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + if ($project->getId() !== 'console') { + $queueForRealtime + ->from($queueForEvents) + ->trigger(); + } + } $route = $utopia->getRoute(); $requestParams = $route->getParamsValues(); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 187203b04c..d15ccc39bc 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -302,10 +302,6 @@ class Event */ public function trigger(): string|bool { - if ($this->paused) { - return false; - } - if (empty($this->event)) { return false; } @@ -520,24 +516,6 @@ class Event return \array_values($events); } - /** - * Get the value of paused - */ - public function isPaused(): bool - { - return $this->paused; - } - - /** - * Set the value of paused - */ - public function setPaused(bool $paused): self - { - $this->paused = $paused; - - return $this; - } - /** * Generate a function event from a base event * diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 0ad639a9f5..11a445d8ed 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -213,10 +213,6 @@ class Func extends Event */ public function trigger(): string|bool { - if ($this->paused) { - return false; - } - $client = new Client($this->queue, $this->connection); $events = $this->getEvent() ? Event::generateEvents($this->getEvent(), $this->getParams()) : null; diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index 394de9a612..e158076f9b 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -32,10 +32,6 @@ class Realtime extends Event */ public function trigger(): string|bool { - if ($this->paused) { - return false; - } - if (empty($this->event)) { return false; } From e9a0f47711d1a2eddbc7a2bcab1c8b9139449b27 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 5 Nov 2024 09:55:56 +0100 Subject: [PATCH 15/18] fix: event test --- src/Appwrite/Event/Event.php | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index d15ccc39bc..325aca8c62 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -302,10 +302,6 @@ class Event */ public function trigger(): string|bool { - if (empty($this->event)) { - return false; - } - $client = new Client($this->queue, $this->connection); return $client->enqueue([ From 2dea9b1f51a1cfde8c0437b2eaf8b53b165164fe Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:35:35 +0100 Subject: [PATCH 16/18] chore: clone --- app/controllers/shared/api.php | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 56137631e8..a07bc06a91 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -490,14 +490,24 @@ App::init() $queueForBuilds->setProject($project); $queueForMessaging->setProject($project); - // Clone the queueForEvents, to prevent events triggered by the database listener + // Clone the queues, to prevent events triggered by the database listener // from overwriting the events that are supposed to be triggered in the shutdown hook. $queueForEventsClone = new Event($queue); + $queueForFunctionsClone = new Func($queue); + $queueForWebhooksClone = new Webhook($queue); + $queueForRealtimeClone = new Realtime($queue); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener($document, $response, $queueForEventsClone->from($queueForEvents), $queueForFunctions, $queueForWebhooks, $queueForRealtime)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( + $document, + $response, + $queueForEventsClone->from($queueForEvents), + $queueForFunctionsClone->from($queueForEvents), + $queueForWebhooksClone->from($queueForEvents), + $queueForRealtimeClone->from($queueForEvents) + )); $useCache = $route->getLabel('cache', false); if ($useCache) { From ab7e45429222e36150a0c74ee234badcf554db1f Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 5 Nov 2024 15:17:41 +0100 Subject: [PATCH 17/18] fix: event name --- app/controllers/shared/api.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index a07bc06a91..8835247574 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -66,7 +66,7 @@ $eventDatabaseListener = function (Document $document, Response $response, Event } $queueForEvents - ->setEvent('users.[userId].update') + ->setEvent('users.[userId].create') ->setParam('userId', $document->getId()) ->setPayload($response->output($document, Response::MODEL_USER)); From 229fe9ecd5b37b1fc5db664c3b5d7f311da81b53 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Tue, 5 Nov 2024 15:50:32 +0100 Subject: [PATCH 18/18] fix: tests --- app/controllers/shared/api.php | 17 +++++++---------- src/Appwrite/Event/Event.php | 1 + 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 8835247574..f5921bf6e8 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -391,12 +391,9 @@ App::init() ->inject('queueForDatabase') ->inject('queueForBuilds') ->inject('queueForUsage') - ->inject('queueForFunctions') - ->inject('queueForWebhooks') - ->inject('queueForRealtime') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -493,9 +490,9 @@ App::init() // Clone the queues, to prevent events triggered by the database listener // from overwriting the events that are supposed to be triggered in the shutdown hook. $queueForEventsClone = new Event($queue); - $queueForFunctionsClone = new Func($queue); - $queueForWebhooksClone = new Webhook($queue); - $queueForRealtimeClone = new Realtime($queue); + $queueForFunctions = new Func($queue); + $queueForWebhooks = new Webhook($queue); + $queueForRealtime = new Realtime(); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) @@ -504,9 +501,9 @@ App::init() $document, $response, $queueForEventsClone->from($queueForEvents), - $queueForFunctionsClone->from($queueForEvents), - $queueForWebhooksClone->from($queueForEvents), - $queueForRealtimeClone->from($queueForEvents) + $queueForFunctions->from($queueForEvents), + $queueForWebhooks->from($queueForEvents), + $queueForRealtime->from($queueForEvents) )); $useCache = $route->getLabel('cache', false); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 325aca8c62..3f166ad7a4 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -527,6 +527,7 @@ class Event $this->payload = $event->getPayload(); $this->event = $event->getEvent(); $this->params = $event->getParams(); + $this->context = $event->context; return $this; } }