From db4dcd164e67fabb2e3be20b0a26cd9d974575d8 Mon Sep 17 00:00:00 2001 From: shimon Date: Wed, 7 Jan 2026 16:57:57 +0200 Subject: [PATCH] refactor: integrate EventProcessor for handling function and webhook events; streamline event triggering in database actions --- app/controllers/shared/api.php | 253 +++--------------- app/init/resources.php | 211 ++++++++++++++- src/Appwrite/Functions/EventProcessor.php | 106 ++++++++ .../Databases/Http/Databases/Action.php | 97 ------- .../Collections/Documents/Action.php | 10 +- .../Collections/Documents/Bulk/Delete.php | 7 +- .../Collections/Documents/Bulk/Update.php | 7 +- .../Collections/Documents/Bulk/Upsert.php | 7 +- .../Collections/Documents/Create.php | 7 +- .../Http/Databases/Transactions/Update.php | 9 +- .../Http/TablesDB/Tables/Rows/Bulk/Delete.php | 1 + .../Http/TablesDB/Tables/Rows/Bulk/Update.php | 1 + .../Http/TablesDB/Tables/Rows/Bulk/Upsert.php | 1 + .../Http/TablesDB/Tables/Rows/Create.php | 1 + 14 files changed, 387 insertions(+), 331 deletions(-) create mode 100644 src/Appwrite/Functions/EventProcessor.php diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index fe7dd7ce9b..a7d5478920 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -10,12 +10,12 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\Method; use Appwrite\Utopia\Database\Documents\User; use Appwrite\Utopia\Request; @@ -30,7 +30,6 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; -use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Validator\WhiteList; @@ -74,178 +73,6 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -/** - * This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**. - * - * Accounts can be created in many ways beyond `createAccount` - * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. - */ -$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { - // Only trigger events for user creation with the database listener. - if ($document->getCollection() !== 'users') { - return; - } - - $queueForEvents - ->setEvent('users.[userId].create') - ->setParam('userId', $document->getId()) - ->setPayload($response->output($document, Response::MODEL_USER)); - - // Trigger functions, webhooks, and realtime events - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - - - /** Trigger webhooks events only if a project has them enabled */ - if (!empty($project->getAttribute('webhooks'))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); - } - - /** Trigger realtime events only for non console events */ - if ($queueForEvents->getProject()->getId() !== 'console') { - $queueForRealtime - ->from($queueForEvents) - ->trigger(); - } -}; - -/** - * Purge function events cache when functions are created, updated or deleted. - */ -$functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) { - - - if ($document->getCollection() !== 'functions') { - return; - } - - if ($project->isEmpty() || $project->getId() === 'console') { - return; - } - - $hostname = $dbForProject->getAdapter()->getHostname(); - $cacheKey = \sprintf( - '%s-cache-%s:%s:%s:project:%s:functions:events', - $dbForProject->getCacheName(), - $hostname ?? '', - $dbForProject->getNamespace(), - $dbForProject->getTenant(), - $project->getId() - ); - - $dbForProject->getCache()->purge($cacheKey); -}; - -$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { - $value = 1; - - switch ($event) { - case Database::EVENT_DOCUMENT_DELETE: - $value = -1; - break; - case Database::EVENT_DOCUMENTS_DELETE: - $value = -1 * $document->getAttribute('modified', 0); - break; - case Database::EVENT_DOCUMENTS_CREATE: - $value = $document->getAttribute('modified', 0); - break; - case Database::EVENT_DOCUMENTS_UPSERT: - $value = $document->getAttribute('created', 0); - break; - } - - switch (true) { - case $document->getCollection() === 'teams': - $queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project - break; - case $document->getCollection() === 'users': - $queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case $document->getCollection() === 'sessions': // sessions - $queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project - break; - case $document->getCollection() === 'databases': // databases - $queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections - $parts = explode('_', $document->getCollection()); - $databaseInternalId = $parts[1] ?? 0; - $queueForStatsUsage - ->addMetric(METRIC_COLLECTIONS, $value) // per project - ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value); - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents - $parts = explode('_', $document->getCollection()); - $databaseInternalId = $parts[1] ?? 0; - $collectionInternalId = $parts[3] ?? 0; - $queueForStatsUsage - ->addMetric(METRIC_DOCUMENTS, $value) // per project - ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database - ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection - break; - case $document->getCollection() === 'buckets': //buckets - $queueForStatsUsage - ->addMetric(METRIC_BUCKETS, $value); // per project - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'bucket_'): // files - $parts = explode('_', $document->getCollection()); - $bucketInternalId = $parts[1]; - $queueForStatsUsage - ->addMetric(METRIC_FILES, $value) // per project - ->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project - ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket - ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket - break; - case $document->getCollection() === 'functions': - $queueForStatsUsage - ->addMetric(METRIC_FUNCTIONS, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case $document->getCollection() === 'sites': - $queueForStatsUsage - ->addMetric(METRIC_SITES, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case $document->getCollection() === 'deployments': - $queueForStatsUsage - ->addMetric(METRIC_DEPLOYMENTS, $value) // per project - ->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project - ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function - ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value); - break; - default: - break; - } -}; - App::init() ->groups(['api']) ->inject('utopia') @@ -514,9 +341,6 @@ App::init() ->inject('response') ->inject('project') ->inject('user') - ->inject('publisher') - ->inject('publisherFunctions') - ->inject('publisherWebhooks') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -526,7 +350,6 @@ App::init() ->inject('queueForStatsUsage') ->inject('queueForFunctions') ->inject('queueForMails') - ->inject('queueForMigrations') ->inject('dbForProject') ->inject('timelimit') ->inject('resourceToken') @@ -536,7 +359,7 @@ App::init() ->inject('devKey') ->inject('telemetry') ->inject('platform') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Migration $queueForMigrations, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) use ($usageDatabaseListener, $eventDatabaseListener, $functionsEventsCacheListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) { $route = $utopia->getRoute(); @@ -656,32 +479,6 @@ App::init() $queueForBuilds->setPlatform($platform); $queueForMails->setPlatform($platform); - // Clone the queues, to prevent events triggered by the database listener - // from overwriting the events that are supposed to be triggered in the shutdown hook. - $queueForEventsClone = new Event($publisher); - $queueForFunctions = new Func($publisherFunctions); - $queueForWebhooks = new Webhook($publisherWebhooks); - $queueForRealtime = new Realtime(); - - $dbForProject - ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( - $project, - $document, - $response, - $queueForEventsClone->from($queueForEvents), - $queueForFunctions->from($queueForEvents), - $queueForWebhooks->from($queueForEvents), - $queueForRealtime->from($queueForEvents) - )) - ->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) - ->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) - ; $useCache = $route->getLabel('cache', false); $storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load'); @@ -845,7 +642,8 @@ App::shutdown() ->inject('queueForWebhooks') ->inject('queueForRealtime') ->inject('dbForProject') - ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) { + ->inject('eventProcessor') + ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, EventProcessor $eventProcessor) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -854,9 +652,15 @@ App::shutdown() $queueForEvents->setPayload($responsePayload); } - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + // Get project and function/webhook events (cached) + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); + + // Generate events for this operation + $generatedEvents = Event::generateEvents( + $queueForEvents->getEvent(), + $queueForEvents->getParams() + ); if ($project->getId() !== 'console') { $queueForRealtime @@ -864,15 +668,28 @@ App::shutdown() ->trigger(); } - /** Trigger webhooks events only if a project has them enabled - * A future optimisation is to only trigger webhooks if the webhook is "enabled" - * But it might have performance implications on the API due to the number of webhooks etc. - * Some profiling is needed to see if this is a problem. - */ - if (!empty($project->getAttribute('webhooks'))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); + // Only trigger functions if there are matching function events + if (!empty($functionsEvents)) { + foreach ($generatedEvents as $event) { + if (isset($functionsEvents[$event])) { + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + break; + } + } + } + + // Only trigger webhooks if there are matching webhook events + if (!empty($webhooksEvents)) { + foreach ($generatedEvents as $event) { + if (isset($webhooksEvents[$event])) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + break; + } + } } } diff --git a/app/init/resources.php b/app/init/resources.php index d56354c14b..44234425a4 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -19,6 +19,7 @@ use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\GraphQL\Schema; use Appwrite\Network\Cors; use Appwrite\Network\Platform; @@ -153,6 +154,10 @@ App::setResource('queueForAudits', function (Publisher $publisher) { App::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); + +App::setResource('eventProcessor', function () { + return new EventProcessor(); +}, []); App::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); @@ -509,7 +514,7 @@ App::setResource('proofForCode', function (): Code { return $code; }); -App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project) { +App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForPlatform; } @@ -545,8 +550,210 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform ->setNamespace('_' . $project->getSequence()); } + /** + * This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**. + * + * Accounts can be created in many ways beyond `createAccount` + * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. + */ + $eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { + // Only trigger events for user creation with the database listener. + if ($document->getCollection() !== 'users') { + return; + } + + $queueForEvents + ->setEvent('users.[userId].create') + ->setParam('userId', $document->getId()) + ->setPayload($response->output($document, Response::MODEL_USER)); + + // Trigger functions, webhooks, and realtime events + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + + /** Trigger webhooks events only if a project has them enabled */ + if (!empty($project->getAttribute('webhooks'))) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + } + + /** Trigger realtime events only for non console events */ + if ($queueForEvents->getProject()->getId() !== 'console') { + $queueForRealtime + ->from($queueForEvents) + ->trigger(); + } + }; + + /** + * Purge function events cache when functions are created, updated or deleted. + */ + $functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) { + + + if ($document->getCollection() !== 'functions') { + return; + } + + if ($project->isEmpty() || $project->getId() === 'console') { + return; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $dbForProject->getCache()->purge($cacheKey); + }; + + $usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { + $value = 1; + + switch ($event) { + case Database::EVENT_DOCUMENT_DELETE: + $value = -1; + break; + case Database::EVENT_DOCUMENTS_DELETE: + $value = -1 * $document->getAttribute('modified', 0); + break; + case Database::EVENT_DOCUMENTS_CREATE: + $value = $document->getAttribute('modified', 0); + break; + case Database::EVENT_DOCUMENTS_UPSERT: + $value = $document->getAttribute('created', 0); + break; + } + + switch (true) { + case $document->getCollection() === 'teams': + $queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project + break; + case $document->getCollection() === 'users': + $queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case $document->getCollection() === 'sessions': // sessions + $queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project + break; + case $document->getCollection() === 'databases': // databases + $queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $queueForStatsUsage + ->addMetric(METRIC_COLLECTIONS, $value) // per project + ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value); + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $collectionInternalId = $parts[3] ?? 0; + $queueForStatsUsage + ->addMetric(METRIC_DOCUMENTS, $value) // per project + ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database + ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection + break; + case $document->getCollection() === 'buckets': //buckets + $queueForStatsUsage + ->addMetric(METRIC_BUCKETS, $value); // per project + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'bucket_'): // files + $parts = explode('_', $document->getCollection()); + $bucketInternalId = $parts[1]; + $queueForStatsUsage + ->addMetric(METRIC_FILES, $value) // per project + ->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project + ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket + ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket + break; + case $document->getCollection() === 'functions': + $queueForStatsUsage + ->addMetric(METRIC_FUNCTIONS, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case $document->getCollection() === 'sites': + $queueForStatsUsage + ->addMetric(METRIC_SITES, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case $document->getCollection() === 'deployments': + $queueForStatsUsage + ->addMetric(METRIC_DEPLOYMENTS, $value) // per project + ->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project + ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function + ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value); + break; + default: + break; + } + }; + + // Clone the queues, to prevent events triggered by the database listener + // from overwriting the events that are supposed to be triggered in the shutdown hook. + $queueForEventsClone = new Event($publisher); + $queueForFunctions = new Func($publisherFunctions); + $queueForWebhooks = new Webhook($publisherWebhooks); + $queueForRealtime = new Realtime(); + + + $database + ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( + $project, + $document, + $response, + $queueForEventsClone->from($queueForEvents), + $queueForFunctions->from($queueForEvents), + $queueForWebhooks->from($queueForEvents), + $queueForRealtime->from($queueForEvents) + )) + ->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ; + + return $database; -}, ['pools', 'dbForPlatform', 'cache', 'project']); + +}, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'queueForFunctions', 'queueForWebhooks', 'queueForRealtime', 'queueForStatsUsage']); App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { $adapter = new DatabasePool($pools->get('console')); diff --git a/src/Appwrite/Functions/EventProcessor.php b/src/Appwrite/Functions/EventProcessor.php new file mode 100644 index 0000000000..8ed841d30d --- /dev/null +++ b/src/Appwrite/Functions/EventProcessor.php @@ -0,0 +1,106 @@ + + */ + public function getFunctionsEvents(?Document $project, Database $dbForProject): array + { + if ($project === null || + $project->isEmpty() || + $project->getId() === 'console') { + return []; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $ttl = 3600; // 1 hour cache TTL + $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); + + if ($cachedFunctionEvents !== false) { + return \json_decode($cachedFunctionEvents, true) ?? []; + } + + try { + $events = []; + $limit = 100; + $sum = 100; + $offset = 0; + + while ($sum >= $limit) { + $functions = $dbForProject->find('functions', [ + Query::select(['$id', 'events']), + Query::limit($limit), + Query::offset($offset), + Query::orderAsc('$sequence'), + ]); + + $sum = \count($functions); + $offset = $offset + $limit; + + foreach ($functions as $function) { + $functionEvents = $function->getAttribute('events', []); + if (!empty($functionEvents)) { + $events = array_merge($events, $functionEvents); + } + } + } + + $uniqueEvents = \array_flip(\array_unique($events)); + $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents)); + + return $uniqueEvents; + } catch (\Throwable $e) { + return []; + } + } + + /** + * Get webhook events for a project from the project's webhooks attribute + * @param Document|null $project + * @return array + */ + public function getWebhooksEvents(?Document $project): array + { + if ($project === null || $project->isEmpty() || $project->getId() === 'console') { + return []; + } + + $webhooks = $project->getAttribute('webhooks', []); + if (empty($webhooks)) { + return []; + } + + $events = []; + foreach ($webhooks as $webhook) { + if ($webhook->getAttribute('enabled', false) !== true) { + continue; + } + + $webhookEvents = $webhook->getAttribute('events', []); + if (!empty($webhookEvents)) { + $events = array_merge($events, $webhookEvents); + } + } + + return \array_flip(\array_unique($events)); + } +} diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php index 5c85ac1de6..8a3d178bde 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php @@ -7,7 +7,6 @@ use Appwrite\Platform\Action as AppwriteAction; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Operator; -use Utopia\Database\Query; class Action extends AppwriteAction { @@ -96,100 +95,4 @@ class Action extends AppwriteAction return $data; } - /** - * Get function events for a project, using Redis cache - * @param Document|null $project - * @param Database $dbForProject - * @return array - */ - protected function getFunctionsEvents(?Document $project, Database $dbForProject): array - { - if ($project === null || - $project->isEmpty() || - $project->getId() === 'console') { - return []; - } - - $hostname = $dbForProject->getAdapter()->getHostname(); - $cacheKey = \sprintf( - '%s-cache-%s:%s:%s:project:%s:functions:events', - $dbForProject->getCacheName(), - $hostname ?? '', - $dbForProject->getNamespace(), - $dbForProject->getTenant(), - $project->getId() - ); - - $ttl = 3600; // 1 hour cache TTL - $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); - - if ($cachedFunctionEvents !== false) { - return \json_decode($cachedFunctionEvents, true) ?? []; - - } - - try { - $events = []; - $limit = 100; - $sum = 100; - $offset = 0; - - while ($sum >= $limit) { - $functions = $dbForProject->find('functions', [ - Query::select(['$id', 'events']), - Query::limit($limit), - Query::offset($offset), - Query::orderAsc('$sequence'), - ]); - - $sum = \count($functions); - $offset = $offset + $limit; - - foreach ($functions as $function) { - $functionEvents = $function->getAttribute('events', []); - if (!empty($functionEvents)) { - $events = array_merge($events, $functionEvents); - } - } - } - - $uniqueEvents = \array_flip(\array_unique($events)); - $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents)); - - return $uniqueEvents; - } catch (\Throwable $e) { - return []; - } - } - - /** - * Get webhook events for a project from the project's webhooks attribute - * @param Document|null $project - * @return array - */ - protected function getWebhooksEvents(?Document $project): array - { - if ($project === null || $project->isEmpty() || $project->getId() === 'console') { - return []; - } - - $webhooks = $project->getAttribute('webhooks', []); - if (empty($webhooks)) { - return []; - } - - $events = []; - foreach ($webhooks as $webhook) { - if ($webhook->getAttribute('enabled', false) !== true) { - continue; - } - - $webhookEvents = $webhook->getAttribute('events', []); - if (!empty($webhookEvents)) { - $events = array_merge($events, $webhookEvents); - } - } - - return \array_flip(\array_unique($events)); - } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index f154372983..b4ed8adbaf 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -4,6 +4,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction; use Utopia\Database\Database; use Utopia\Database\Document; @@ -349,6 +350,7 @@ abstract class Action extends DatabasesAction * @param Event $queueForFunctions * @param Event $queueForWebhooks * @param Database $dbForProject + * @param EventProcessor $eventProcessor * @return void */ protected function triggerBulk( @@ -360,7 +362,8 @@ abstract class Action extends DatabasesAction Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, - Database $dbForProject + Database $dbForProject, + EventProcessor $eventProcessor ): void { $queueForEvents ->setEvent($event) @@ -372,8 +375,8 @@ abstract class Action extends DatabasesAction // Get project and function events (cached) $project = $queueForEvents->getProject(); - $functionsEvents = $this->getFunctionsEvents($project, $dbForProject); - $webhooksEvents = $this->getWebhooksEvents($project); + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); foreach ($documents as $document) { $queueForEvents @@ -391,6 +394,7 @@ abstract class Action extends DatabasesAction $queueForEvents->getParams() ); + if (!empty($functionsEvents)) { foreach ($generatedEvents as $event) { if (isset($functionsEvents[$event])) { diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php index e3ba6a37e7..a3d3535065 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -81,10 +82,11 @@ class Delete extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -204,7 +206,8 @@ class Delete extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php index 892ab7f0da..fdc49b1901 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -85,10 +86,11 @@ class Update extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -235,7 +237,8 @@ class Update extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php index c15a8b94ae..00e4663171 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -83,10 +84,11 @@ class Upsert extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -210,7 +212,8 @@ class Upsert extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php index 60a1f66f36..e32ad29cce 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Deprecated; @@ -132,9 +133,10 @@ class Create extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -492,7 +494,8 @@ class Create extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); return; } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index 30f4a7e05c..ee77f2e578 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -7,6 +7,7 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; @@ -76,6 +77,7 @@ class Update extends Action ->inject('queueForRealtime') ->inject('queueForFunctions') ->inject('queueForWebhooks') + ->inject('eventProcessor') ->callback($this->action(...)); } @@ -93,6 +95,7 @@ class Update extends Action * @param Event $queueForRealtime * @param Event $queueForFunctions * @param Event $queueForWebhooks + * @param EventProcessor $eventProcessor * @return void * @throws ConflictException * @throws Exception @@ -102,7 +105,7 @@ class Update extends Action * @throws Structure * @throws \Utopia\Exception */ - public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void + public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, EventProcessor $eventProcessor): void { if (!$commit && !$rollback) { throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true'); @@ -372,8 +375,8 @@ class Update extends Action // Get project and function/webhook events (cached) $project = $queueForEvents->getProject(); - $functionsEvents = $this->getFunctionsEvents($project, $dbForProject); - $webhooksEvents = $this->getWebhooksEvents($project); + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); foreach ($documentsToTrigger as $doc) { $payload = $doc->getArrayCopy(); diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php index accb0392fe..45e5b84774 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php @@ -66,6 +66,7 @@ class Delete extends DocumentsDelete ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php index fea59b8b13..3062186624 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php @@ -68,6 +68,7 @@ class Update extends DocumentsUpdate ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php index 492af25e9f..3f837917c8 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php @@ -68,6 +68,7 @@ class Upsert extends DocumentsUpsert ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php index b5491a593b..b610be3ea4 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php @@ -111,6 +111,7 @@ class Create extends DocumentCreate ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } }