From d2deca7f1faeb2549ee2c9067454d7e60b0b1ff2 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Mon, 4 Nov 2024 15:05:54 +0000 Subject: [PATCH] feat: refactor events --- app/controllers/shared/api.php | 154 +++++++++++--------------------- app/init.php | 8 ++ src/Appwrite/Event/Event.php | 13 --- src/Appwrite/Event/Realtime.php | 74 +++++++++++++++ src/Appwrite/Event/Webhook.php | 17 ++++ 5 files changed, 151 insertions(+), 115 deletions(-) create mode 100644 src/Appwrite/Event/Realtime.php create mode 100644 src/Appwrite/Event/Webhook.php diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 043c346587..b2a037d33a 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -12,6 +12,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Messaging; use Appwrite\Event\Usage; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Messaging\Adapter\Realtime; @@ -58,7 +59,8 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -$eventDatabaseListener = function (Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) { +$eventDatabaseListener = function (Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) use ($triggerEventQueues) { + // For now, we only use user creation events with the database listener. if (!$document->getCollection() === 'users') { return; } @@ -68,48 +70,13 @@ $eventDatabaseListener = function (Document $document, Event $queueForEvents, Re ->setParam('userId', $document->getId()) ->setPayload($response->output($document, Response::MODEL_USER)); - // Trigger functions - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - - // Trigger webhooks - $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->trigger(); - - // Trigger realtime - if ($project->getId() !== 'console') { - $allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams()); - $payload = new Document($queueForEvents->getPayload()); - - $db = $queueForEvents->getContext('database'); - $collection = $queueForEvents->getContext('collection'); - $bucket = $queueForEvents->getContext('bucket'); - - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $payload, - project: $project, - database: $db, - collection: $collection, - bucket: $bucket, - ); - - Realtime::send( - projectId: $target['projectId'] ?? $project->getId(), - payload: $queueForEvents->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $queueForEvents->getParam('userId') - ] - ); - } + // Trigger functions, webhooks, and realtime events + $triggerEventQueues( + $queueForEvents, + $queueForFunctions, + $queueForWebhooks, + $queueForRealtime + ); }; $usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) { @@ -400,6 +367,29 @@ App::init() } }); +$triggerEventQueues = function ($queueForEvents, $queueForFunctions, $queueForWebhooks, $queueForRealtime) { + if (empty($queueForEvents->getEvent())) { + return; + } + + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + + // Console can listen to events from other projects, but it should not trigger events for them. + if ($queueForEvents->getProject()->getId() === 'console') { + return; + } + + $queueForRealtime + ->from($queueForEvents) + ->trigger(); +}; + App::init() ->groups(['api']) ->inject('utopia') @@ -416,9 +406,11 @@ App::init() ->inject('queueForBuilds') ->inject('queueForUsage') ->inject('queueForFunctions') + ->inject('queueForWebhooks') + ->inject('queueForRealtime') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -512,13 +504,14 @@ App::init() $queueForBuilds->setProject($project); $queueForMessaging->setProject($project); + // Clone the queueForEvents, to prevent events triggered by the database listener + // from overwriting the events that are supposed to be triggered in the shutdown hook. $queueForEventsClone = new Event($queue); - $queueForEventsClone->from($queueForEvents); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($document, clone $queueForEvents, $response, $queueForFunctions, $project)); + ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener($document, $response, $queueForEventsClone->from($queueForEvents), $queueForFunctions, $queueForWebhooks, $queueForRealtime)); $useCache = $route->getLabel('cache', false); if ($useCache) { @@ -651,70 +644,27 @@ App::shutdown() ->inject('queueForDatabase') ->inject('queueForBuilds') ->inject('queueForMessaging') - ->inject('dbForProject') ->inject('queueForFunctions') + ->inject('queueForWebhooks') + ->inject('queueForRealtime') + ->inject('dbForProject') ->inject('mode') ->inject('dbForConsole') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode, Database $dbForConsole) use ($parseLabel, $triggerEventQueues) { $responsePayload = $response->getPayload(); - if (!empty($queueForEvents->getEvent())) { - if (empty($queueForEvents->getPayload())) { - $queueForEvents->setPayload($responsePayload); - } - - /** - * Trigger functions. - */ - if (!$queueForEvents->isPaused()) { - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - } - /** - * Trigger webhooks. - */ - $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->trigger(); - - /** - * Trigger realtime. - */ - if ($project->getId() !== 'console') { - $allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams()); - $payload = new Document($queueForEvents->getPayload()); - - $db = $queueForEvents->getContext('database'); - $collection = $queueForEvents->getContext('collection'); - $bucket = $queueForEvents->getContext('bucket'); - - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $payload, - project: $project, - database: $db, - collection: $collection, - bucket: $bucket, - ); - - Realtime::send( - projectId: $target['projectId'] ?? $project->getId(), - payload: $queueForEvents->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $queueForEvents->getParam('userId') - ] - ); - } + if (empty($queueForEvents->getPayload())) { + $queueForEvents->setPayload($responsePayload); } + $triggerEventQueues( + $queueForEvents, + $queueForFunctions, + $queueForWebhooks, + $queueForRealtime + ); + $route = $utopia->getRoute(); $requestParams = $route->getParamsValues(); diff --git a/app/init.php b/app/init.php index e962c58b67..d062e218e9 100644 --- a/app/init.php +++ b/app/init.php @@ -31,7 +31,9 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +use Appwrite\Event\Realtime; use Appwrite\Event\Usage; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Functions\Specification; use Appwrite\GraphQL\Promises\Adapter\Swoole; @@ -1134,6 +1136,12 @@ App::setResource('queueForDeletes', function (Connection $queue) { App::setResource('queueForEvents', function (Connection $queue) { return new Event($queue); }, ['queue']); +App::setResource('queueForWebhooks', function (Connection $queue) { + return new Webhook($queue); +}, ['queue']); +App::setResource('queueForRealtime', function () { + return new Realtime(); +}, []); App::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); }, ['queue']); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 14cb1ef4b7..187203b04c 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -204,19 +204,6 @@ class Event return $this->payload; } - public function getRealtimePayload(): array - { - $payload = []; - - foreach ($this->payload as $key => $value) { - if (!isset($this->sensitive[$key])) { - $payload[$key] = $value; - } - } - - return $payload; - } - /** * Set context for this event. * diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php new file mode 100644 index 0000000000..394de9a612 --- /dev/null +++ b/src/Appwrite/Event/Realtime.php @@ -0,0 +1,74 @@ +payload as $key => $value) { + if (!isset($this->sensitive[$key])) { + $payload[$key] = $value; + } + } + + return $payload; + } + + /** + * Execute Event. + * + * @return string|bool + * @throws InvalidArgumentException + */ + public function trigger(): string|bool + { + if ($this->paused) { + return false; + } + + if (empty($this->event)) { + return false; + } + + $allEvents = Event::generateEvents($this->getEvent(), $this->getParams()); + $payload = new Document($this->getPayload()); + + $db = $this->getContext('database'); + $collection = $this->getContext('collection'); + $bucket = $this->getContext('bucket'); + + $target = RealtimeAdapter::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $payload, + project: $this->getProject(), + database: $db, + collection: $collection, + bucket: $bucket, + ); + + RealtimeAdapter::send( + projectId: $target['projectId'] ?? $this->getProject()->getId(), + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + + return true; + } +} diff --git a/src/Appwrite/Event/Webhook.php b/src/Appwrite/Event/Webhook.php new file mode 100644 index 0000000000..125c9a78d5 --- /dev/null +++ b/src/Appwrite/Event/Webhook.php @@ -0,0 +1,17 @@ +setQueue(Event::WEBHOOK_QUEUE_NAME) + ->setClass(Event::WEBHOOK_CLASS_NAME); + } +}