feat: refactor events

This commit is contained in:
loks0n 2024-11-04 15:05:54 +00:00
parent ff95532ccb
commit d2deca7f1f
5 changed files with 151 additions and 115 deletions

View file

@ -12,6 +12,7 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Messaging;
use Appwrite\Event\Usage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Messaging\Adapter\Realtime;
@ -58,7 +59,8 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar
return $label;
};
$eventDatabaseListener = function (Document $document, Event $queueForEvents, Response $response, Func $queueForFunctions, Document $project) {
$eventDatabaseListener = function (Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) use ($triggerEventQueues) {
// For now, we only use user creation events with the database listener.
if (!$document->getCollection() === 'users') {
return;
}
@ -68,48 +70,13 @@ $eventDatabaseListener = function (Document $document, Event $queueForEvents, Re
->setParam('userId', $document->getId())
->setPayload($response->output($document, Response::MODEL_USER));
// Trigger functions
$queueForFunctions
->from($queueForEvents)
->trigger();
// Trigger webhooks
$queueForEvents
->setClass(Event::WEBHOOK_CLASS_NAME)
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->trigger();
// Trigger realtime
if ($project->getId() !== 'console') {
$allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams());
$payload = new Document($queueForEvents->getPayload());
$db = $queueForEvents->getContext('database');
$collection = $queueForEvents->getContext('collection');
$bucket = $queueForEvents->getContext('bucket');
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $payload,
project: $project,
database: $db,
collection: $collection,
bucket: $bucket,
);
Realtime::send(
projectId: $target['projectId'] ?? $project->getId(),
payload: $queueForEvents->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $queueForEvents->getParam('userId')
]
);
}
// Trigger functions, webhooks, and realtime events
$triggerEventQueues(
$queueForEvents,
$queueForFunctions,
$queueForWebhooks,
$queueForRealtime
);
};
$usageDatabaseListener = function (string $event, Document $document, Usage $queueForUsage) {
@ -400,6 +367,29 @@ App::init()
}
});
$triggerEventQueues = function ($queueForEvents, $queueForFunctions, $queueForWebhooks, $queueForRealtime) {
if (empty($queueForEvents->getEvent())) {
return;
}
$queueForFunctions
->from($queueForEvents)
->trigger();
$queueForWebhooks
->from($queueForEvents)
->trigger();
// Console can listen to events from other projects, but it should not trigger events for them.
if ($queueForEvents->getProject()->getId() === 'console') {
return;
}
$queueForRealtime
->from($queueForEvents)
->trigger();
};
App::init()
->groups(['api'])
->inject('utopia')
@ -416,9 +406,11 @@ App::init()
->inject('queueForBuilds')
->inject('queueForUsage')
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('queueForRealtime')
->inject('dbForProject')
->inject('mode')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
$route = $utopia->getRoute();
@ -512,13 +504,14 @@ App::init()
$queueForBuilds->setProject($project);
$queueForMessaging->setProject($project);
// Clone the queueForEvents, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($queue);
$queueForEventsClone->from($queueForEvents);
$dbForProject
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage))
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForUsage))
->on(Database::EVENT_DOCUMENT_CREATE, 'trigger-events', fn ($event, $document) => $eventDatabaseListener($document, clone $queueForEvents, $response, $queueForFunctions, $project));
->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener($document, $response, $queueForEventsClone->from($queueForEvents), $queueForFunctions, $queueForWebhooks, $queueForRealtime));
$useCache = $route->getLabel('cache', false);
if ($useCache) {
@ -651,70 +644,27 @@ App::shutdown()
->inject('queueForDatabase')
->inject('queueForBuilds')
->inject('queueForMessaging')
->inject('dbForProject')
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('queueForRealtime')
->inject('dbForProject')
->inject('mode')
->inject('dbForConsole')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, string $mode, Database $dbForConsole) use ($parseLabel, $triggerEventQueues) {
$responsePayload = $response->getPayload();
if (!empty($queueForEvents->getEvent())) {
if (empty($queueForEvents->getPayload())) {
$queueForEvents->setPayload($responsePayload);
}
/**
* Trigger functions.
*/
if (!$queueForEvents->isPaused()) {
$queueForFunctions
->from($queueForEvents)
->trigger();
}
/**
* Trigger webhooks.
*/
$queueForEvents
->setClass(Event::WEBHOOK_CLASS_NAME)
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->trigger();
/**
* Trigger realtime.
*/
if ($project->getId() !== 'console') {
$allEvents = Event::generateEvents($queueForEvents->getEvent(), $queueForEvents->getParams());
$payload = new Document($queueForEvents->getPayload());
$db = $queueForEvents->getContext('database');
$collection = $queueForEvents->getContext('collection');
$bucket = $queueForEvents->getContext('bucket');
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $payload,
project: $project,
database: $db,
collection: $collection,
bucket: $bucket,
);
Realtime::send(
projectId: $target['projectId'] ?? $project->getId(),
payload: $queueForEvents->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $queueForEvents->getParam('userId')
]
);
}
if (empty($queueForEvents->getPayload())) {
$queueForEvents->setPayload($responsePayload);
}
$triggerEventQueues(
$queueForEvents,
$queueForFunctions,
$queueForWebhooks,
$queueForRealtime
);
$route = $utopia->getRoute();
$requestParams = $route->getParamsValues();

View file

@ -31,7 +31,9 @@ use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\Usage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Functions\Specification;
use Appwrite\GraphQL\Promises\Adapter\Swoole;
@ -1134,6 +1136,12 @@ App::setResource('queueForDeletes', function (Connection $queue) {
App::setResource('queueForEvents', function (Connection $queue) {
return new Event($queue);
}, ['queue']);
App::setResource('queueForWebhooks', function (Connection $queue) {
return new Webhook($queue);
}, ['queue']);
App::setResource('queueForRealtime', function () {
return new Realtime();
}, []);
App::setResource('queueForAudits', function (Connection $queue) {
return new Audit($queue);
}, ['queue']);

View file

@ -204,19 +204,6 @@ class Event
return $this->payload;
}
public function getRealtimePayload(): array
{
$payload = [];
foreach ($this->payload as $key => $value) {
if (!isset($this->sensitive[$key])) {
$payload[$key] = $value;
}
}
return $payload;
}
/**
* Set context for this event.
*

View file

@ -0,0 +1,74 @@
<?php
namespace Appwrite\Event;
use Appwrite\Messaging\Adapter\Realtime as RealtimeAdapter;
use Utopia\Database\Document;
class Realtime extends Event
{
public function __construct()
{
}
public function getRealtimePayload(): array
{
$payload = [];
foreach ($this->payload as $key => $value) {
if (!isset($this->sensitive[$key])) {
$payload[$key] = $value;
}
}
return $payload;
}
/**
* Execute Event.
*
* @return string|bool
* @throws InvalidArgumentException
*/
public function trigger(): string|bool
{
if ($this->paused) {
return false;
}
if (empty($this->event)) {
return false;
}
$allEvents = Event::generateEvents($this->getEvent(), $this->getParams());
$payload = new Document($this->getPayload());
$db = $this->getContext('database');
$collection = $this->getContext('collection');
$bucket = $this->getContext('bucket');
$target = RealtimeAdapter::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $payload,
project: $this->getProject(),
database: $db,
collection: $collection,
bucket: $bucket,
);
RealtimeAdapter::send(
projectId: $target['projectId'] ?? $this->getProject()->getId(),
payload: $this->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $this->getParam('userId')
]
);
return true;
}
}

View file

@ -0,0 +1,17 @@
<?php
namespace Appwrite\Event;
use Utopia\Queue\Connection;
class Webhook extends Event
{
public function __construct(protected Connection $connection)
{
parent::__construct($connection);
$this
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME);
}
}