From 6c1f9675099ac039c4797991b3f5b9d6b93a1dad Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 28 Dec 2025 18:10:44 +0200 Subject: [PATCH 01/15] add functionsEvents and webhooksEvents --- app/config/collections/platform.php | 22 +++++++++++ app/init/database/filters.php | 38 +++++++++++++++++++ .../Collections/Documents/Action.php | 18 +++++++-- 3 files changed, 74 insertions(+), 4 deletions(-) diff --git a/app/config/collections/platform.php b/app/config/collections/platform.php index d44d9b725c..9f46d5e8c7 100644 --- a/app/config/collections/platform.php +++ b/app/config/collections/platform.php @@ -276,6 +276,28 @@ return [ 'array' => false, 'filters' => ['subQueryWebhooks'], ], + [ + '$id' => ID::custom('webhookEvents'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 16384, + 'signed' => true, + 'required' => false, + 'default' => [], + 'array' => true, + 'filters' => ['subQueryWebhookEvents'], + ], + [ + '$id' => ID::custom('functionEvents'), + 'type' => Database::VAR_STRING, + 'format' => '', + 'size' => 16384, + 'signed' => true, + 'required' => false, + 'default' => [], + 'array' => true, + 'filters' => ['subQueryFunctionEvents'], + ], [ '$id' => ID::custom('keys'), 'type' => Database::VAR_STRING, diff --git a/app/init/database/filters.php b/app/init/database/filters.php index c4cfd1ac81..ef40e55379 100644 --- a/app/init/database/filters.php +++ b/app/init/database/filters.php @@ -170,6 +170,44 @@ Database::addFilter( } ); +Database::addFilter( + 'subQueryWebhookEvents', + function (mixed $value) { + return; + }, + function (mixed $value, Document $document, Database $database) { + $webhooks = $database + ->find('webhooks', [ + Query::equal('projectInternalId', [$document->getSequence()]), + Query::limit(APP_LIMIT_SUBQUERY), + ]); + + $events = []; + foreach ($webhooks as $webhook) { + $webhookEvents = $webhook->getAttribute('events', []); + if (!empty($webhookEvents)) { + $events = array_merge($events, $webhookEvents); + } + } + + return array_unique($events); + } +); + +Database::addFilter( + 'subQueryFunctionEvents', + function (mixed $value) { + return; + }, + function (mixed $value, Document $document, Database $database) { + // Functions are stored in the project database, not platform database + // This filter will return empty array when called from platform DB + // Function events will need to be computed separately when dbForProject is available + // For now, return empty to avoid errors + return []; + } +); + Database::addFilter( 'subQuerySessions', function (mixed $value) { diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index f16d00998d..a0cb5c20f5 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -378,11 +378,21 @@ abstract class Action extends DatabasesAction ->from($queueForEvents) ->trigger(); - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + $project = $queueForEvents->getProject(); + $generatedEvents = Event::generateEvents( + $queueForEvents->getEvent(), + $queueForEvents->getParams() + ); - if (!empty($queueForEvents->getProject()?->getAttribute('webhooks', []))) { + $functionEvents = $project?->getAttribute('functionEvents', []); + if (!empty($functionEvents) && !empty(array_intersect($functionEvents, $generatedEvents))) { + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + } + + $webhookEvents = $project?->getAttribute('webhookEvents', []); + if (!empty($webhookEvents) && !empty(array_intersect($webhookEvents, $generatedEvents))) { $queueForWebhooks ->from($queueForEvents) ->trigger(); From e9dac6710f05dd951b4a05e4c84e91b80cc6177b Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 4 Jan 2026 09:53:29 +0200 Subject: [PATCH 02/15] Refactor: Remove unused webhook and function event filters, implement caching for function events retrieval --- app/config/collections/platform.php | 22 ---- app/init/database/filters.php | 38 ------- .../Platform/Modules/Compute/Base.php | 25 +++++ .../Collections/Documents/Action.php | 100 +++++++++++++++++- .../Collections/Documents/Bulk/Delete.php | 3 +- .../Collections/Documents/Bulk/Update.php | 3 +- .../Collections/Documents/Bulk/Upsert.php | 3 +- .../Collections/Documents/Create.php | 3 +- .../Functions/Http/Functions/Delete.php | 6 ++ .../Functions/Http/Functions/Update.php | 3 + 10 files changed, 138 insertions(+), 68 deletions(-) diff --git a/app/config/collections/platform.php b/app/config/collections/platform.php index 9f46d5e8c7..d44d9b725c 100644 --- a/app/config/collections/platform.php +++ b/app/config/collections/platform.php @@ -276,28 +276,6 @@ return [ 'array' => false, 'filters' => ['subQueryWebhooks'], ], - [ - '$id' => ID::custom('webhookEvents'), - 'type' => Database::VAR_STRING, - 'format' => '', - 'size' => 16384, - 'signed' => true, - 'required' => false, - 'default' => [], - 'array' => true, - 'filters' => ['subQueryWebhookEvents'], - ], - [ - '$id' => ID::custom('functionEvents'), - 'type' => Database::VAR_STRING, - 'format' => '', - 'size' => 16384, - 'signed' => true, - 'required' => false, - 'default' => [], - 'array' => true, - 'filters' => ['subQueryFunctionEvents'], - ], [ '$id' => ID::custom('keys'), 'type' => Database::VAR_STRING, diff --git a/app/init/database/filters.php b/app/init/database/filters.php index ef40e55379..c4cfd1ac81 100644 --- a/app/init/database/filters.php +++ b/app/init/database/filters.php @@ -170,44 +170,6 @@ Database::addFilter( } ); -Database::addFilter( - 'subQueryWebhookEvents', - function (mixed $value) { - return; - }, - function (mixed $value, Document $document, Database $database) { - $webhooks = $database - ->find('webhooks', [ - Query::equal('projectInternalId', [$document->getSequence()]), - Query::limit(APP_LIMIT_SUBQUERY), - ]); - - $events = []; - foreach ($webhooks as $webhook) { - $webhookEvents = $webhook->getAttribute('events', []); - if (!empty($webhookEvents)) { - $events = array_merge($events, $webhookEvents); - } - } - - return array_unique($events); - } -); - -Database::addFilter( - 'subQueryFunctionEvents', - function (mixed $value) { - return; - }, - function (mixed $value, Document $document, Database $database) { - // Functions are stored in the project database, not platform database - // This filter will return empty array when called from platform DB - // Function events will need to be computed separately when dbForProject is available - // For now, return empty to avoid errors - return []; - } -); - Database::addFilter( 'subQuerySessions', function (mixed $value) { diff --git a/src/Appwrite/Platform/Modules/Compute/Base.php b/src/Appwrite/Platform/Modules/Compute/Base.php index 47afc90986..0ef22f9383 100644 --- a/src/Appwrite/Platform/Modules/Compute/Base.php +++ b/src/Appwrite/Platform/Modules/Compute/Base.php @@ -336,4 +336,29 @@ class Base extends Action return $deployment; } + + /** + * Purge function events cache for a project + * @param Document $project + * @param Database $dbForProject + * @return void + */ + protected function purgeFunctionEventsCache(Document $project, Database $dbForProject): void + { + if ($project->isEmpty() || $project->getId() === 'console') { + return; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functionEvents', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $dbForProject->getCache()->purge($cacheKey); + } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index a0cb5c20f5..f8196b582b 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -7,6 +7,7 @@ use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction; use Utopia\Database\Database; use Utopia\Database\Document; +use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; abstract class Action extends DatabasesAction @@ -348,6 +349,7 @@ abstract class Action extends DatabasesAction * @param Event $queueForRealtime * @param Event $queueForFunctions * @param Event $queueForWebhooks + * @param Database $dbForProject * @return void */ protected function triggerBulk( @@ -358,7 +360,8 @@ abstract class Action extends DatabasesAction Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, - Event $queueForWebhooks + Event $queueForWebhooks, + Database $dbForProject ): void { $queueForEvents ->setEvent($event) @@ -368,6 +371,11 @@ abstract class Action extends DatabasesAction ->setParam('tableId', $collection->getId()) ->setContext($this->getCollectionsEventsContext(), $collection); + // Get project and function events (cached) + $project = $queueForEvents->getProject(); + $functionEvents = $this->getFunctionEvents($project, $dbForProject); + $webhookEvents = $this->getWebhookEvents($project); + foreach ($documents as $document) { $queueForEvents ->setParam('documentId', $document->getId()) @@ -378,20 +386,20 @@ abstract class Action extends DatabasesAction ->from($queueForEvents) ->trigger(); - $project = $queueForEvents->getProject(); + // Generate events for this document operation $generatedEvents = Event::generateEvents( $queueForEvents->getEvent(), $queueForEvents->getParams() ); - $functionEvents = $project?->getAttribute('functionEvents', []); + // Only trigger functions if there are matching function events if (!empty($functionEvents) && !empty(array_intersect($functionEvents, $generatedEvents))) { $queueForFunctions ->from($queueForEvents) ->trigger(); } - $webhookEvents = $project?->getAttribute('webhookEvents', []); + // Only trigger webhooks if there are matching webhook events if (!empty($webhookEvents) && !empty(array_intersect($webhookEvents, $generatedEvents))) { $queueForWebhooks ->from($queueForEvents) @@ -404,4 +412,88 @@ abstract class Action extends DatabasesAction $queueForFunctions->reset(); $queueForWebhooks->reset(); } + + /** + * Get function events for a project, using Redis cache + * @param Document|null $project + * @param Database $dbForProject + * @return array + */ + protected function getFunctionEvents(?Document $project, Database $dbForProject): array + { + if ($project === null || $project->isEmpty() || $project->getId() === 'console') { + return []; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functionEvents', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $ttl = 3600; // 1 hour cache TTL + $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); + + if ($cachedFunctionEvents !== false) { + return \json_decode($cachedFunctionEvents, true) ?? []; + } + + try { + $functions = $dbForProject->skipValidation(fn () => $dbForProject->find('functions', [ + Query::limit(APP_LIMIT_SUBQUERY), + ])); + + $events = []; + foreach ($functions as $function) { + $functionEvents = $function->getAttribute('events', []); + if (!empty($functionEvents)) { + $events = array_merge($events, $functionEvents); + } + } + + $uniqueEvents = array_unique($events); + + // Save to cache + $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents), $ttl); + + return $uniqueEvents; + } catch (\Throwable $e) { + return []; + } + } + + /** + * Get webhook events for a project from the project's webhooks attribute + * @param Document|null $project + * @return array + */ + protected function getWebhookEvents(?Document $project): array + { + if ($project === null || $project->isEmpty() || $project->getId() === 'console') { + return []; + } + + $webhooks = $project->getAttribute('webhooks', []); + if (empty($webhooks)) { + return []; + } + + $events = []; + foreach ($webhooks as $webhook) { + if ($webhook->getAttribute('enabled', false) !== true) { + continue; + } + + $webhookEvents = $webhook->getAttribute('events', []); + if (!empty($webhookEvents)) { + $events = array_merge($events, $webhookEvents); + } + } + + return array_unique($events); + } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php index 070ee09450..e3ba6a37e7 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php @@ -203,7 +203,8 @@ class Delete extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php index 192b10c956..892ab7f0da 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php @@ -234,7 +234,8 @@ class Update extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php index 45db6cc96b..c15a8b94ae 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php @@ -209,7 +209,8 @@ class Upsert extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php index 6ec06f5c8a..60a1f66f36 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php @@ -491,7 +491,8 @@ class Create extends Action $queueForEvents, $queueForRealtime, $queueForFunctions, - $queueForWebhooks + $queueForWebhooks, + $dbForProject ); return; } diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php index dfa6636554..ee4db800a2 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php @@ -13,6 +13,7 @@ use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response; use Utopia\Database\Database; use Utopia\Database\DateTime; +use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\UID; use Utopia\Platform\Action; @@ -58,6 +59,7 @@ class Delete extends Base ->param('functionId', '', new UID(), 'Function ID.') ->inject('response') ->inject('dbForProject') + ->inject('project') ->inject('queueForDeletes') ->inject('queueForEvents') ->inject('dbForPlatform') @@ -68,6 +70,7 @@ class Delete extends Base string $functionId, Response $response, Database $dbForProject, + Document $project, DeleteEvent $queueForDeletes, Event $queueForEvents, Database $dbForPlatform @@ -95,6 +98,9 @@ class Delete extends Base $queueForEvents->setParam('functionId', $function->getId()); + // Purge function events cache when function is deleted + $this->purgeFunctionEventsCache($project, $dbForProject); + $response->noContent(); } } diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php index adb29bc533..3623e26ec6 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php @@ -286,6 +286,9 @@ class Update extends Base $queueForEvents->setParam('functionId', $function->getId()); + // Purge function events cache when function is updated + $this->purgeFunctionEventsCache($project, $dbForProject); + $response->dynamic($function, Response::MODEL_FUNCTION); } } From cd651dbdb8c8c974a92b968ced46b0dce78db681 Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 4 Jan 2026 11:35:20 +0200 Subject: [PATCH 03/15] chore: update dependencies and fix formatting issues in composer files; change Traefik image version in docker-compose; add debug output in Action.php --- composer.json | 2 +- composer.lock | 26 +++++++++---------- docker-compose.yml | 3 ++- .../Collections/Documents/Action.php | 1 + 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/composer.json b/composer.json index 844a10d7e8..b760cac65c 100644 --- a/composer.json +++ b/composer.json @@ -109,4 +109,4 @@ "tbachert/spi": true } } -} \ No newline at end of file +} diff --git a/composer.lock b/composer.lock index c678d1c01e..bd73303a71 100644 --- a/composer.lock +++ b/composer.lock @@ -5438,16 +5438,16 @@ "packages-dev": [ { "name": "appwrite/sdk-generator", - "version": "1.8.6", + "version": "1.8.9", "source": { "type": "git", "url": "https://github.com/appwrite/sdk-generator.git", - "reference": "b6cc29d3bd247e193f3c06b4168dc69d884645f0" + "reference": "5fc210f7403f9ecfa068cd2a74210ec6e2a3cec1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/b6cc29d3bd247e193f3c06b4168dc69d884645f0", - "reference": "b6cc29d3bd247e193f3c06b4168dc69d884645f0", + "url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/5fc210f7403f9ecfa068cd2a74210ec6e2a3cec1", + "reference": "5fc210f7403f9ecfa068cd2a74210ec6e2a3cec1", "shasum": "" }, "require": { @@ -5483,9 +5483,9 @@ "description": "Appwrite PHP library for generating API SDKs for multiple programming languages and platforms", "support": { "issues": "https://github.com/appwrite/sdk-generator/issues", - "source": "https://github.com/appwrite/sdk-generator/tree/1.8.6" + "source": "https://github.com/appwrite/sdk-generator/tree/1.8.9" }, - "time": "2025-12-31T10:22:17+00:00" + "time": "2026-01-02T12:09:51+00:00" }, { "name": "doctrine/annotations", @@ -8562,16 +8562,16 @@ }, { "name": "symfony/process", - "version": "v8.0.0", + "version": "v8.0.3", "source": { "type": "git", "url": "https://github.com/symfony/process.git", - "reference": "a0a750500c4ce900d69ba4e9faf16f82c10ee149" + "reference": "0cbbd88ec836f8757641c651bb995335846abb78" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/process/zipball/a0a750500c4ce900d69ba4e9faf16f82c10ee149", - "reference": "a0a750500c4ce900d69ba4e9faf16f82c10ee149", + "url": "https://api.github.com/repos/symfony/process/zipball/0cbbd88ec836f8757641c651bb995335846abb78", + "reference": "0cbbd88ec836f8757641c651bb995335846abb78", "shasum": "" }, "require": { @@ -8603,7 +8603,7 @@ "description": "Executes commands in sub-processes", "homepage": "https://symfony.com", "support": { - "source": "https://github.com/symfony/process/tree/v8.0.0" + "source": "https://github.com/symfony/process/tree/v8.0.3" }, "funding": [ { @@ -8623,7 +8623,7 @@ "type": "tidelift" } ], - "time": "2025-10-16T16:25:44+00:00" + "time": "2025-12-19T10:01:18+00:00" }, { "name": "symfony/string", @@ -8971,5 +8971,5 @@ "platform-overrides": { "php": "8.3" }, - "plugin-api-version": "2.9.0" + "plugin-api-version": "2.2.0" } diff --git a/docker-compose.yml b/docker-compose.yml index 14591db926..b04e9b7c34 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,7 +12,8 @@ x-logging: &x-logging services: traefik: - image: traefik:2.11 + #image: traefik:2.11 not working with docker api version 1.52 + image: traefik:3.6 <<: *x-logging container_name: appwrite-traefik command: diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index f8196b582b..8451c64ee5 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -392,6 +392,7 @@ abstract class Action extends DatabasesAction $queueForEvents->getParams() ); + // Only trigger functions if there are matching function events if (!empty($functionEvents) && !empty(array_intersect($functionEvents, $generatedEvents))) { $queueForFunctions From 23dfb23a3bf8c3357f264e24129bb00f730d0a23 Mon Sep 17 00:00:00 2001 From: shimon Date: Tue, 6 Jan 2026 18:28:37 +0200 Subject: [PATCH 04/15] fix: revert Traefik image version to 2.11; implement caching for function events and webhooks; add cache purging on function create/update/delete events --- app/controllers/shared/api.php | 35 +++++- docker-compose.yml | 3 +- .../Databases/Http/Databases/Action.php | 98 +++++++++++++++ .../Collections/Documents/Action.php | 118 +++--------------- .../Http/Databases/Transactions/Update.php | 33 ++++- .../Functions/Http/Functions/Delete.php | 6 - .../Functions/Http/Functions/Update.php | 2 - 7 files changed, 183 insertions(+), 112 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 83b56f626a..b0c7aec41e 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -112,6 +112,33 @@ $eventDatabaseListener = function (Document $project, Document $document, Respon } }; +/** + * Purge function events cache when functions are created, updated or deleted. + */ +$functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) { + + + if ($document->getCollection() !== 'functions') { + return; + } + + if ($project->isEmpty() || $project->getId() === 'console') { + return; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + var_dump(['purged' => $cacheKey]); + $dbForProject->getCache()->purge($cacheKey); +}; + $usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { $value = 1; @@ -509,7 +536,7 @@ App::init() ->inject('devKey') ->inject('telemetry') ->inject('platform') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Migration $queueForMigrations, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Migration $queueForMigrations, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) use ($usageDatabaseListener, $eventDatabaseListener, $functionsEventsCacheListener) { $route = $utopia->getRoute(); @@ -650,7 +677,11 @@ App::init() $queueForFunctions->from($queueForEvents), $queueForWebhooks->from($queueForEvents), $queueForRealtime->from($queueForEvents) - )); + )) + ->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) + ->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) + ; $useCache = $route->getLabel('cache', false); $storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load'); diff --git a/docker-compose.yml b/docker-compose.yml index b04e9b7c34..14591db926 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,8 +12,7 @@ x-logging: &x-logging services: traefik: - #image: traefik:2.11 not working with docker api version 1.52 - image: traefik:3.6 + image: traefik:2.11 <<: *x-logging container_name: appwrite-traefik command: diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php index 728e732cc5..5c85ac1de6 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php @@ -7,6 +7,7 @@ use Appwrite\Platform\Action as AppwriteAction; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Operator; +use Utopia\Database\Query; class Action extends AppwriteAction { @@ -94,4 +95,101 @@ class Action extends AppwriteAction return $data; } + + /** + * Get function events for a project, using Redis cache + * @param Document|null $project + * @param Database $dbForProject + * @return array + */ + protected function getFunctionsEvents(?Document $project, Database $dbForProject): array + { + if ($project === null || + $project->isEmpty() || + $project->getId() === 'console') { + return []; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $ttl = 3600; // 1 hour cache TTL + $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); + + if ($cachedFunctionEvents !== false) { + return \json_decode($cachedFunctionEvents, true) ?? []; + + } + + try { + $events = []; + $limit = 100; + $sum = 100; + $offset = 0; + + while ($sum >= $limit) { + $functions = $dbForProject->find('functions', [ + Query::select(['$id', 'events']), + Query::limit($limit), + Query::offset($offset), + Query::orderAsc('$sequence'), + ]); + + $sum = \count($functions); + $offset = $offset + $limit; + + foreach ($functions as $function) { + $functionEvents = $function->getAttribute('events', []); + if (!empty($functionEvents)) { + $events = array_merge($events, $functionEvents); + } + } + } + + $uniqueEvents = \array_flip(\array_unique($events)); + $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents)); + + return $uniqueEvents; + } catch (\Throwable $e) { + return []; + } + } + + /** + * Get webhook events for a project from the project's webhooks attribute + * @param Document|null $project + * @return array + */ + protected function getWebhooksEvents(?Document $project): array + { + if ($project === null || $project->isEmpty() || $project->getId() === 'console') { + return []; + } + + $webhooks = $project->getAttribute('webhooks', []); + if (empty($webhooks)) { + return []; + } + + $events = []; + foreach ($webhooks as $webhook) { + if ($webhook->getAttribute('enabled', false) !== true) { + continue; + } + + $webhookEvents = $webhook->getAttribute('events', []); + if (!empty($webhookEvents)) { + $events = array_merge($events, $webhookEvents); + } + } + + return \array_flip(\array_unique($events)); + } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index 8451c64ee5..f154372983 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -7,7 +7,6 @@ use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction; use Utopia\Database\Database; use Utopia\Database\Document; -use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; abstract class Action extends DatabasesAction @@ -373,8 +372,8 @@ abstract class Action extends DatabasesAction // Get project and function events (cached) $project = $queueForEvents->getProject(); - $functionEvents = $this->getFunctionEvents($project, $dbForProject); - $webhookEvents = $this->getWebhookEvents($project); + $functionsEvents = $this->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $this->getWebhooksEvents($project); foreach ($documents as $document) { $queueForEvents @@ -392,19 +391,26 @@ abstract class Action extends DatabasesAction $queueForEvents->getParams() ); - - // Only trigger functions if there are matching function events - if (!empty($functionEvents) && !empty(array_intersect($functionEvents, $generatedEvents))) { - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + if (!empty($functionsEvents)) { + foreach ($generatedEvents as $event) { + if (isset($functionsEvents[$event])) { + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + break; + } + } } - // Only trigger webhooks if there are matching webhook events - if (!empty($webhookEvents) && !empty(array_intersect($webhookEvents, $generatedEvents))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); + if (!empty($webhooksEvents)) { + foreach ($generatedEvents as $event) { + if (isset($webhooksEvents[$event])) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + break; + } + } } } @@ -413,88 +419,4 @@ abstract class Action extends DatabasesAction $queueForFunctions->reset(); $queueForWebhooks->reset(); } - - /** - * Get function events for a project, using Redis cache - * @param Document|null $project - * @param Database $dbForProject - * @return array - */ - protected function getFunctionEvents(?Document $project, Database $dbForProject): array - { - if ($project === null || $project->isEmpty() || $project->getId() === 'console') { - return []; - } - - $hostname = $dbForProject->getAdapter()->getHostname(); - $cacheKey = \sprintf( - '%s-cache-%s:%s:%s:project:%s:functionEvents', - $dbForProject->getCacheName(), - $hostname ?? '', - $dbForProject->getNamespace(), - $dbForProject->getTenant(), - $project->getId() - ); - - $ttl = 3600; // 1 hour cache TTL - $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); - - if ($cachedFunctionEvents !== false) { - return \json_decode($cachedFunctionEvents, true) ?? []; - } - - try { - $functions = $dbForProject->skipValidation(fn () => $dbForProject->find('functions', [ - Query::limit(APP_LIMIT_SUBQUERY), - ])); - - $events = []; - foreach ($functions as $function) { - $functionEvents = $function->getAttribute('events', []); - if (!empty($functionEvents)) { - $events = array_merge($events, $functionEvents); - } - } - - $uniqueEvents = array_unique($events); - - // Save to cache - $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents), $ttl); - - return $uniqueEvents; - } catch (\Throwable $e) { - return []; - } - } - - /** - * Get webhook events for a project from the project's webhooks attribute - * @param Document|null $project - * @return array - */ - protected function getWebhookEvents(?Document $project): array - { - if ($project === null || $project->isEmpty() || $project->getId() === 'console') { - return []; - } - - $webhooks = $project->getAttribute('webhooks', []); - if (empty($webhooks)) { - return []; - } - - $events = []; - foreach ($webhooks as $webhook) { - if ($webhook->getAttribute('enabled', false) !== true) { - continue; - } - - $webhookEvents = $webhook->getAttribute('events', []); - if (!empty($webhookEvents)) { - $events = array_merge($events, $webhookEvents); - } - } - - return array_unique($events); - } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index 9235c81b8e..30f4a7e05c 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -370,6 +370,11 @@ class Update extends Action $queueForEvents->setEvent($eventString); + // Get project and function/webhook events (cached) + $project = $queueForEvents->getProject(); + $functionsEvents = $this->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $this->getWebhooksEvents($project); + foreach ($documentsToTrigger as $doc) { $payload = $doc->getArrayCopy(); $payload['$tableId'] = $collection->getId(); @@ -380,9 +385,33 @@ class Update extends Action ->setParam('rowId', $doc->getId()) ->setPayload($payload); + // Generate events for this document operation + $generatedEvents = Event::generateEvents( + $queueForEvents->getEvent(), + $queueForEvents->getParams() + ); + $queueForRealtime->from($queueForEvents)->trigger(); - $queueForFunctions->from($queueForEvents)->trigger(); - $queueForWebhooks->from($queueForEvents)->trigger(); + + // Only trigger functions if there are matching function events + if (!empty($functionsEvents)) { + foreach ($generatedEvents as $event) { + if (isset($functionsEvents[$event])) { + $queueForFunctions->from($queueForEvents)->trigger(); + break; + } + } + } + + // Only trigger webhooks if there are matching webhook events + if (!empty($webhooksEvents)) { + foreach ($generatedEvents as $event) { + if (isset($webhooksEvents[$event])) { + $queueForWebhooks->from($queueForEvents)->trigger(); + break; + } + } + } } $queueForEvents->reset(); diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php index ee4db800a2..dfa6636554 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Delete.php @@ -13,7 +13,6 @@ use Appwrite\SDK\Response as SDKResponse; use Appwrite\Utopia\Response; use Utopia\Database\Database; use Utopia\Database\DateTime; -use Utopia\Database\Document; use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\UID; use Utopia\Platform\Action; @@ -59,7 +58,6 @@ class Delete extends Base ->param('functionId', '', new UID(), 'Function ID.') ->inject('response') ->inject('dbForProject') - ->inject('project') ->inject('queueForDeletes') ->inject('queueForEvents') ->inject('dbForPlatform') @@ -70,7 +68,6 @@ class Delete extends Base string $functionId, Response $response, Database $dbForProject, - Document $project, DeleteEvent $queueForDeletes, Event $queueForEvents, Database $dbForPlatform @@ -98,9 +95,6 @@ class Delete extends Base $queueForEvents->setParam('functionId', $function->getId()); - // Purge function events cache when function is deleted - $this->purgeFunctionEventsCache($project, $dbForProject); - $response->noContent(); } } diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php index 3623e26ec6..fe2ae83807 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php @@ -286,8 +286,6 @@ class Update extends Base $queueForEvents->setParam('functionId', $function->getId()); - // Purge function events cache when function is updated - $this->purgeFunctionEventsCache($project, $dbForProject); $response->dynamic($function, Response::MODEL_FUNCTION); } From 573d8423a379324fa228577a20ce683f347c9888 Mon Sep 17 00:00:00 2001 From: shimon Date: Tue, 6 Jan 2026 18:40:17 +0200 Subject: [PATCH 05/15] refactor: remove unused purgeFunctionEventsCache method and clean up whitespace in Update class --- composer.json | 2 +- .../Platform/Modules/Compute/Base.php | 24 ------------------- .../Functions/Http/Functions/Update.php | 1 - 3 files changed, 1 insertion(+), 26 deletions(-) diff --git a/composer.json b/composer.json index c19ed94e75..55e4e08402 100644 --- a/composer.json +++ b/composer.json @@ -109,4 +109,4 @@ "tbachert/spi": true } } -} +} \ No newline at end of file diff --git a/src/Appwrite/Platform/Modules/Compute/Base.php b/src/Appwrite/Platform/Modules/Compute/Base.php index 0ef22f9383..b1b34609d9 100644 --- a/src/Appwrite/Platform/Modules/Compute/Base.php +++ b/src/Appwrite/Platform/Modules/Compute/Base.php @@ -337,28 +337,4 @@ class Base extends Action return $deployment; } - /** - * Purge function events cache for a project - * @param Document $project - * @param Database $dbForProject - * @return void - */ - protected function purgeFunctionEventsCache(Document $project, Database $dbForProject): void - { - if ($project->isEmpty() || $project->getId() === 'console') { - return; - } - - $hostname = $dbForProject->getAdapter()->getHostname(); - $cacheKey = \sprintf( - '%s-cache-%s:%s:%s:project:%s:functionEvents', - $dbForProject->getCacheName(), - $hostname ?? '', - $dbForProject->getNamespace(), - $dbForProject->getTenant(), - $project->getId() - ); - - $dbForProject->getCache()->purge($cacheKey); - } } diff --git a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php index fe2ae83807..adb29bc533 100644 --- a/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php +++ b/src/Appwrite/Platform/Modules/Functions/Http/Functions/Update.php @@ -286,7 +286,6 @@ class Update extends Base $queueForEvents->setParam('functionId', $function->getId()); - $response->dynamic($function, Response::MODEL_FUNCTION); } } From 0582cdf3945de27e7f59d8b22b7f7fde2a0458f8 Mon Sep 17 00:00:00 2001 From: shimon Date: Tue, 6 Jan 2026 18:41:06 +0200 Subject: [PATCH 06/15] refactor: clean up whitespace and remove commented-out abuse handling code in api.php --- app/controllers/shared/api.php | 46 ++++------------------------------ 1 file changed, 5 insertions(+), 41 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index cd823c9c6b..fe7dd7ce9b 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -117,11 +117,11 @@ $eventDatabaseListener = function (Document $project, Document $document, Respon */ $functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) { - + if ($document->getCollection() !== 'functions') { return; } - + if ($project->isEmpty() || $project->getId() === 'console') { return; } @@ -135,7 +135,7 @@ $functionsEventsCacheListener = function (string $event, Document $document, Doc $dbForProject->getTenant(), $project->getId() ); - var_dump(['purged' => $cacheKey]); + $dbForProject->getCache()->purge($cacheKey); }; @@ -681,7 +681,7 @@ App::init() ->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) ->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) - ; + ; $useCache = $route->getLabel('cache', false); $storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load'); @@ -845,8 +845,7 @@ App::shutdown() ->inject('queueForWebhooks') ->inject('queueForRealtime') ->inject('dbForProject') - ->inject('timelimit') - ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, callable $timelimit) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -880,41 +879,6 @@ App::shutdown() $route = $utopia->getRoute(); $requestParams = $route->getParamsValues(); - /** - * Abuse labels - */ - $abuseEnabled = System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') !== 'disabled'; - $abuseResetCode = $route->getLabel('abuse-reset', []); - $abuseResetCode = \is_array($abuseResetCode) ? $abuseResetCode : [$abuseResetCode]; - - if ($abuseEnabled && \count($abuseResetCode) > 0 && \in_array($response->getStatusCode(), $abuseResetCode)) { - $abuseKeyLabel = $route->getLabel('abuse-key', 'url:{url},ip:{ip}'); - $abuseKeyLabel = (!is_array($abuseKeyLabel)) ? [$abuseKeyLabel] : $abuseKeyLabel; - - foreach ($abuseKeyLabel as $abuseKey) { - $start = $request->getContentRangeStart(); - $end = $request->getContentRangeEnd(); - $timeLimit = $timelimit($abuseKey, $route->getLabel('abuse-limit', 0), $route->getLabel('abuse-time', 3600)); - $timeLimit - ->setParam('{projectId}', $project->getId()) - ->setParam('{userId}', $user->getId()) - ->setParam('{userAgent}', $request->getUserAgent('')) - ->setParam('{ip}', $request->getIP()) - ->setParam('{url}', $request->getHostname() . $route->getPath()) - ->setParam('{method}', $request->getMethod()) - ->setParam('{chunkId}', (int)($start / ($end + 1 - $start))); - - foreach ($request->getParams() as $key => $value) { // Set request params as potential abuse keys - if (!empty($value)) { - $timeLimit->setParam('{param-' . $key . '}', (\is_array($value)) ? \json_encode($value) : $value); - } - } - - $abuse = new Abuse($timeLimit); - $abuse->reset(); - } - } - /** * Audit labels */ From db4dcd164e67fabb2e3be20b0a26cd9d974575d8 Mon Sep 17 00:00:00 2001 From: shimon Date: Wed, 7 Jan 2026 16:57:57 +0200 Subject: [PATCH 07/15] refactor: integrate EventProcessor for handling function and webhook events; streamline event triggering in database actions --- app/controllers/shared/api.php | 253 +++--------------- app/init/resources.php | 211 ++++++++++++++- src/Appwrite/Functions/EventProcessor.php | 106 ++++++++ .../Databases/Http/Databases/Action.php | 97 ------- .../Collections/Documents/Action.php | 10 +- .../Collections/Documents/Bulk/Delete.php | 7 +- .../Collections/Documents/Bulk/Update.php | 7 +- .../Collections/Documents/Bulk/Upsert.php | 7 +- .../Collections/Documents/Create.php | 7 +- .../Http/Databases/Transactions/Update.php | 9 +- .../Http/TablesDB/Tables/Rows/Bulk/Delete.php | 1 + .../Http/TablesDB/Tables/Rows/Bulk/Update.php | 1 + .../Http/TablesDB/Tables/Rows/Bulk/Upsert.php | 1 + .../Http/TablesDB/Tables/Rows/Create.php | 1 + 14 files changed, 387 insertions(+), 331 deletions(-) create mode 100644 src/Appwrite/Functions/EventProcessor.php diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index fe7dd7ce9b..a7d5478920 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -10,12 +10,12 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\Method; use Appwrite\Utopia\Database\Documents\User; use Appwrite\Utopia\Request; @@ -30,7 +30,6 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; -use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Validator\WhiteList; @@ -74,178 +73,6 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; -/** - * This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**. - * - * Accounts can be created in many ways beyond `createAccount` - * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. - */ -$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { - // Only trigger events for user creation with the database listener. - if ($document->getCollection() !== 'users') { - return; - } - - $queueForEvents - ->setEvent('users.[userId].create') - ->setParam('userId', $document->getId()) - ->setPayload($response->output($document, Response::MODEL_USER)); - - // Trigger functions, webhooks, and realtime events - $queueForFunctions - ->from($queueForEvents) - ->trigger(); - - - /** Trigger webhooks events only if a project has them enabled */ - if (!empty($project->getAttribute('webhooks'))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); - } - - /** Trigger realtime events only for non console events */ - if ($queueForEvents->getProject()->getId() !== 'console') { - $queueForRealtime - ->from($queueForEvents) - ->trigger(); - } -}; - -/** - * Purge function events cache when functions are created, updated or deleted. - */ -$functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) { - - - if ($document->getCollection() !== 'functions') { - return; - } - - if ($project->isEmpty() || $project->getId() === 'console') { - return; - } - - $hostname = $dbForProject->getAdapter()->getHostname(); - $cacheKey = \sprintf( - '%s-cache-%s:%s:%s:project:%s:functions:events', - $dbForProject->getCacheName(), - $hostname ?? '', - $dbForProject->getNamespace(), - $dbForProject->getTenant(), - $project->getId() - ); - - $dbForProject->getCache()->purge($cacheKey); -}; - -$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { - $value = 1; - - switch ($event) { - case Database::EVENT_DOCUMENT_DELETE: - $value = -1; - break; - case Database::EVENT_DOCUMENTS_DELETE: - $value = -1 * $document->getAttribute('modified', 0); - break; - case Database::EVENT_DOCUMENTS_CREATE: - $value = $document->getAttribute('modified', 0); - break; - case Database::EVENT_DOCUMENTS_UPSERT: - $value = $document->getAttribute('created', 0); - break; - } - - switch (true) { - case $document->getCollection() === 'teams': - $queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project - break; - case $document->getCollection() === 'users': - $queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case $document->getCollection() === 'sessions': // sessions - $queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project - break; - case $document->getCollection() === 'databases': // databases - $queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections - $parts = explode('_', $document->getCollection()); - $databaseInternalId = $parts[1] ?? 0; - $queueForStatsUsage - ->addMetric(METRIC_COLLECTIONS, $value) // per project - ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value); - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents - $parts = explode('_', $document->getCollection()); - $databaseInternalId = $parts[1] ?? 0; - $collectionInternalId = $parts[3] ?? 0; - $queueForStatsUsage - ->addMetric(METRIC_DOCUMENTS, $value) // per project - ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database - ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection - break; - case $document->getCollection() === 'buckets': //buckets - $queueForStatsUsage - ->addMetric(METRIC_BUCKETS, $value); // per project - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case str_starts_with($document->getCollection(), 'bucket_'): // files - $parts = explode('_', $document->getCollection()); - $bucketInternalId = $parts[1]; - $queueForStatsUsage - ->addMetric(METRIC_FILES, $value) // per project - ->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project - ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket - ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket - break; - case $document->getCollection() === 'functions': - $queueForStatsUsage - ->addMetric(METRIC_FUNCTIONS, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case $document->getCollection() === 'sites': - $queueForStatsUsage - ->addMetric(METRIC_SITES, $value); // per project - - if ($event === Database::EVENT_DOCUMENT_DELETE) { - $queueForStatsUsage - ->addReduce($document); - } - break; - case $document->getCollection() === 'deployments': - $queueForStatsUsage - ->addMetric(METRIC_DEPLOYMENTS, $value) // per project - ->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project - ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function - ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value) - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function - ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value); - break; - default: - break; - } -}; - App::init() ->groups(['api']) ->inject('utopia') @@ -514,9 +341,6 @@ App::init() ->inject('response') ->inject('project') ->inject('user') - ->inject('publisher') - ->inject('publisherFunctions') - ->inject('publisherWebhooks') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -526,7 +350,6 @@ App::init() ->inject('queueForStatsUsage') ->inject('queueForFunctions') ->inject('queueForMails') - ->inject('queueForMigrations') ->inject('dbForProject') ->inject('timelimit') ->inject('resourceToken') @@ -536,7 +359,7 @@ App::init() ->inject('devKey') ->inject('telemetry') ->inject('platform') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Migration $queueForMigrations, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) use ($usageDatabaseListener, $eventDatabaseListener, $functionsEventsCacheListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform) { $route = $utopia->getRoute(); @@ -656,32 +479,6 @@ App::init() $queueForBuilds->setPlatform($platform); $queueForMails->setPlatform($platform); - // Clone the queues, to prevent events triggered by the database listener - // from overwriting the events that are supposed to be triggered in the shutdown hook. - $queueForEventsClone = new Event($publisher); - $queueForFunctions = new Func($publisherFunctions); - $queueForWebhooks = new Webhook($publisherWebhooks); - $queueForRealtime = new Realtime(); - - $dbForProject - ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) - ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( - $project, - $document, - $response, - $queueForEventsClone->from($queueForEvents), - $queueForFunctions->from($queueForEvents), - $queueForWebhooks->from($queueForEvents), - $queueForRealtime->from($queueForEvents) - )) - ->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) - ->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) - ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $dbForProject)) - ; $useCache = $route->getLabel('cache', false); $storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load'); @@ -845,7 +642,8 @@ App::shutdown() ->inject('queueForWebhooks') ->inject('queueForRealtime') ->inject('dbForProject') - ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject) use ($parseLabel) { + ->inject('eventProcessor') + ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, EventProcessor $eventProcessor) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -854,9 +652,15 @@ App::shutdown() $queueForEvents->setPayload($responsePayload); } - $queueForFunctions - ->from($queueForEvents) - ->trigger(); + // Get project and function/webhook events (cached) + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); + + // Generate events for this operation + $generatedEvents = Event::generateEvents( + $queueForEvents->getEvent(), + $queueForEvents->getParams() + ); if ($project->getId() !== 'console') { $queueForRealtime @@ -864,15 +668,28 @@ App::shutdown() ->trigger(); } - /** Trigger webhooks events only if a project has them enabled - * A future optimisation is to only trigger webhooks if the webhook is "enabled" - * But it might have performance implications on the API due to the number of webhooks etc. - * Some profiling is needed to see if this is a problem. - */ - if (!empty($project->getAttribute('webhooks'))) { - $queueForWebhooks - ->from($queueForEvents) - ->trigger(); + // Only trigger functions if there are matching function events + if (!empty($functionsEvents)) { + foreach ($generatedEvents as $event) { + if (isset($functionsEvents[$event])) { + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + break; + } + } + } + + // Only trigger webhooks if there are matching webhook events + if (!empty($webhooksEvents)) { + foreach ($generatedEvents as $event) { + if (isset($webhooksEvents[$event])) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + break; + } + } } } diff --git a/app/init/resources.php b/app/init/resources.php index d56354c14b..44234425a4 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -19,6 +19,7 @@ use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\GraphQL\Schema; use Appwrite\Network\Cors; use Appwrite\Network\Platform; @@ -153,6 +154,10 @@ App::setResource('queueForAudits', function (Publisher $publisher) { App::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); + +App::setResource('eventProcessor', function () { + return new EventProcessor(); +}, []); App::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); @@ -509,7 +514,7 @@ App::setResource('proofForCode', function (): Code { return $code; }); -App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project) { +App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage) { if ($project->isEmpty() || $project->getId() === 'console') { return $dbForPlatform; } @@ -545,8 +550,210 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform ->setNamespace('_' . $project->getSequence()); } + /** + * This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**. + * + * Accounts can be created in many ways beyond `createAccount` + * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. + */ + $eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { + // Only trigger events for user creation with the database listener. + if ($document->getCollection() !== 'users') { + return; + } + + $queueForEvents + ->setEvent('users.[userId].create') + ->setParam('userId', $document->getId()) + ->setPayload($response->output($document, Response::MODEL_USER)); + + // Trigger functions, webhooks, and realtime events + $queueForFunctions + ->from($queueForEvents) + ->trigger(); + + + /** Trigger webhooks events only if a project has them enabled */ + if (!empty($project->getAttribute('webhooks'))) { + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); + } + + /** Trigger realtime events only for non console events */ + if ($queueForEvents->getProject()->getId() !== 'console') { + $queueForRealtime + ->from($queueForEvents) + ->trigger(); + } + }; + + /** + * Purge function events cache when functions are created, updated or deleted. + */ + $functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) { + + + if ($document->getCollection() !== 'functions') { + return; + } + + if ($project->isEmpty() || $project->getId() === 'console') { + return; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $dbForProject->getCache()->purge($cacheKey); + }; + + $usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) { + $value = 1; + + switch ($event) { + case Database::EVENT_DOCUMENT_DELETE: + $value = -1; + break; + case Database::EVENT_DOCUMENTS_DELETE: + $value = -1 * $document->getAttribute('modified', 0); + break; + case Database::EVENT_DOCUMENTS_CREATE: + $value = $document->getAttribute('modified', 0); + break; + case Database::EVENT_DOCUMENTS_UPSERT: + $value = $document->getAttribute('created', 0); + break; + } + + switch (true) { + case $document->getCollection() === 'teams': + $queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project + break; + case $document->getCollection() === 'users': + $queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case $document->getCollection() === 'sessions': // sessions + $queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project + break; + case $document->getCollection() === 'databases': // databases + $queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $queueForStatsUsage + ->addMetric(METRIC_COLLECTIONS, $value) // per project + ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value); + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents + $parts = explode('_', $document->getCollection()); + $databaseInternalId = $parts[1] ?? 0; + $collectionInternalId = $parts[3] ?? 0; + $queueForStatsUsage + ->addMetric(METRIC_DOCUMENTS, $value) // per project + ->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database + ->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection + break; + case $document->getCollection() === 'buckets': //buckets + $queueForStatsUsage + ->addMetric(METRIC_BUCKETS, $value); // per project + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case str_starts_with($document->getCollection(), 'bucket_'): // files + $parts = explode('_', $document->getCollection()); + $bucketInternalId = $parts[1]; + $queueForStatsUsage + ->addMetric(METRIC_FILES, $value) // per project + ->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project + ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket + ->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket + break; + case $document->getCollection() === 'functions': + $queueForStatsUsage + ->addMetric(METRIC_FUNCTIONS, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case $document->getCollection() === 'sites': + $queueForStatsUsage + ->addMetric(METRIC_SITES, $value); // per project + + if ($event === Database::EVENT_DOCUMENT_DELETE) { + $queueForStatsUsage + ->addReduce($document); + } + break; + case $document->getCollection() === 'deployments': + $queueForStatsUsage + ->addMetric(METRIC_DEPLOYMENTS, $value) // per project + ->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project + ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function + ->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value) + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function + ->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value); + break; + default: + break; + } + }; + + // Clone the queues, to prevent events triggered by the database listener + // from overwriting the events that are supposed to be triggered in the shutdown hook. + $queueForEventsClone = new Event($publisher); + $queueForFunctions = new Func($publisherFunctions); + $queueForWebhooks = new Webhook($publisherWebhooks); + $queueForRealtime = new Realtime(); + + + $database + ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) + ->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener( + $project, + $document, + $response, + $queueForEventsClone->from($queueForEvents), + $queueForFunctions->from($queueForEvents), + $queueForWebhooks->from($queueForEvents), + $queueForRealtime->from($queueForEvents) + )) + ->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database)) + ; + + return $database; -}, ['pools', 'dbForPlatform', 'cache', 'project']); + +}, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'queueForFunctions', 'queueForWebhooks', 'queueForRealtime', 'queueForStatsUsage']); App::setResource('dbForPlatform', function (Group $pools, Cache $cache) { $adapter = new DatabasePool($pools->get('console')); diff --git a/src/Appwrite/Functions/EventProcessor.php b/src/Appwrite/Functions/EventProcessor.php new file mode 100644 index 0000000000..8ed841d30d --- /dev/null +++ b/src/Appwrite/Functions/EventProcessor.php @@ -0,0 +1,106 @@ + + */ + public function getFunctionsEvents(?Document $project, Database $dbForProject): array + { + if ($project === null || + $project->isEmpty() || + $project->getId() === 'console') { + return []; + } + + $hostname = $dbForProject->getAdapter()->getHostname(); + $cacheKey = \sprintf( + '%s-cache-%s:%s:%s:project:%s:functions:events', + $dbForProject->getCacheName(), + $hostname ?? '', + $dbForProject->getNamespace(), + $dbForProject->getTenant(), + $project->getId() + ); + + $ttl = 3600; // 1 hour cache TTL + $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); + + if ($cachedFunctionEvents !== false) { + return \json_decode($cachedFunctionEvents, true) ?? []; + } + + try { + $events = []; + $limit = 100; + $sum = 100; + $offset = 0; + + while ($sum >= $limit) { + $functions = $dbForProject->find('functions', [ + Query::select(['$id', 'events']), + Query::limit($limit), + Query::offset($offset), + Query::orderAsc('$sequence'), + ]); + + $sum = \count($functions); + $offset = $offset + $limit; + + foreach ($functions as $function) { + $functionEvents = $function->getAttribute('events', []); + if (!empty($functionEvents)) { + $events = array_merge($events, $functionEvents); + } + } + } + + $uniqueEvents = \array_flip(\array_unique($events)); + $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents)); + + return $uniqueEvents; + } catch (\Throwable $e) { + return []; + } + } + + /** + * Get webhook events for a project from the project's webhooks attribute + * @param Document|null $project + * @return array + */ + public function getWebhooksEvents(?Document $project): array + { + if ($project === null || $project->isEmpty() || $project->getId() === 'console') { + return []; + } + + $webhooks = $project->getAttribute('webhooks', []); + if (empty($webhooks)) { + return []; + } + + $events = []; + foreach ($webhooks as $webhook) { + if ($webhook->getAttribute('enabled', false) !== true) { + continue; + } + + $webhookEvents = $webhook->getAttribute('events', []); + if (!empty($webhookEvents)) { + $events = array_merge($events, $webhookEvents); + } + } + + return \array_flip(\array_unique($events)); + } +} diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php index 5c85ac1de6..8a3d178bde 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php @@ -7,7 +7,6 @@ use Appwrite\Platform\Action as AppwriteAction; use Utopia\Database\Database; use Utopia\Database\Document; use Utopia\Database\Operator; -use Utopia\Database\Query; class Action extends AppwriteAction { @@ -96,100 +95,4 @@ class Action extends AppwriteAction return $data; } - /** - * Get function events for a project, using Redis cache - * @param Document|null $project - * @param Database $dbForProject - * @return array - */ - protected function getFunctionsEvents(?Document $project, Database $dbForProject): array - { - if ($project === null || - $project->isEmpty() || - $project->getId() === 'console') { - return []; - } - - $hostname = $dbForProject->getAdapter()->getHostname(); - $cacheKey = \sprintf( - '%s-cache-%s:%s:%s:project:%s:functions:events', - $dbForProject->getCacheName(), - $hostname ?? '', - $dbForProject->getNamespace(), - $dbForProject->getTenant(), - $project->getId() - ); - - $ttl = 3600; // 1 hour cache TTL - $cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl); - - if ($cachedFunctionEvents !== false) { - return \json_decode($cachedFunctionEvents, true) ?? []; - - } - - try { - $events = []; - $limit = 100; - $sum = 100; - $offset = 0; - - while ($sum >= $limit) { - $functions = $dbForProject->find('functions', [ - Query::select(['$id', 'events']), - Query::limit($limit), - Query::offset($offset), - Query::orderAsc('$sequence'), - ]); - - $sum = \count($functions); - $offset = $offset + $limit; - - foreach ($functions as $function) { - $functionEvents = $function->getAttribute('events', []); - if (!empty($functionEvents)) { - $events = array_merge($events, $functionEvents); - } - } - } - - $uniqueEvents = \array_flip(\array_unique($events)); - $dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents)); - - return $uniqueEvents; - } catch (\Throwable $e) { - return []; - } - } - - /** - * Get webhook events for a project from the project's webhooks attribute - * @param Document|null $project - * @return array - */ - protected function getWebhooksEvents(?Document $project): array - { - if ($project === null || $project->isEmpty() || $project->getId() === 'console') { - return []; - } - - $webhooks = $project->getAttribute('webhooks', []); - if (empty($webhooks)) { - return []; - } - - $events = []; - foreach ($webhooks as $webhook) { - if ($webhook->getAttribute('enabled', false) !== true) { - continue; - } - - $webhookEvents = $webhook->getAttribute('events', []); - if (!empty($webhookEvents)) { - $events = array_merge($events, $webhookEvents); - } - } - - return \array_flip(\array_unique($events)); - } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index f154372983..b4ed8adbaf 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -4,6 +4,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction; use Utopia\Database\Database; use Utopia\Database\Document; @@ -349,6 +350,7 @@ abstract class Action extends DatabasesAction * @param Event $queueForFunctions * @param Event $queueForWebhooks * @param Database $dbForProject + * @param EventProcessor $eventProcessor * @return void */ protected function triggerBulk( @@ -360,7 +362,8 @@ abstract class Action extends DatabasesAction Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, - Database $dbForProject + Database $dbForProject, + EventProcessor $eventProcessor ): void { $queueForEvents ->setEvent($event) @@ -372,8 +375,8 @@ abstract class Action extends DatabasesAction // Get project and function events (cached) $project = $queueForEvents->getProject(); - $functionsEvents = $this->getFunctionsEvents($project, $dbForProject); - $webhooksEvents = $this->getWebhooksEvents($project); + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); foreach ($documents as $document) { $queueForEvents @@ -391,6 +394,7 @@ abstract class Action extends DatabasesAction $queueForEvents->getParams() ); + if (!empty($functionsEvents)) { foreach ($generatedEvents as $event) { if (isset($functionsEvents[$event])) { diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php index e3ba6a37e7..a3d3535065 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Delete.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -81,10 +82,11 @@ class Delete extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -204,7 +206,8 @@ class Delete extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php index 892ab7f0da..fdc49b1901 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Update.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -85,10 +86,11 @@ class Update extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -235,7 +237,8 @@ class Update extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php index c15a8b94ae..00e4663171 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Bulk/Upsert.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -83,10 +84,11 @@ class Upsert extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $database = $dbForProject->getDocument('databases', $databaseId); if ($database->isEmpty()) { @@ -210,7 +212,8 @@ class Upsert extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php index 60a1f66f36..e32ad29cce 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php @@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Deprecated; @@ -132,9 +133,10 @@ class Create extends Action ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } - public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void + public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void { $data = \is_string($data) ? \json_decode($data, true) @@ -492,7 +494,8 @@ class Create extends Action $queueForRealtime, $queueForFunctions, $queueForWebhooks, - $dbForProject + $dbForProject, + $eventProcessor ); return; } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index 30f4a7e05c..ee77f2e578 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -7,6 +7,7 @@ use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; use Appwrite\Extend\Exception; +use Appwrite\Functions\EventProcessor; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; use Appwrite\SDK\Method; @@ -76,6 +77,7 @@ class Update extends Action ->inject('queueForRealtime') ->inject('queueForFunctions') ->inject('queueForWebhooks') + ->inject('eventProcessor') ->callback($this->action(...)); } @@ -93,6 +95,7 @@ class Update extends Action * @param Event $queueForRealtime * @param Event $queueForFunctions * @param Event $queueForWebhooks + * @param EventProcessor $eventProcessor * @return void * @throws ConflictException * @throws Exception @@ -102,7 +105,7 @@ class Update extends Action * @throws Structure * @throws \Utopia\Exception */ - public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void + public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, EventProcessor $eventProcessor): void { if (!$commit && !$rollback) { throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true'); @@ -372,8 +375,8 @@ class Update extends Action // Get project and function/webhook events (cached) $project = $queueForEvents->getProject(); - $functionsEvents = $this->getFunctionsEvents($project, $dbForProject); - $webhooksEvents = $this->getWebhooksEvents($project); + $functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject); + $webhooksEvents = $eventProcessor->getWebhooksEvents($project); foreach ($documentsToTrigger as $doc) { $payload = $doc->getArrayCopy(); diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php index accb0392fe..45e5b84774 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php @@ -66,6 +66,7 @@ class Delete extends DocumentsDelete ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php index fea59b8b13..3062186624 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php @@ -68,6 +68,7 @@ class Update extends DocumentsUpdate ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php index 492af25e9f..3f837917c8 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php @@ -68,6 +68,7 @@ class Upsert extends DocumentsUpsert ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php index b5491a593b..b610be3ea4 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Create.php @@ -111,6 +111,7 @@ class Create extends DocumentCreate ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') + ->inject('eventProcessor') ->callback($this->action(...)); } } From 4b92e3781fc09df95aab3cb8c9852c39a86eda7a Mon Sep 17 00:00:00 2001 From: shimon Date: Wed, 7 Jan 2026 17:24:53 +0200 Subject: [PATCH 08/15] removing blank lines --- src/Appwrite/Platform/Modules/Compute/Base.php | 1 - .../Platform/Modules/Databases/Http/Databases/Action.php | 1 - 2 files changed, 2 deletions(-) diff --git a/src/Appwrite/Platform/Modules/Compute/Base.php b/src/Appwrite/Platform/Modules/Compute/Base.php index b1b34609d9..47afc90986 100644 --- a/src/Appwrite/Platform/Modules/Compute/Base.php +++ b/src/Appwrite/Platform/Modules/Compute/Base.php @@ -336,5 +336,4 @@ class Base extends Action return $deployment; } - } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php index 8a3d178bde..728e732cc5 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Action.php @@ -94,5 +94,4 @@ class Action extends AppwriteAction return $data; } - } From 1a87bde88ebac3d9529840fcbf21a9c704792e51 Mon Sep 17 00:00:00 2001 From: shimon Date: Wed, 7 Jan 2026 17:25:12 +0200 Subject: [PATCH 09/15] removing blank lines --- app/controllers/shared/api.php | 38 +++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index a7d5478920..69f16b6bcc 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -642,8 +642,9 @@ App::shutdown() ->inject('queueForWebhooks') ->inject('queueForRealtime') ->inject('dbForProject') + ->inject('timelimit') ->inject('eventProcessor') - ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, EventProcessor $eventProcessor) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, callable $timelimit, EventProcessor $eventProcessor) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -696,6 +697,41 @@ App::shutdown() $route = $utopia->getRoute(); $requestParams = $route->getParamsValues(); + /** + * Abuse labels + */ + $abuseEnabled = System::getEnv('_APP_OPTIONS_ABUSE', 'enabled') !== 'disabled'; + $abuseResetCode = $route->getLabel('abuse-reset', []); + $abuseResetCode = \is_array($abuseResetCode) ? $abuseResetCode : [$abuseResetCode]; + + if ($abuseEnabled && \count($abuseResetCode) > 0 && \in_array($response->getStatusCode(), $abuseResetCode)) { + $abuseKeyLabel = $route->getLabel('abuse-key', 'url:{url},ip:{ip}'); + $abuseKeyLabel = (!is_array($abuseKeyLabel)) ? [$abuseKeyLabel] : $abuseKeyLabel; + + foreach ($abuseKeyLabel as $abuseKey) { + $start = $request->getContentRangeStart(); + $end = $request->getContentRangeEnd(); + $timeLimit = $timelimit($abuseKey, $route->getLabel('abuse-limit', 0), $route->getLabel('abuse-time', 3600)); + $timeLimit + ->setParam('{projectId}', $project->getId()) + ->setParam('{userId}', $user->getId()) + ->setParam('{userAgent}', $request->getUserAgent('')) + ->setParam('{ip}', $request->getIP()) + ->setParam('{url}', $request->getHostname() . $route->getPath()) + ->setParam('{method}', $request->getMethod()) + ->setParam('{chunkId}', (int)($start / ($end + 1 - $start))); + + foreach ($request->getParams() as $key => $value) { // Set request params as potential abuse keys + if (!empty($value)) { + $timeLimit->setParam('{param-' . $key . '}', (\is_array($value)) ? \json_encode($value) : $value); + } + } + + $abuse = new Abuse($timeLimit); + $abuse->reset(); + } + } + /** * Audit labels */ From 2cfaa2223e9766f16d784c32ffb257981393f308 Mon Sep 17 00:00:00 2001 From: shimon Date: Wed, 7 Jan 2026 18:19:20 +0200 Subject: [PATCH 10/15] feat: inject EventProcessor into Update transaction for enhanced event handling --- .../Modules/Databases/Http/TablesDB/Transactions/Update.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php index 4337a8d28d..86c18a32f6 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Transactions/Update.php @@ -60,6 +60,7 @@ class Update extends TransactionsUpdate ->inject('queueForRealtime') ->inject('queueForFunctions') ->inject('queueForWebhooks') + ->inject('eventProcessor') ->callback($this->action(...)); } } From 1306c85eb5d0609bbdbbb4ba6d94694401e4a94d Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 18 Jan 2026 11:03:06 +0200 Subject: [PATCH 11/15] merge with 1.8x --- composer.lock | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/composer.lock b/composer.lock index bf830a27e3..96b945d0d2 100644 --- a/composer.lock +++ b/composer.lock @@ -1365,16 +1365,16 @@ }, { "name": "open-telemetry/exporter-otlp", - "version": "1.3.3", + "version": "1.3.4", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/exporter-otlp.git", - "reference": "07b02bc71838463f6edcc78d3485c04b48fb263d" + "reference": "62e680d587beb42e5247aa6ecd89ad1ca406e8ca" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/07b02bc71838463f6edcc78d3485c04b48fb263d", - "reference": "07b02bc71838463f6edcc78d3485c04b48fb263d", + "url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/62e680d587beb42e5247aa6ecd89ad1ca406e8ca", + "reference": "62e680d587beb42e5247aa6ecd89ad1ca406e8ca", "shasum": "" }, "require": { @@ -1425,7 +1425,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-11-13T08:04:37+00:00" + "time": "2026-01-15T09:31:34+00:00" }, { "name": "open-telemetry/gen-otlp-protobuf", @@ -1492,16 +1492,16 @@ }, { "name": "open-telemetry/sdk", - "version": "1.10.0", + "version": "1.11.0", "source": { "type": "git", "url": "https://github.com/opentelemetry-php/sdk.git", - "reference": "3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99" + "reference": "d91f21addcdb42da9a451c002777f8318432461a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99", - "reference": "3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99", + "url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/d91f21addcdb42da9a451c002777f8318432461a", + "reference": "d91f21addcdb42da9a451c002777f8318432461a", "shasum": "" }, "require": { @@ -1585,7 +1585,7 @@ "issues": "https://github.com/open-telemetry/opentelemetry-php/issues", "source": "https://github.com/open-telemetry/opentelemetry-php" }, - "time": "2025-11-25T10:59:15+00:00" + "time": "2026-01-15T11:21:03+00:00" }, { "name": "open-telemetry/sem-conv", @@ -4516,16 +4516,16 @@ }, { "name": "utopia-php/migration", - "version": "1.4.3", + "version": "1.4.4", "source": { "type": "git", "url": "https://github.com/utopia-php/migration.git", - "reference": "52ca4234d8229b68e27e052248734a08784d9d3d" + "reference": "3fe751902012d09d323420cd3523be1ed855e868" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/migration/zipball/52ca4234d8229b68e27e052248734a08784d9d3d", - "reference": "52ca4234d8229b68e27e052248734a08784d9d3d", + "url": "https://api.github.com/repos/utopia-php/migration/zipball/3fe751902012d09d323420cd3523be1ed855e868", + "reference": "3fe751902012d09d323420cd3523be1ed855e868", "shasum": "" }, "require": { @@ -4565,9 +4565,9 @@ ], "support": { "issues": "https://github.com/utopia-php/migration/issues", - "source": "https://github.com/utopia-php/migration/tree/1.4.3" + "source": "https://github.com/utopia-php/migration/tree/1.4.4" }, - "time": "2026-01-13T09:51:08+00:00" + "time": "2026-01-16T10:00:07+00:00" }, { "name": "utopia-php/mongo", @@ -8988,7 +8988,7 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": {}, + "stability-flags": [], "prefer-stable": false, "prefer-lowest": false, "platform": { From d015a75e81a48c815d9a69f4864b6157e0460cee Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 18 Jan 2026 12:48:36 +0200 Subject: [PATCH 12/15] linter --- app/controllers/shared/api.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index f991f90023..6cae689d2a 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -10,7 +10,6 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; -use Appwrite\Event\Migration; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; @@ -32,7 +31,6 @@ use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; use Utopia\Database\Validator\Authorization\Input; -use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Validator\WhiteList; From 72def3b2fb289e7c11df5cd9e88b12dc74fa2d75 Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 18 Jan 2026 13:05:43 +0200 Subject: [PATCH 13/15] Refactor API action parameters to include Authorization dependency --- app/controllers/shared/api.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 6cae689d2a..f16c1e5972 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -647,7 +647,7 @@ App::shutdown() ->inject('authorization') ->inject('timelimit') ->inject('eventProcessor') - ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, callable $timelimit, EventProcessor $eventProcessor) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject,Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor) use ($parseLabel) { $responsePayload = $response->getPayload(); From 94e29cff5307db3af4be5fc54ab8c71f955a8f70 Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 18 Jan 2026 13:11:05 +0200 Subject: [PATCH 14/15] Fix typo in Authorization parameter in API action definition --- app/controllers/shared/api.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index f16c1e5972..fffe544330 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -647,7 +647,7 @@ App::shutdown() ->inject('authorization') ->inject('timelimit') ->inject('eventProcessor') - ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject,Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor) use ($parseLabel) { $responsePayload = $response->getPayload(); From 0203323b4ae39f5ca6bf5d9cfca9c1a912bd3ca3 Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 18 Jan 2026 14:01:35 +0200 Subject: [PATCH 15/15] Remove 'authorization' injection from Bulk Delete, Update, and Upsert classes --- .../Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php | 1 - .../Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php | 1 - .../Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php | 1 - 3 files changed, 3 deletions(-) diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php index 7e0adce9f6..45e5b84774 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Delete.php @@ -66,7 +66,6 @@ class Delete extends DocumentsDelete ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') - ->inject('authorization') ->inject('eventProcessor') ->callback($this->action(...)); } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php index 3ef5e10033..3062186624 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Update.php @@ -68,7 +68,6 @@ class Update extends DocumentsUpdate ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') - ->inject('authorization') ->inject('eventProcessor') ->callback($this->action(...)); } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php index 90b06a54d2..3f837917c8 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/TablesDB/Tables/Rows/Bulk/Upsert.php @@ -68,7 +68,6 @@ class Upsert extends DocumentsUpsert ->inject('queueForFunctions') ->inject('queueForWebhooks') ->inject('plan') - ->inject('authorization') ->inject('eventProcessor') ->callback($this->action(...)); }