Merge pull request #11033 from appwrite/add-webhooks-and-functions-events

This commit is contained in:
Jake Barnby 2026-01-19 22:02:49 +13:00 committed by GitHub
commit 8124b07860
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 467 additions and 229 deletions

View file

@ -10,12 +10,12 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\Method;
use Appwrite\Utopia\Database\Documents\User;
use Appwrite\Utopia\Request;
@ -31,7 +31,6 @@ use Utopia\Database\Document;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Database\Validator\Authorization\Input;
use Utopia\Queue\Publisher;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Validator\WhiteList;
@ -75,151 +74,6 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar
return $label;
};
/**
* This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**.
*
* Accounts can be created in many ways beyond `createAccount`
* (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here.
*/
$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) {
// Only trigger events for user creation with the database listener.
if ($document->getCollection() !== 'users') {
return;
}
$queueForEvents
->setEvent('users.[userId].create')
->setParam('userId', $document->getId())
->setPayload($response->output($document, Response::MODEL_USER));
// Trigger functions, webhooks, and realtime events
$queueForFunctions
->from($queueForEvents)
->trigger();
/** Trigger webhooks events only if a project has them enabled */
if (!empty($project->getAttribute('webhooks'))) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
}
/** Trigger realtime events only for non console events */
if ($queueForEvents->getProject()->getId() !== 'console') {
$queueForRealtime
->from($queueForEvents)
->trigger();
}
};
$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) {
$value = 1;
switch ($event) {
case Database::EVENT_DOCUMENT_DELETE:
$value = -1;
break;
case Database::EVENT_DOCUMENTS_DELETE:
$value = -1 * $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_CREATE:
$value = $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_UPSERT:
$value = $document->getAttribute('created', 0);
break;
}
switch (true) {
case $document->getCollection() === 'teams':
$queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project
break;
case $document->getCollection() === 'users':
$queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case $document->getCollection() === 'sessions': // sessions
$queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project
break;
case $document->getCollection() === 'databases': // databases
$queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_COLLECTIONS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value);
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$collectionInternalId = $parts[3] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_DOCUMENTS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection
break;
case $document->getCollection() === 'buckets': //buckets
$queueForStatsUsage
->addMetric(METRIC_BUCKETS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'bucket_'): // files
$parts = explode('_', $document->getCollection());
$bucketInternalId = $parts[1];
$queueForStatsUsage
->addMetric(METRIC_FILES, $value) // per project
->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket
break;
case $document->getCollection() === 'functions':
$queueForStatsUsage
->addMetric(METRIC_FUNCTIONS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'sites':
$queueForStatsUsage
->addMetric(METRIC_SITES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'deployments':
$queueForStatsUsage
->addMetric(METRIC_DEPLOYMENTS, $value) // per project
->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value);
break;
default:
break;
}
};
App::init()
->groups(['api'])
->inject('utopia')
@ -489,9 +343,6 @@ App::init()
->inject('response')
->inject('project')
->inject('user')
->inject('publisher')
->inject('publisherFunctions')
->inject('publisherWebhooks')
->inject('queueForEvents')
->inject('queueForMessaging')
->inject('queueForAudits')
@ -501,7 +352,6 @@ App::init()
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('queueForMails')
->inject('queueForMigrations')
->inject('dbForProject')
->inject('timelimit')
->inject('resourceToken')
@ -512,7 +362,7 @@ App::init()
->inject('telemetry')
->inject('platform')
->inject('authorization')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Migration $queueForMigrations, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) use ($usageDatabaseListener, $eventDatabaseListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) {
$route = $utopia->getRoute();
@ -632,28 +482,6 @@ App::init()
$queueForBuilds->setPlatform($platform);
$queueForMails->setPlatform($platform);
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisherFunctions);
$queueForWebhooks = new Webhook($publisherWebhooks);
$queueForRealtime = new Realtime();
$dbForProject
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener(
$project,
$document,
$response,
$queueForEventsClone->from($queueForEvents),
$queueForFunctions->from($queueForEvents),
$queueForWebhooks->from($queueForEvents),
$queueForRealtime->from($queueForEvents)
));
$useCache = $route->getLabel('cache', false);
$storageCacheOperationsCounter = $telemetry->createCounter('storage.cache.operations.load');
@ -818,7 +646,8 @@ App::shutdown()
->inject('dbForProject')
->inject('authorization')
->inject('timelimit')
->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit) use ($parseLabel) {
->inject('eventProcessor')
->action(function (App $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, StatsUsage $queueForStatsUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor) use ($parseLabel) {
$responsePayload = $response->getPayload();
@ -827,9 +656,15 @@ App::shutdown()
$queueForEvents->setPayload($responsePayload);
}
$queueForFunctions
->from($queueForEvents)
->trigger();
// Get project and function/webhook events (cached)
$functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $eventProcessor->getWebhooksEvents($project);
// Generate events for this operation
$generatedEvents = Event::generateEvents(
$queueForEvents->getEvent(),
$queueForEvents->getParams()
);
if ($project->getId() !== 'console') {
$queueForRealtime
@ -837,15 +672,28 @@ App::shutdown()
->trigger();
}
/** Trigger webhooks events only if a project has them enabled
* A future optimisation is to only trigger webhooks if the webhook is "enabled"
* But it might have performance implications on the API due to the number of webhooks etc.
* Some profiling is needed to see if this is a problem.
*/
if (!empty($project->getAttribute('webhooks'))) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
// Only trigger functions if there are matching function events
if (!empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {
$queueForFunctions
->from($queueForEvents)
->trigger();
break;
}
}
}
// Only trigger webhooks if there are matching webhook events
if (!empty($webhooksEvents)) {
foreach ($generatedEvents as $event) {
if (isset($webhooksEvents[$event])) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
break;
}
}
}
}

View file

@ -20,6 +20,7 @@ use Appwrite\Event\StatsResources;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\GraphQL\Schema;
use Appwrite\Network\Cors;
use Appwrite\Network\Platform;
@ -157,6 +158,9 @@ App::setResource('queueForAudits', function (Publisher $publisher) {
App::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
App::setResource('eventProcessor', function () {
return new EventProcessor();
}, []);
App::setResource('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
@ -517,7 +521,7 @@ App::setResource('authorization', function () {
return new Authorization();
}, []);
App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Authorization $authorization) {
App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform, Cache $cache, Document $project, Response $response, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Authorization $authorization) {
if ($project->isEmpty() || $project->getId() === 'console') {
return $dbForPlatform;
}
@ -555,8 +559,209 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform
->setNamespace('_' . $project->getSequence());
}
/**
* This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**.
*
* Accounts can be created in many ways beyond `createAccount`
* (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here.
*/
$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) {
// Only trigger events for user creation with the database listener.
if ($document->getCollection() !== 'users') {
return;
}
$queueForEvents
->setEvent('users.[userId].create')
->setParam('userId', $document->getId())
->setPayload($response->output($document, Response::MODEL_USER));
// Trigger functions, webhooks, and realtime events
$queueForFunctions
->from($queueForEvents)
->trigger();
/** Trigger webhooks events only if a project has them enabled */
if (!empty($project->getAttribute('webhooks'))) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
}
/** Trigger realtime events only for non console events */
if ($queueForEvents->getProject()->getId() !== 'console') {
$queueForRealtime
->from($queueForEvents)
->trigger();
}
};
/**
* Purge function events cache when functions are created, updated or deleted.
*/
$functionsEventsCacheListener = function (string $event, Document $document, Document $project, Database $dbForProject) {
if ($document->getCollection() !== 'functions') {
return;
}
if ($project->isEmpty() || $project->getId() === 'console') {
return;
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functions:events',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$dbForProject->getCache()->purge($cacheKey);
};
$usageDatabaseListener = function (string $event, Document $document, StatsUsage $queueForStatsUsage) {
$value = 1;
switch ($event) {
case Database::EVENT_DOCUMENT_DELETE:
$value = -1;
break;
case Database::EVENT_DOCUMENTS_DELETE:
$value = -1 * $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_CREATE:
$value = $document->getAttribute('modified', 0);
break;
case Database::EVENT_DOCUMENTS_UPSERT:
$value = $document->getAttribute('created', 0);
break;
}
switch (true) {
case $document->getCollection() === 'teams':
$queueForStatsUsage->addMetric(METRIC_TEAMS, $value); // per project
break;
case $document->getCollection() === 'users':
$queueForStatsUsage->addMetric(METRIC_USERS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case $document->getCollection() === 'sessions': // sessions
$queueForStatsUsage->addMetric(METRIC_SESSIONS, $value); //per project
break;
case $document->getCollection() === 'databases': // databases
$queueForStatsUsage->addMetric(METRIC_DATABASES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && !str_contains($document->getCollection(), 'collection'): //collections
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_COLLECTIONS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_COLLECTIONS), $value);
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'database_') && str_contains($document->getCollection(), '_collection_'): //documents
$parts = explode('_', $document->getCollection());
$databaseInternalId = $parts[1] ?? 0;
$collectionInternalId = $parts[3] ?? 0;
$queueForStatsUsage
->addMetric(METRIC_DOCUMENTS, $value) // per project
->addMetric(str_replace('{databaseInternalId}', $databaseInternalId, METRIC_DATABASE_ID_DOCUMENTS), $value) // per database
->addMetric(str_replace(['{databaseInternalId}', '{collectionInternalId}'], [$databaseInternalId, $collectionInternalId], METRIC_DATABASE_ID_COLLECTION_ID_DOCUMENTS), $value); // per collection
break;
case $document->getCollection() === 'buckets': //buckets
$queueForStatsUsage
->addMetric(METRIC_BUCKETS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case str_starts_with($document->getCollection(), 'bucket_'): // files
$parts = explode('_', $document->getCollection());
$bucketInternalId = $parts[1];
$queueForStatsUsage
->addMetric(METRIC_FILES, $value) // per project
->addMetric(METRIC_FILES_STORAGE, $document->getAttribute('sizeOriginal') * $value) // per project
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES), $value) // per bucket
->addMetric(str_replace('{bucketInternalId}', $bucketInternalId, METRIC_BUCKET_ID_FILES_STORAGE), $document->getAttribute('sizeOriginal') * $value); // per bucket
break;
case $document->getCollection() === 'functions':
$queueForStatsUsage
->addMetric(METRIC_FUNCTIONS, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'sites':
$queueForStatsUsage
->addMetric(METRIC_SITES, $value); // per project
if ($event === Database::EVENT_DOCUMENT_DELETE) {
$queueForStatsUsage
->addReduce($document);
}
break;
case $document->getCollection() === 'deployments':
$queueForStatsUsage
->addMetric(METRIC_DEPLOYMENTS, $value) // per project
->addMetric(METRIC_DEPLOYMENTS_STORAGE, $document->getAttribute('size') * $value) // per project
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}'], [$document->getAttribute('resourceType')], METRIC_RESOURCE_TYPE_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value)
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS), $value) // per function
->addMetric(str_replace(['{resourceType}', '{resourceInternalId}'], [$document->getAttribute('resourceType'), $document->getAttribute('resourceInternalId')], METRIC_RESOURCE_TYPE_ID_DEPLOYMENTS_STORAGE), $document->getAttribute('size') * $value);
break;
default:
break;
}
};
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisherFunctions);
$queueForWebhooks = new Webhook($publisherWebhooks);
$queueForRealtime = new Realtime();
$database
->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_DELETE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENTS_UPSERT, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage))
->on(Database::EVENT_DOCUMENT_CREATE, 'create-trigger-events', fn ($event, $document) => $eventDatabaseListener(
$project,
$document,
$response,
$queueForEventsClone->from($queueForEvents),
$queueForFunctions->from($queueForEvents),
$queueForWebhooks->from($queueForEvents),
$queueForRealtime->from($queueForEvents)
))
->on(Database::EVENT_DOCUMENT_CREATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database))
->on(Database::EVENT_DOCUMENT_UPDATE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database))
->on(Database::EVENT_DOCUMENT_DELETE, 'purge-function-events-cache', fn ($event, $document) => $functionsEventsCacheListener($event, $document, $project, $database))
;
return $database;
}, ['pools', 'dbForPlatform', 'cache', 'project', 'authorization']);
}, ['pools', 'dbForPlatform', 'cache', 'project', 'response', 'publisher', 'publisherFunctions', 'publisherWebhooks', 'queueForEvents', 'queueForFunctions', 'queueForWebhooks', 'queueForRealtime', 'queueForStatsUsage', 'authorization']);
App::setResource('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) {

36
composer.lock generated
View file

@ -1365,16 +1365,16 @@
},
{
"name": "open-telemetry/exporter-otlp",
"version": "1.3.3",
"version": "1.3.4",
"source": {
"type": "git",
"url": "https://github.com/opentelemetry-php/exporter-otlp.git",
"reference": "07b02bc71838463f6edcc78d3485c04b48fb263d"
"reference": "62e680d587beb42e5247aa6ecd89ad1ca406e8ca"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/07b02bc71838463f6edcc78d3485c04b48fb263d",
"reference": "07b02bc71838463f6edcc78d3485c04b48fb263d",
"url": "https://api.github.com/repos/opentelemetry-php/exporter-otlp/zipball/62e680d587beb42e5247aa6ecd89ad1ca406e8ca",
"reference": "62e680d587beb42e5247aa6ecd89ad1ca406e8ca",
"shasum": ""
},
"require": {
@ -1425,7 +1425,7 @@
"issues": "https://github.com/open-telemetry/opentelemetry-php/issues",
"source": "https://github.com/open-telemetry/opentelemetry-php"
},
"time": "2025-11-13T08:04:37+00:00"
"time": "2026-01-15T09:31:34+00:00"
},
{
"name": "open-telemetry/gen-otlp-protobuf",
@ -1492,16 +1492,16 @@
},
{
"name": "open-telemetry/sdk",
"version": "1.10.0",
"version": "1.11.0",
"source": {
"type": "git",
"url": "https://github.com/opentelemetry-php/sdk.git",
"reference": "3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99"
"reference": "d91f21addcdb42da9a451c002777f8318432461a"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99",
"reference": "3dfc3d1ad729ec7eb25f1b9a4ae39fe779affa99",
"url": "https://api.github.com/repos/opentelemetry-php/sdk/zipball/d91f21addcdb42da9a451c002777f8318432461a",
"reference": "d91f21addcdb42da9a451c002777f8318432461a",
"shasum": ""
},
"require": {
@ -1585,7 +1585,7 @@
"issues": "https://github.com/open-telemetry/opentelemetry-php/issues",
"source": "https://github.com/open-telemetry/opentelemetry-php"
},
"time": "2025-11-25T10:59:15+00:00"
"time": "2026-01-15T11:21:03+00:00"
},
{
"name": "open-telemetry/sem-conv",
@ -4516,16 +4516,16 @@
},
{
"name": "utopia-php/migration",
"version": "1.4.3",
"version": "1.4.4",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/migration.git",
"reference": "52ca4234d8229b68e27e052248734a08784d9d3d"
"reference": "3fe751902012d09d323420cd3523be1ed855e868"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/migration/zipball/52ca4234d8229b68e27e052248734a08784d9d3d",
"reference": "52ca4234d8229b68e27e052248734a08784d9d3d",
"url": "https://api.github.com/repos/utopia-php/migration/zipball/3fe751902012d09d323420cd3523be1ed855e868",
"reference": "3fe751902012d09d323420cd3523be1ed855e868",
"shasum": ""
},
"require": {
@ -4565,9 +4565,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/migration/issues",
"source": "https://github.com/utopia-php/migration/tree/1.4.3"
"source": "https://github.com/utopia-php/migration/tree/1.4.4"
},
"time": "2026-01-13T09:51:08+00:00"
"time": "2026-01-16T10:00:07+00:00"
},
{
"name": "utopia-php/mongo",
@ -8988,7 +8988,7 @@
],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": {},
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": {
@ -9012,5 +9012,5 @@
"platform-overrides": {
"php": "8.3"
},
"plugin-api-version": "2.6.0"
"plugin-api-version": "2.2.0"
}

View file

@ -0,0 +1,106 @@
<?php
namespace Appwrite\Functions;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Query;
class EventProcessor
{
/**
* Get function events for a project, using Redis cache
* @param Document|null $project
* @param Database $dbForProject
* @return array<string, bool>
*/
public function getFunctionsEvents(?Document $project, Database $dbForProject): array
{
if ($project === null ||
$project->isEmpty() ||
$project->getId() === 'console') {
return [];
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functions:events',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$ttl = 3600; // 1 hour cache TTL
$cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl);
if ($cachedFunctionEvents !== false) {
return \json_decode($cachedFunctionEvents, true) ?? [];
}
try {
$events = [];
$limit = 100;
$sum = 100;
$offset = 0;
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::select(['$id', 'events']),
Query::limit($limit),
Query::offset($offset),
Query::orderAsc('$sequence'),
]);
$sum = \count($functions);
$offset = $offset + $limit;
foreach ($functions as $function) {
$functionEvents = $function->getAttribute('events', []);
if (!empty($functionEvents)) {
$events = array_merge($events, $functionEvents);
}
}
}
$uniqueEvents = \array_flip(\array_unique($events));
$dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents));
return $uniqueEvents;
} catch (\Throwable $e) {
return [];
}
}
/**
* Get webhook events for a project from the project's webhooks attribute
* @param Document|null $project
* @return array<string, bool>
*/
public function getWebhooksEvents(?Document $project): array
{
if ($project === null || $project->isEmpty() || $project->getId() === 'console') {
return [];
}
$webhooks = $project->getAttribute('webhooks', []);
if (empty($webhooks)) {
return [];
}
$events = [];
foreach ($webhooks as $webhook) {
if ($webhook->getAttribute('enabled', false) !== true) {
continue;
}
$webhookEvents = $webhook->getAttribute('events', []);
if (!empty($webhookEvents)) {
$events = array_merge($events, $webhookEvents);
}
}
return \array_flip(\array_unique($events));
}
}

View file

@ -4,6 +4,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction;
use Utopia\Database\Database;
use Utopia\Database\Document;
@ -349,6 +350,8 @@ abstract class Action extends DatabasesAction
* @param Event $queueForRealtime
* @param Event $queueForFunctions
* @param Event $queueForWebhooks
* @param Database $dbForProject
* @param EventProcessor $eventProcessor
* @return void
*/
protected function triggerBulk(
@ -359,7 +362,9 @@ abstract class Action extends DatabasesAction
Event $queueForEvents,
Event $queueForRealtime,
Event $queueForFunctions,
Event $queueForWebhooks
Event $queueForWebhooks,
Database $dbForProject,
EventProcessor $eventProcessor
): void {
$queueForEvents
->setEvent($event)
@ -369,6 +374,11 @@ abstract class Action extends DatabasesAction
->setParam('tableId', $collection->getId())
->setContext($this->getCollectionsEventsContext(), $collection);
// Get project and function events (cached)
$project = $queueForEvents->getProject();
$functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $eventProcessor->getWebhooksEvents($project);
foreach ($documents as $document) {
$queueForEvents
->setParam('documentId', $document->getId())
@ -379,14 +389,33 @@ abstract class Action extends DatabasesAction
->from($queueForEvents)
->trigger();
$queueForFunctions
->from($queueForEvents)
->trigger();
// Generate events for this document operation
$generatedEvents = Event::generateEvents(
$queueForEvents->getEvent(),
$queueForEvents->getParams()
);
if (!empty($queueForEvents->getProject()?->getAttribute('webhooks', []))) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
if (!empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {
$queueForFunctions
->from($queueForEvents)
->trigger();
break;
}
}
}
if (!empty($webhooksEvents)) {
foreach ($generatedEvents as $event) {
if (isset($webhooksEvents[$event])) {
$queueForWebhooks
->from($queueForEvents)
->trigger();
break;
}
}
}
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -81,10 +82,11 @@ class Delete extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
@ -203,7 +205,9 @@ class Delete extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject,
$eventProcessor
);
}
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -85,10 +86,11 @@ class Update extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$data = \is_string($data)
? \json_decode($data, true)
@ -234,7 +236,9 @@ class Update extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject,
$eventProcessor
);
}
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documents\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -83,10 +84,11 @@ class Upsert extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, EventProcessor $eventProcessor): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {
@ -209,7 +211,9 @@ class Upsert extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject,
$eventProcessor
);
}
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Collections\Documen
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Deprecated;
@ -134,9 +135,10 @@ class Create extends Action
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
->inject('eventProcessor')
->callback($this->action(...));
}
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization): void
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan, Authorization $authorization, EventProcessor $eventProcessor): void
{
$data = \is_string($data)
? \json_decode($data, true)
@ -498,7 +500,9 @@ class Create extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject,
$eventProcessor
);
return;
}

View file

@ -7,6 +7,7 @@ use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\StatsUsage;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
@ -77,6 +78,7 @@ class Update extends Action
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('authorization')
->inject('eventProcessor')
->callback($this->action(...));
}
@ -94,6 +96,7 @@ class Update extends Action
* @param Event $queueForRealtime
* @param Event $queueForFunctions
* @param Event $queueForWebhooks
* @param EventProcessor $eventProcessor
* @return void
* @throws ConflictException
* @throws Exception
@ -103,7 +106,7 @@ class Update extends Action
* @throws Structure
* @throws \Utopia\Exception
*/
public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, Authorization $authorization): void
public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, Authorization $authorization, EventProcessor $eventProcessor): void
{
if (!$commit && !$rollback) {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true');
@ -371,6 +374,11 @@ class Update extends Action
$queueForEvents->setEvent($eventString);
// Get project and function/webhook events (cached)
$project = $queueForEvents->getProject();
$functionsEvents = $eventProcessor->getFunctionsEvents($project, $dbForProject);
$webhooksEvents = $eventProcessor->getWebhooksEvents($project);
foreach ($documentsToTrigger as $doc) {
$payload = $doc->getArrayCopy();
$payload['$tableId'] = $collection->getId();
@ -381,9 +389,33 @@ class Update extends Action
->setParam('rowId', $doc->getId())
->setPayload($payload);
// Generate events for this document operation
$generatedEvents = Event::generateEvents(
$queueForEvents->getEvent(),
$queueForEvents->getParams()
);
$queueForRealtime->from($queueForEvents)->trigger();
$queueForFunctions->from($queueForEvents)->trigger();
$queueForWebhooks->from($queueForEvents)->trigger();
// Only trigger functions if there are matching function events
if (!empty($functionsEvents)) {
foreach ($generatedEvents as $event) {
if (isset($functionsEvents[$event])) {
$queueForFunctions->from($queueForEvents)->trigger();
break;
}
}
}
// Only trigger webhooks if there are matching webhook events
if (!empty($webhooksEvents)) {
foreach ($generatedEvents as $event) {
if (isset($webhooksEvents[$event])) {
$queueForWebhooks->from($queueForEvents)->trigger();
break;
}
}
}
}
$queueForEvents->reset();

View file

@ -66,7 +66,7 @@ class Delete extends DocumentsDelete
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
->inject('eventProcessor')
->callback($this->action(...));
}
}

View file

@ -68,7 +68,7 @@ class Update extends DocumentsUpdate
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
->inject('eventProcessor')
->callback($this->action(...));
}
}

View file

@ -68,7 +68,7 @@ class Upsert extends DocumentsUpsert
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
->inject('eventProcessor')
->callback($this->action(...));
}
}

View file

@ -112,6 +112,7 @@ class Create extends DocumentCreate
->inject('queueForWebhooks')
->inject('plan')
->inject('authorization')
->inject('eventProcessor')
->callback($this->action(...));
}
}

View file

@ -61,6 +61,7 @@ class Update extends TransactionsUpdate
->inject('queueForFunctions')
->inject('queueForWebhooks')
->inject('authorization')
->inject('eventProcessor')
->callback($this->action(...));
}
}