refactor: integrate EventProcessor for handling function and webhook events; streamline event triggering in database actions

This commit is contained in:
shimon 2026-01-07 16:57:57 +02:00
parent 0582cdf394
commit db4dcd164e
14 changed files with 387 additions and 331 deletions

View file

@ -10,12 +10,12 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\Method;
use Appwrite\Utopia\Database\Documents\User;
use Appwrite\Utopia\Request;
@ -30,7 +30,6 @@ use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Publisher;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Validator\WhiteList;
@ -74,178 +73,6 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar
return $label;
};
/**
* This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**.
*
* Accounts can be created in many ways beyond `createAccount`
* (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here.
*/
$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) {
// Only trigger events for user creation with the database listener.
if ($document->getCollection() !== 'users') {
return;
}
$queueForEvents
->setEvent('users.[userId].create')
->setParam('userId', $document->getId())
->setPayload($response->output($document, Response::MODEL_USER));
// Trigger functions, webhooks, and realtime events
$queueForFunctions
->from($queueForEvents)
->trigger();
/** Trigger webhooks events only if a project has them enabled */
if (!empty($project->getAttribute('webhooks'))) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
}
/** Trigger realtime events only for non console events */
if ($queueForEvents->getProject()->getId() !== 'console') {
$queueForRealtime
->from($queueForEvents)
->trigger();
}
};
/**
* Purge function events cache when functions are created, updated or deleted.
*/
$functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) {
if ($document->getCollection() !== 'functions') {
return;
}
if ($project->isEmpty() || $project->getId() === 'console') {
return;
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functions:events',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$dbForProject->getCache()->purge($cacheKey);
};
$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) {
$value = 1;
switch ($event) {
case Database::EVENT_DOCUMENT_DELETE:
$value = -1;
break;
case Database::EVENT_DOCUMENTS_DELETE:
$value = -1 * $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_CREATE:
$value = $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_UPSERT:
$value = $document->getAttribute('created', 0);
break;
}
switch (true) {
case $document->getCollection() === 'teams':
$queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project
break;
case $document->getCollection() === 'users':
$queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case $document->getCollection() === 'sessions': // sessions
$queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project
break;
case $document->getCollection() === 'databases': // databases
$queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_COLLECTIONS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value);
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$collectionInternalId = $parts[3] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_DOCUMENTS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection
break;
case $document->getCollection() === 'buckets': //buckets
$queueForStatsUsage
->addMetric(METRIC_BUCKETS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'bucket_'): // files
$parts = explode('_', $document->getCollection());
$bucketInternalId = $parts[1];
$queueForStatsUsage
->addMetric(METRIC_FILES, $value) // per project
->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket
break;
case $document->getCollection() === 'functions':
$queueForStatsUsage
->addMetric(METRIC_FUNCTIONS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'sites':
$queueForStatsUsage
->addMetric(METRIC_SITES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'deployments':
$queueForStatsUsage
->addMetric(METRIC_DEPLOYMENTS, $value) // per project
->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value);
break;
default:
break;
}
};
App::init()
->groups(['api'])
->inject('utopia')
@ -514,9 +341,6 @@ App::init()
->inject('response')
->inject('project')
->inject('user')
->inject('publisher')
->inject('publisherFunctions')
->inject('publisherWebhooks')
->inject('queueForEvents')
->inject('queueForMessaging')
->inject('queueForAudits')
@ -526,7 +350,6 @@ App::init()
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('queueForMails')
->inject('queueForMigrations')
->inject('dbForProject')
->inject('timelimit')
->inject('resourceToken')
@ -536,7 +359,7 @@ App::init()
->inject('devKey')
->inject('telemetry')
->inject('platform')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Migration $queueForMigrations, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) use ($usageDatabaseListener, $eventDatabaseListener, $functionsEventsCacheListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) {
$route = $utopia->getRoute();
@ -656,32 +479,6 @@ App::init()
$queueForBuilds->setPlatform($platform);
$queueForMails->setPlatform($platform);
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisherFunctions);
$queueForWebhooks = new Webhook($publisherWebhooks);
$queueForRealtime = new Realtime();
$dbForProject
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener(
$project,
$document,
$response,
$queueForEventsClone->from($queueForEvents),
$queueForFunctions->from($queueForEvents),
$queueForWebhooks->from($queueForEvents),
$queueForRealtime->from($queueForEvents)
))
->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject))
->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject))
->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject))
;
$useCache = $route->getLabel('cache', false);
$storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load');
@ -845,7 +642,8 @@ App::shutdown()
->inject('queueForWebhooks')
->inject('queueForRealtime')
->inject('dbForProject')
->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) {
->inject('eventProcessor')
->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, EventProcessor $eventProcessor) use ($parseLabel) {
$responsePayload = $response->getPayload();
@ -854,9 +652,15 @@ App::shutdown()
$queueForEvents->setPayload($responsePayload);
}
$queueForFunctions
->from($queueForEvents)
->trigger();
// Get project and function/webhook events (cached)
$functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $eventProcessor->getWebhooksEvents($project);
// Generate events for this operation
$generatedEvents = Event::generateEvents(
$queueForEvents->getEvent(),
$queueForEvents->getParams()
);
if ($project->getId() !== 'console') {
$queueForRealtime
@ -864,15 +668,28 @@ App::shutdown()
->trigger();
}
/** Trigger webhooks events only if a project has them enabled
* A future optimisation is to only trigger webhooks if the webhook is "enabled"
* But it might have performance implications on the API due to the number of webhooks etc.
* Some profiling is needed to see if this is a problem.
*/
if (!empty($project->getAttribute('webhooks'))) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
// Only trigger functions if there are matching function events
if (!empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {
$queueForFunctions
->from($queueForEvents)
->trigger();
break;
}
}
}
// Only trigger webhooks if there are matching webhook events
if (!empty($webhooksEvents)) {
foreach ($generatedEvents as $event) {
if (isset($webhooksEvents[$event])) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
break;
}
}
}
}

View file

@ -19,6 +19,7 @@ use Appwrite\Event\StatsResources;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\GraphQL\Schema;
use Appwrite\Network\Cors;
use Appwrite\Network\Platform;
@ -153,6 +154,10 @@ App::setResource('queueForAudits', function (Publisher $publisher) {
App::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
App::setResource('eventProcessor', function () {
return new EventProcessor();
}, []);
App::setResource('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
@ -509,7 +514,7 @@ App::setResource('proofForCode', function (): Code {
return $code;
});
App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project) {
App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForPlatform;
}
@ -545,8 +550,210 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform
->setNamespace('_' . $project->getSequence());
}
/**
* This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**.
*
* Accounts can be created in many ways beyond `createAccount`
* (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here.
*/
$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) {
// Only trigger events for user creation with the database listener.
if ($document->getCollection() !== 'users') {
return;
}
$queueForEvents
->setEvent('users.[userId].create')
->setParam('userId', $document->getId())
->setPayload($response->output($document, Response::MODEL_USER));
// Trigger functions, webhooks, and realtime events
$queueForFunctions
->from($queueForEvents)
->trigger();
/** Trigger webhooks events only if a project has them enabled */
if (!empty($project->getAttribute('webhooks'))) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
}
/** Trigger realtime events only for non console events */
if ($queueForEvents->getProject()->getId() !== 'console') {
$queueForRealtime
->from($queueForEvents)
->trigger();
}
};
/**
* Purge function events cache when functions are created, updated or deleted.
*/
$functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) {
if ($document->getCollection() !== 'functions') {
return;
}
if ($project->isEmpty() || $project->getId() === 'console') {
return;
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functions:events',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$dbForProject->getCache()->purge($cacheKey);
};
$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) {
$value = 1;
switch ($event) {
case Database::EVENT_DOCUMENT_DELETE:
$value = -1;
break;
case Database::EVENT_DOCUMENTS_DELETE:
$value = -1 * $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_CREATE:
$value = $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_UPSERT:
$value = $document->getAttribute('created', 0);
break;
}
switch (true) {
case $document->getCollection() === 'teams':
$queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project
break;
case $document->getCollection() === 'users':
$queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case $document->getCollection() === 'sessions': // sessions
$queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project
break;
case $document->getCollection() === 'databases': // databases
$queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_COLLECTIONS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value);
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$collectionInternalId = $parts[3] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_DOCUMENTS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection
break;
case $document->getCollection() === 'buckets': //buckets
$queueForStatsUsage
->addMetric(METRIC_BUCKETS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'bucket_'): // files
$parts = explode('_', $document->getCollection());
$bucketInternalId = $parts[1];
$queueForStatsUsage
->addMetric(METRIC_FILES, $value) // per project
->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket
break;
case $document->getCollection() === 'functions':
$queueForStatsUsage
->addMetric(METRIC_FUNCTIONS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'sites':
$queueForStatsUsage
->addMetric(METRIC_SITES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'deployments':
$queueForStatsUsage
->addMetric(METRIC_DEPLOYMENTS, $value) // per project
->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value);
break;
default:
break;
}
};
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisherFunctions);
$queueForWebhooks = new Webhook($publisherWebhooks);
$queueForRealtime = new Realtime();
$database
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener(
$project,
$document,
$response,
$queueForEventsClone->from($queueForEvents),
$queueForFunctions->from($queueForEvents),
$queueForWebhooks->from($queueForEvents),
$queueForRealtime->from($queueForEvents)
))
->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database))
->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database))
->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database))
;
return $database;
}, ['pools', 'dbForPlatform', 'cache', 'project']);
}, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'queueForFunctions', 'queueForWebhooks', 'queueForRealtime', 'queueForStatsUsage']);
App::setResource('dbForPlatform', function (Group $pools, Cache $cache) {
$adapter = new DatabasePool($pools->get('console'));

View file

@ -0,0 +1,106 @@
<?php
namespace Appwrite\Functions;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Query;
class EventProcessor
{
/**
* Get function events for a project, using Redis cache
* @param Document|null $project
* @param Database $dbForProject
* @return array<string, bool>
*/
public function getFunctionsEvents(?Document $project, Database $dbForProject): array
{
if ($project === null ||
$project->isEmpty() ||
$project->getId() === 'console') {
return [];
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functions:events',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$ttl = 3600; // 1 hour cache TTL
$cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl);
if ($cachedFunctionEvents !== false) {
return \json_decode($cachedFunctionEvents, true) ?? [];
}
try {
$events = [];
$limit = 100;
$sum = 100;
$offset = 0;
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::select(['$id', 'events']),
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('$sequence'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
foreach ($functions as $function) {
$functionEvents = $function->getAttribute('events', []);
if (!empty($functionEvents)) {
$events = array_merge($events, $functionEvents);
}
}
}
$uniqueEvents = \array_flip(\array_unique($events));
$dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents));
return $uniqueEvents;
} catch (\Throwable $e) {
return [];
}
}
/**
* Get webhook events for a project from the project's webhooks attribute
* @param Document|null $project
* @return array<string, bool>
*/
public function getWebhooksEvents(?Document $project): array
{
if ($project === null || $project->isEmpty() || $project->getId() === 'console') {
return [];
}
$webhooks = $project->getAttribute('webhooks', []);
if (empty($webhooks)) {
return [];
}
$events = [];
foreach ($webhooks as $webhook) {
if ($webhook->getAttribute('enabled', false) !== true) {
continue;
}
$webhookEvents = $webhook->getAttribute('events', []);
if (!empty($webhookEvents)) {
$events = array_merge($events, $webhookEvents);
}
}
return \array_flip(\array_unique($events));
}
}

View file

@ -7,7 +7,6 @@ use Appwrite\Platform\Action as AppwriteAction;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Operator;
use Utopia\Database\Query;
class Action extends AppwriteAction
{
@ -96,100 +95,4 @@ class Action extends AppwriteAction
return $data;
}
/**
* Get function events for a project, using Redis cache
* @param Document|null $project
* @param Database $dbForProject
* @return array<string, bool>
*/
protected function getFunctionsEvents(?Document $project, Database $dbForProject): array
{
if ($project === null ||
$project->isEmpty() ||
$project->getId() === 'console') {
return [];
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functions:events',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$ttl = 3600; // 1 hour cache TTL
$cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl);
if ($cachedFunctionEvents !== false) {
return \json_decode($cachedFunctionEvents, true) ?? [];
}
try {
$events = [];
$limit = 100;
$sum = 100;
$offset = 0;
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::select(['$id', 'events']),
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('$sequence'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
foreach ($functions as $function) {
$functionEvents = $function->getAttribute('events', []);
if (!empty($functionEvents)) {
$events = array_merge($events, $functionEvents);
}
}
}
$uniqueEvents = \array_flip(\array_unique($events));
$dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents));
return $uniqueEvents;
} catch (\Throwable $e) {
return [];
}
}
/**
* Get webhook events for a project from the project's webhooks attribute
* @param Document|null $project
* @return array<string, bool>
*/
protected function getWebhooksEvents(?Document $project): array
{
if ($project === null || $project->isEmpty() || $project->getId() === 'console') {
return [];
}
$webhooks = $project->getAttribute('webhooks', []);
if (empty($webhooks)) {
return [];
}
$events = [];
foreach ($webhooks as $webhook) {
if ($webhook->getAttribute('enabled', false) !== true) {
continue;
}
$webhookEvents = $webhook->getAttribute('events', []);
if (!empty($webhookEvents)) {
$events = array_merge($events, $webhookEvents);
}
}
return \array_flip(\array_unique($events));
}
}

View file

@ -4,6 +4,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction;
use Utopia\Database\Database;
use Utopia\Database\Document;
@ -349,6 +350,7 @@ abstract class Action extends DatabasesAction
* @param Event $queueForFunctions
* @param Event $queueForWebhooks
* @param Database $dbForProject
* @param EventProcessor $eventProcessor
* @return void
*/
protected function triggerBulk(
@ -360,7 +362,8 @@ abstract class Action extends DatabasesAction
Event $queueForRealtime,
Event $queueForFunctions,
Event $queueForWebhooks,
Database $dbForProject
Database $dbForProject,
EventProcessor $eventProcessor
): void {
$queueForEvents
->setEvent($event)
@ -372,8 +375,8 @@ abstract class Action extends DatabasesAction
// Get project and function events (cached)
$project = $queueForEvents->getProject();
$functionsEvents = $this->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $this->getWebhooksEvents($project);
$functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $eventProcessor->getWebhooksEvents($project);
foreach ($documents as $document) {
$queueForEvents
@ -391,6 +394,7 @@ abstract class Action extends DatabasesAction
$queueForEvents->getParams()
);
if (!empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -81,10 +82,11 @@ class Delete extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
@ -204,7 +206,8 @@ class Delete extends Action
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks,
$dbForProject
$dbForProject,
$eventProcessor
);
}
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -85,10 +86,11 @@ class Update extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$data = \is_string($data)
? \json_decode($data, true)
@ -235,7 +237,8 @@ class Update extends Action
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks,
$dbForProject
$dbForProject,
$eventProcessor
);
}
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -83,10 +84,11 @@ class Upsert extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
@ -210,7 +212,8 @@ class Upsert extends Action
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks,
$dbForProject
$dbForProject,
$eventProcessor
);
}
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Deprecated;
@ -132,9 +133,10 @@ class Create extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$data = \is_string($data)
? \json_decode($data, true)
@ -492,7 +494,8 @@ class Create extends Action
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks,
$dbForProject
$dbForProject,
$eventProcessor
);
return;
}

View file

@ -7,6 +7,7 @@ use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
@ -76,6 +77,7 @@ class Update extends Action
->inject('queueForRealtime')
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('eventProcessor')
->callback($this->action(...));
}
@ -93,6 +95,7 @@ class Update extends Action
* @param Event $queueForRealtime
* @param Event $queueForFunctions
* @param Event $queueForWebhooks
* @param EventProcessor $eventProcessor
* @return void
* @throws ConflictException
* @throws Exception
@ -102,7 +105,7 @@ class Update extends Action
* @throws Structure
* @throws \Utopia\Exception
*/
public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void
public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, EventProcessor $eventProcessor): void
{
if (!$commit && !$rollback) {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true');
@ -372,8 +375,8 @@ class Update extends Action
// Get project and function/webhook events (cached)
$project = $queueForEvents->getProject();
$functionsEvents = $this->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $this->getWebhooksEvents($project);
$functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $eventProcessor->getWebhooksEvents($project);
foreach ($documentsToTrigger as $doc) {
$payload = $doc->getArrayCopy();

View file

@ -66,6 +66,7 @@ class Delete extends DocumentsDelete
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
}

View file

@ -68,6 +68,7 @@ class Update extends DocumentsUpdate
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
}

View file

@ -68,6 +68,7 @@ class Upsert extends DocumentsUpsert
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
}

View file

@ -111,6 +111,7 @@ class Create extends DocumentCreate
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
}