diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 23bbb12183..fffe544330 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -10,12 +10,12 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\Method; use Appwrite\Utopia\Database\Documents\User; use Appwrite\Utopia\Request; @@ -31,7 +31,6 @@ use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\Authorization\Input; -use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Validator\WhiteList; @@ -75,151 +74,6 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -/** - * This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**. - * - * Accounts can be created in many ways beyond `createAccount` - * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. - */ -$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { - // Only trigger events for user creation with the database listener. - if ($document->getCollection() !== 'users') { - return; - } - - $queueForEvents - ->setEvent('users.[userId].create') - ->setParam('userId', $document->getId()) - ->setPayload($response->output($document, Response::MODEL_USER)); - - // Trigger functions, webhooks, and realtime events - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - - - /** Trigger webhooks events only if a project has them enabled */ - if (!empty($project->getAttribute('webhooks'))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); - } - - /** Trigger realtime events only for non console events */ - if ($queueForEvents->getProject()->getId() !== 'console') { - $queueForRealtime - ->from($queueForEvents) - ->trigger(); - } -}; - -$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { - $value = 1; - - switch ($event) { - case Database::EVENT_DOCUMENT_DELETE: - $value = -1; - break; - case Database::EVENT_DOCUMENTS_DELETE: - $value = -1 * $document->getAttribute('modified', 0); - break; - case Database::EVENT_DOCUMENTS_CREATE: - $value = $document->getAttribute('modified', 0); - break; - case Database::EVENT_DOCUMENTS_UPSERT: - $value = $document->getAttribute('created', 0); - break; - } - - switch (true) { - case $document->getCollection() === 'teams': - $queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project - break; - case $document->getCollection() === 'users': - $queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case $document->getCollection() === 'sessions': // sessions - $queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project - break; - case $document->getCollection() === 'databases': // databases - $queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections - $parts = explode('_', $document->getCollection()); - $databaseInternalId = $parts[1] ?? 0; - $queueForStatsUsage - ->addMetric(METRIC_COLLECTIONS, $value) // per project - ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value); - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents - $parts = explode('_', $document->getCollection()); - $databaseInternalId = $parts[1] ?? 0; - $collectionInternalId = $parts[3] ?? 0; - $queueForStatsUsage - ->addMetric(METRIC_DOCUMENTS, $value) // per project - ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database - ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection - break; - case $document->getCollection() === 'buckets': //buckets - $queueForStatsUsage - ->addMetric(METRIC_BUCKETS, $value); // per project - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'bucket_'): // files - $parts = explode('_', $document->getCollection()); - $bucketInternalId = $parts[1]; - $queueForStatsUsage - ->addMetric(METRIC_FILES, $value) // per project - ->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project - ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket - ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket - break; - case $document->getCollection() === 'functions': - $queueForStatsUsage - ->addMetric(METRIC_FUNCTIONS, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case $document->getCollection() === 'sites': - $queueForStatsUsage - ->addMetric(METRIC_SITES, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case $document->getCollection() === 'deployments': - $queueForStatsUsage - ->addMetric(METRIC_DEPLOYMENTS, $value) // per project - ->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project - ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function - ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value); - break; - default: - break; - } -}; - App::init() ->groups(['api']) ->inject('utopia') @@ -489,9 +343,6 @@ App::init() ->inject('response') ->inject('project') ->inject('user') - ->inject('publisher') - ->inject('publisherFunctions') - ->inject('publisherWebhooks') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -501,7 +352,6 @@ App::init() ->inject('queueForStatsUsage') ->inject('queueForFunctions') ->inject('queueForMails') - ->inject('queueForMigrations') ->inject('dbForProject') ->inject('timelimit') ->inject('resourceToken') @@ -512,7 +362,7 @@ App::init() ->inject('telemetry') ->inject('platform') ->inject('authorization') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Migration $queueForMigrations, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) { $route = $utopia->getRoute(); @@ -632,28 +482,6 @@ App::init() $queueForBuilds->setPlatform($platform); $queueForMails->setPlatform($platform); - // Clone the queues, to prevent events triggered by the database listener - // from overwriting the events that are supposed to be triggered in the shutdown hook. - $queueForEventsClone = new Event($publisher); - $queueForFunctions = new Func($publisherFunctions); - $queueForWebhooks = new Webhook($publisherWebhooks); - $queueForRealtime = new Realtime(); - - $dbForProject - ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( - $project, - $document, - $response, - $queueForEventsClone->from($queueForEvents), - $queueForFunctions->from($queueForEvents), - $queueForWebhooks->from($queueForEvents), - $queueForRealtime->from($queueForEvents) - )); $useCache = $route->getLabel('cache', false); $storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load'); @@ -818,7 +646,8 @@ App::shutdown() ->inject('dbForProject') ->inject('authorization') ->inject('timelimit') - ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit) use ($parseLabel) { + ->inject('eventProcessor') + ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -827,9 +656,15 @@ App::shutdown() $queueForEvents->setPayload($responsePayload); } - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + // Get project and function/webhook events (cached) + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); + + // Generate events for this operation + $generatedEvents = Event::generateEvents( + $queueForEvents->getEvent(), + $queueForEvents->getParams() + ); if ($project->getId() !== 'console') { $queueForRealtime @@ -837,15 +672,28 @@ App::shutdown() ->trigger(); } - /** Trigger webhooks events only if a project has them enabled - * A future optimisation is to only trigger webhooks if the webhook is "enabled" - * But it might have performance implications on the API due to the number of webhooks etc. - * Some profiling is needed to see if this is a problem. - */ - if (!empty($project->getAttribute('webhooks'))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); + // Only trigger functions if there are matching function events + if (!empty($functionsEvents)) { + foreach ($generatedEvents as $event) { + if (isset($functionsEvents[$event])) { + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + break; + } + } + } + + // Only trigger webhooks if there are matching webhook events + if (!empty($webhooksEvents)) { + foreach ($generatedEvents as $event) { + if (isset($webhooksEvents[$event])) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + break; + } + } } } diff --git a/app/init/resources.php b/app/init/resources.php index 2f43ee008b..fc0e0152d0 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -20,6 +20,7 @@ use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\GraphQL\Schema; use Appwrite\Network\Cors; use Appwrite\Network\Platform; @@ -157,6 +158,9 @@ App::setResource('queueForAudits', function (Publisher $publisher) { App::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); +App::setResource('eventProcessor', function () { + return new EventProcessor(); +}, []); App::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); @@ -517,7 +521,7 @@ App::setResource('authorization', function () { return new Authorization(); }, []); -App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Authorization $authorization) { +App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Authorization $authorization) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForPlatform; } @@ -555,8 +559,209 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform ->setNamespace('_' . $project->getSequence()); } + /** + * This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**. + * + * Accounts can be created in many ways beyond `createAccount` + * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. + */ + $eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { + // Only trigger events for user creation with the database listener. + if ($document->getCollection() !== 'users') { + return; + } + + $queueForEvents + ->setEvent('users.[userId].create') + ->setParam('userId', $document->getId()) + ->setPayload($response->output($document, Response::MODEL_USER)); + + // Trigger functions, webhooks, and realtime events + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + + /** Trigger webhooks events only if a project has them enabled */ + if (!empty($project->getAttribute('webhooks'))) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + } + + /** Trigger realtime events only for non console events */ + if ($queueForEvents->getProject()->getId() !== 'console') { + $queueForRealtime + ->from($queueForEvents) + ->trigger(); + } + }; + + /** + * Purge function events cache when functions are created, updated or deleted. + */ + $functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) { + + + if ($document->getCollection() !== 'functions') { + return; + } + + if ($project->isEmpty() || $project->getId() === 'console') { + return; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $dbForProject->getCache()->purge($cacheKey); + }; + + $usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { + $value = 1; + + switch ($event) { + case Database::EVENT_DOCUMENT_DELETE: + $value = -1; + break; + case Database::EVENT_DOCUMENTS_DELETE: + $value = -1 * $document->getAttribute('modified', 0); + break; + case Database::EVENT_DOCUMENTS_CREATE: + $value = $document->getAttribute('modified', 0); + break; + case Database::EVENT_DOCUMENTS_UPSERT: + $value = $document->getAttribute('created', 0); + break; + } + + switch (true) { + case $document->getCollection() === 'teams': + $queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project + break; + case $document->getCollection() === 'users': + $queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case $document->getCollection() === 'sessions': // sessions + $queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project + break; + case $document->getCollection() === 'databases': // databases + $queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $queueForStatsUsage + ->addMetric(METRIC_COLLECTIONS, $value) // per project + ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value); + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $collectionInternalId = $parts[3] ?? 0; + $queueForStatsUsage + ->addMetric(METRIC_DOCUMENTS, $value) // per project + ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database + ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection + break; + case $document->getCollection() === 'buckets': //buckets + $queueForStatsUsage + ->addMetric(METRIC_BUCKETS, $value); // per project + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'bucket_'): // files + $parts = explode('_', $document->getCollection()); + $bucketInternalId = $parts[1]; + $queueForStatsUsage + ->addMetric(METRIC_FILES, $value) // per project + ->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project + ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket + ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket + break; + case $document->getCollection() === 'functions': + $queueForStatsUsage + ->addMetric(METRIC_FUNCTIONS, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case $document->getCollection() === 'sites': + $queueForStatsUsage + ->addMetric(METRIC_SITES, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case $document->getCollection() === 'deployments': + $queueForStatsUsage + ->addMetric(METRIC_DEPLOYMENTS, $value) // per project + ->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project + ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function + ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value); + break; + default: + break; + } + }; + + // Clone the queues, to prevent events triggered by the database listener + // from overwriting the events that are supposed to be triggered in the shutdown hook. + $queueForEventsClone = new Event($publisher); + $queueForFunctions = new Func($publisherFunctions); + $queueForWebhooks = new Webhook($publisherWebhooks); + $queueForRealtime = new Realtime(); + + + $database + ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( + $project, + $document, + $response, + $queueForEventsClone->from($queueForEvents), + $queueForFunctions->from($queueForEvents), + $queueForWebhooks->from($queueForEvents), + $queueForRealtime->from($queueForEvents) + )) + ->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ; + + return $database; -}, ['pools', 'dbForPlatform', 'cache', 'project', 'authorization']); +}, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'queueForFunctions', 'queueForWebhooks', 'queueForRealtime', 'queueForStatsUsage', 'authorization']); App::setResource('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) { diff --git a/composer.lock b/composer.lock index 8b60305dff..96b945d0d2 100644 --- a/composer.lock +++ b/composer.lock @@ -1365,16 +1365,16 @@ }, { "name": "open-telemetry/exporter-otlp", - "version": "1.3.3", + "version": "1.3.4", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "07b02bc71838463f6edcc78d3485c04b48fb263d" + "reference": "62e680d587beb42e5247aa6ecd89ad1ca406e8ca" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/07b02bc71838463f6edcc78d3485c04b48fb263d", - "reference": "07b02bc71838463f6edcc78d3485c04b48fb263d", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/62e680d587beb42e5247aa6ecd89ad1ca406e8ca", + "reference": "62e680d587beb42e5247aa6ecd89ad1ca406e8ca", "shasum": "" }, "require": { @@ -1425,7 +1425,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-11-13T08:04:37+00:00" + "time": "2026-01-15T09:31:34+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", @@ -1492,16 +1492,16 @@ }, { "name": "open-telemetry/sdk", - "version": "1.10.0", + "version": "1.11.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99" + "reference": "d91f21addcdb42da9a451c002777f8318432461a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99", - "reference": "3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/d91f21addcdb42da9a451c002777f8318432461a", + "reference": "d91f21addcdb42da9a451c002777f8318432461a", "shasum": "" }, "require": { @@ -1585,7 +1585,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-11-25T10:59:15+00:00" + "time": "2026-01-15T11:21:03+00:00" }, { "name": "open-telemetry/sem-conv", @@ -4516,16 +4516,16 @@ }, { "name": "utopia-php/migration", - "version": "1.4.3", + "version": "1.4.4", "source": { "type": "git", "url": "https://github.com/utopia-php/migration.git", - "reference": "52ca4234d8229b68e27e052248734a08784d9d3d" + "reference": "3fe751902012d09d323420cd3523be1ed855e868" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/migration/zipball/52ca4234d8229b68e27e052248734a08784d9d3d", - "reference": "52ca4234d8229b68e27e052248734a08784d9d3d", + "url": "https://api.github.com/repos/utopia-php/migration/zipball/3fe751902012d09d323420cd3523be1ed855e868", + "reference": "3fe751902012d09d323420cd3523be1ed855e868", "shasum": "" }, "require": { @@ -4565,9 +4565,9 @@ ], "support": { "issues": "https://github.com/utopia-php/migration/issues", - "source": "https://github.com/utopia-php/migration/tree/1.4.3" + "source": "https://github.com/utopia-php/migration/tree/1.4.4" }, - "time": "2026-01-13T09:51:08+00:00" + "time": "2026-01-16T10:00:07+00:00" }, { "name": "utopia-php/mongo", @@ -8988,7 +8988,7 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": {}, + "stability-flags": [], "prefer-stable": false, "prefer-lowest": false, "platform": { @@ -9012,5 +9012,5 @@ "platform-overrides": { "php": "8.3" }, - "plugin-api-version": "2.6.0" + "plugin-api-version": "2.2.0" } diff --git a/src/Appwrite/Functions/EventProcessor.php b/src/Appwrite/Functions/EventProcessor.php new file mode 100644 index 0000000000..8ed841d30d --- /dev/null +++ b/src/Appwrite/Functions/EventProcessor.php @@ -0,0 +1,106 @@ + + */ + public function getFunctionsEvents(?Document $project, Database $dbForProject): array + { + if ($project === null || + $project->isEmpty() || + $project->getId() === 'console') { + return []; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $ttl = 3600; // 1 hour cache TTL + $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); + + if ($cachedFunctionEvents !== false) { + return \json_decode($cachedFunctionEvents, true) ?? []; + } + + try { + $events = []; + $limit = 100; + $sum = 100; + $offset = 0; + + while ($sum >= $limit) { + $functions = $dbForProject->find('functions', [ + Query::select(['$id', 'events']), + Query::limit($limit), + Query::offset($offset), + Query::orderAsc('$sequence'), + ]); + + $sum = \count($functions); + $offset = $offset + $limit; + + foreach ($functions as $function) { + $functionEvents = $function->getAttribute('events', []); + if (!empty($functionEvents)) { + $events = array_merge($events, $functionEvents); + } + } + } + + $uniqueEvents = \array_flip(\array_unique($events)); + $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents)); + + return $uniqueEvents; + } catch (\Throwable $e) { + return []; + } + } + + /** + * Get webhook events for a project from the project's webhooks attribute + * @param Document|null $project + * @return array + */ + public function getWebhooksEvents(?Document $project): array + { + if ($project === null || $project->isEmpty() || $project->getId() === 'console') { + return []; + } + + $webhooks = $project->getAttribute('webhooks', []); + if (empty($webhooks)) { + return []; + } + + $events = []; + foreach ($webhooks as $webhook) { + if ($webhook->getAttribute('enabled', false) !== true) { + continue; + } + + $webhookEvents = $webhook->getAttribute('events', []); + if (!empty($webhookEvents)) { + $events = array_merge($events, $webhookEvents); + } + } + + return \array_flip(\array_unique($events)); + } +} diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index ec65135a05..4df180ee0b 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -4,6 +4,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction; use Utopia\Database\Database; use Utopia\Database\Document; @@ -349,6 +350,8 @@ abstract class Action extends DatabasesAction * @param Event $queueForRealtime * @param Event $queueForFunctions * @param Event $queueForWebhooks + * @param Database $dbForProject + * @param EventProcessor $eventProcessor * @return void */ protected function triggerBulk( @@ -359,7 +362,9 @@ abstract class Action extends DatabasesAction Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, - Event $queueForWebhooks + Event $queueForWebhooks, + Database $dbForProject, + EventProcessor $eventProcessor ): void { $queueForEvents ->setEvent($event) @@ -369,6 +374,11 @@ abstract class Action extends DatabasesAction ->setParam('tableId', $collection->getId()) ->setContext($this->getCollectionsEventsContext(), $collection); + // Get project and function events (cached) + $project = $queueForEvents->getProject(); + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); + foreach ($documents as $document) { $queueForEvents ->setParam('documentId', $document->getId()) @@ -379,14 +389,33 @@ abstract class Action extends DatabasesAction ->from($queueForEvents) ->trigger(); - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + // Generate events for this document operation + $generatedEvents = Event::generateEvents( + $queueForEvents->getEvent(), + $queueForEvents->getParams() + ); - if (!empty($queueForEvents->getProject()?->getAttribute('webhooks', []))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); + + if (!empty($functionsEvents)) { + foreach ($generatedEvents as $event) { + if (isset($functionsEvents[$event])) { + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + break; + } + } + } + + if (!empty($webhooksEvents)) { + foreach ($generatedEvents as $event) { + if (isset($webhooksEvents[$event])) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + break; + } + } } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php index 070ee09450..a3d3535065 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -81,10 +82,11 @@ class Delete extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -203,7 +205,9 @@ class Delete extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php index 192b10c956..fdc49b1901 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -85,10 +86,11 @@ class Update extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -234,7 +236,9 @@ class Update extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php index 45db6cc96b..00e4663171 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -83,10 +84,11 @@ class Upsert extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -209,7 +211,9 @@ class Upsert extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php index bbc63da499..15811db92e 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Deprecated; @@ -134,9 +135,10 @@ class Create extends Action ->inject('queueForWebhooks') ->inject('plan') ->inject('authorization') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization): void + public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -498,7 +500,9 @@ class Create extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject, + $eventProcessor ); return; } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index e4f1051464..eca92da313 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -7,6 +7,7 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; @@ -77,6 +78,7 @@ class Update extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('authorization') + ->inject('eventProcessor') ->callback($this->action(...)); } @@ -94,6 +96,7 @@ class Update extends Action * @param Event $queueForRealtime * @param Event $queueForFunctions * @param Event $queueForWebhooks + * @param EventProcessor $eventProcessor * @return void * @throws ConflictException * @throws Exception @@ -103,7 +106,7 @@ class Update extends Action * @throws Structure * @throws \Utopia\Exception */ - public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, Authorization $authorization): void + public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void { if (!$commit && !$rollback) { throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true'); @@ -371,6 +374,11 @@ class Update extends Action $queueForEvents->setEvent($eventString); + // Get project and function/webhook events (cached) + $project = $queueForEvents->getProject(); + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); + foreach ($documentsToTrigger as $doc) { $payload = $doc->getArrayCopy(); $payload['$tableId'] = $collection->getId(); @@ -381,9 +389,33 @@ class Update extends Action ->setParam('rowId', $doc->getId()) ->setPayload($payload); + // Generate events for this document operation + $generatedEvents = Event::generateEvents( + $queueForEvents->getEvent(), + $queueForEvents->getParams() + ); + $queueForRealtime->from($queueForEvents)->trigger(); - $queueForFunctions->from($queueForEvents)->trigger(); - $queueForWebhooks->from($queueForEvents)->trigger(); + + // Only trigger functions if there are matching function events + if (!empty($functionsEvents)) { + foreach ($generatedEvents as $event) { + if (isset($functionsEvents[$event])) { + $queueForFunctions->from($queueForEvents)->trigger(); + break; + } + } + } + + // Only trigger webhooks if there are matching webhook events + if (!empty($webhooksEvents)) { + foreach ($generatedEvents as $event) { + if (isset($webhooksEvents[$event])) { + $queueForWebhooks->from($queueForEvents)->trigger(); + break; + } + } + } } $queueForEvents->reset(); diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php index b9896d282d..45e5b84774 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php @@ -66,7 +66,7 @@ class Delete extends DocumentsDelete ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') - ->inject('authorization') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php index f4ccea1698..3062186624 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php @@ -68,7 +68,7 @@ class Update extends DocumentsUpdate ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') - ->inject('authorization') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php index 69a687d92f..3f837917c8 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php @@ -68,7 +68,7 @@ class Upsert extends DocumentsUpsert ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') - ->inject('authorization') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php index c70ed71378..0f28a561ed 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php @@ -112,6 +112,7 @@ class Create extends DocumentCreate ->inject('queueForWebhooks') ->inject('plan') ->inject('authorization') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php index 8be28ce9f7..807e96800e 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php @@ -61,6 +61,7 @@ class Update extends TransactionsUpdate ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('authorization') + ->inject('eventProcessor') ->callback($this->action(...)); } }