Refactor: Remove unused webhook and function event filters, implement caching for function events retrieval

This commit is contained in:
shimon 2026-01-04 09:53:29 +02:00
parent 6c1f967509
commit e9dac6710f
10 changed files with 138 additions and 68 deletions

View file

@ -276,28 +276,6 @@ return [
'array' => false,
'filters' => ['subQueryWebhooks'],
],
[
'$id' => ID::custom('webhookEvents'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 16384,
'signed' => true,
'required' => false,
'default' => [],
'array' => true,
'filters' => ['subQueryWebhookEvents'],
],
[
'$id' => ID::custom('functionEvents'),
'type' => Database::VAR_STRING,
'format' => '',
'size' => 16384,
'signed' => true,
'required' => false,
'default' => [],
'array' => true,
'filters' => ['subQueryFunctionEvents'],
],
[
'$id' => ID::custom('keys'),
'type' => Database::VAR_STRING,

View file

@ -170,44 +170,6 @@ Database::addFilter(
}
);
Database::addFilter(
'subQueryWebhookEvents',
function (mixed $value) {
return;
},
function (mixed $value, Document $document, Database $database) {
$webhooks = $database
->find('webhooks', [
Query::equal('projectInternalId', [$document->getSequence()]),
Query::limit(APP_LIMIT_SUBQUERY),
]);
$events = [];
foreach ($webhooks as $webhook) {
$webhookEvents = $webhook->getAttribute('events', []);
if (!empty($webhookEvents)) {
$events = array_merge($events, $webhookEvents);
}
}
return array_unique($events);
}
);
Database::addFilter(
'subQueryFunctionEvents',
function (mixed $value) {
return;
},
function (mixed $value, Document $document, Database $database) {
// Functions are stored in the project database, not platform database
// This filter will return empty array when called from platform DB
// Function events will need to be computed separately when dbForProject is available
// For now, return empty to avoid errors
return [];
}
);
Database::addFilter(
'subQuerySessions',
function (mixed $value) {

View file

@ -336,4 +336,29 @@ class Base extends Action
return $deployment;
}
/**
* Purge function events cache for a project
* @param Document $project
* @param Database $dbForProject
* @return void
*/
protected function purgeFunctionEventsCache(Document $project, Database $dbForProject): void
{
if ($project->isEmpty() || $project->getId() === 'console') {
return;
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functionEvents',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$dbForProject->getCache()->purge($cacheKey);
}
}

View file

@ -7,6 +7,7 @@ use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Databases\Http\Databases\Action as DatabasesAction;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
abstract class Action extends DatabasesAction
@ -348,6 +349,7 @@ abstract class Action extends DatabasesAction
* @param Event $queueForRealtime
* @param Event $queueForFunctions
* @param Event $queueForWebhooks
* @param Database $dbForProject
* @return void
*/
protected function triggerBulk(
@ -358,7 +360,8 @@ abstract class Action extends DatabasesAction
Event $queueForEvents,
Event $queueForRealtime,
Event $queueForFunctions,
Event $queueForWebhooks
Event $queueForWebhooks,
Database $dbForProject
): void {
$queueForEvents
->setEvent($event)
@ -368,6 +371,11 @@ abstract class Action extends DatabasesAction
->setParam('tableId', $collection->getId())
->setContext($this->getCollectionsEventsContext(), $collection);
// Get project and function events (cached)
$project = $queueForEvents->getProject();
$functionEvents = $this->getFunctionEvents($project, $dbForProject);
$webhookEvents = $this->getWebhookEvents($project);
foreach ($documents as $document) {
$queueForEvents
->setParam('documentId', $document->getId())
@ -378,20 +386,20 @@ abstract class Action extends DatabasesAction
->from($queueForEvents)
->trigger();
$project = $queueForEvents->getProject();
// Generate events for this document operation
$generatedEvents = Event::generateEvents(
$queueForEvents->getEvent(),
$queueForEvents->getParams()
);
$functionEvents = $project?->getAttribute('functionEvents', []);
// Only trigger functions if there are matching function events
if (!empty($functionEvents) && !empty(array_intersect($functionEvents, $generatedEvents))) {
$queueForFunctions
->from($queueForEvents)
->trigger();
}
$webhookEvents = $project?->getAttribute('webhookEvents', []);
// Only trigger webhooks if there are matching webhook events
if (!empty($webhookEvents) && !empty(array_intersect($webhookEvents, $generatedEvents))) {
$queueForWebhooks
->from($queueForEvents)
@ -404,4 +412,88 @@ abstract class Action extends DatabasesAction
$queueForFunctions->reset();
$queueForWebhooks->reset();
}
/**
* Get function events for a project, using Redis cache
* @param Document|null $project
* @param Database $dbForProject
* @return array
*/
protected function getFunctionEvents(?Document $project, Database $dbForProject): array
{
if ($project === null || $project->isEmpty() || $project->getId() === 'console') {
return [];
}
$hostname = $dbForProject->getAdapter()->getHostname();
$cacheKey = \sprintf(
'%s-cache-%s:%s:%s:project:%s:functionEvents',
$dbForProject->getCacheName(),
$hostname ?? '',
$dbForProject->getNamespace(),
$dbForProject->getTenant(),
$project->getId()
);
$ttl = 3600; // 1 hour cache TTL
$cachedFunctionEvents = $dbForProject->getCache()->load($cacheKey, $ttl);
if ($cachedFunctionEvents !== false) {
return \json_decode($cachedFunctionEvents, true) ?? [];
}
try {
$functions = $dbForProject->skipValidation(fn () => $dbForProject->find('functions', [
Query::limit(APP_LIMIT_SUBQUERY),
]));
$events = [];
foreach ($functions as $function) {
$functionEvents = $function->getAttribute('events', []);
if (!empty($functionEvents)) {
$events = array_merge($events, $functionEvents);
}
}
$uniqueEvents = array_unique($events);
// Save to cache
$dbForProject->getCache()->save($cacheKey, \json_encode($uniqueEvents), $ttl);
return $uniqueEvents;
} catch (\Throwable $e) {
return [];
}
}
/**
* Get webhook events for a project from the project's webhooks attribute
* @param Document|null $project
* @return array
*/
protected function getWebhookEvents(?Document $project): array
{
if ($project === null || $project->isEmpty() || $project->getId() === 'console') {
return [];
}
$webhooks = $project->getAttribute('webhooks', []);
if (empty($webhooks)) {
return [];
}
$events = [];
foreach ($webhooks as $webhook) {
if ($webhook->getAttribute('enabled', false) !== true) {
continue;
}
$webhookEvents = $webhook->getAttribute('events', []);
if (!empty($webhookEvents)) {
$events = array_merge($events, $webhookEvents);
}
}
return array_unique($events);
}
}

View file

@ -203,7 +203,8 @@ class Delete extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject
);
}
}

View file

@ -234,7 +234,8 @@ class Update extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject
);
}
}

View file

@ -209,7 +209,8 @@ class Upsert extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject
);
}
}

View file

@ -491,7 +491,8 @@ class Create extends Action
$queueForEvents,
$queueForRealtime,
$queueForFunctions,
$queueForWebhooks
$queueForWebhooks,
$dbForProject
);
return;
}

View file

@ -13,6 +13,7 @@ use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\Database\Validator\UID;
use Utopia\Platform\Action;
@ -58,6 +59,7 @@ class Delete extends Base
->param('functionId', '', new UID(), 'Function ID.')
->inject('response')
->inject('dbForProject')
->inject('project')
->inject('queueForDeletes')
->inject('queueForEvents')
->inject('dbForPlatform')
@ -68,6 +70,7 @@ class Delete extends Base
string $functionId,
Response $response,
Database $dbForProject,
Document $project,
DeleteEvent $queueForDeletes,
Event $queueForEvents,
Database $dbForPlatform
@ -95,6 +98,9 @@ class Delete extends Base
$queueForEvents->setParam('functionId', $function->getId());
// Purge function events cache when function is deleted
$this->purgeFunctionEventsCache($project, $dbForProject);
$response->noContent();
}
}

View file

@ -286,6 +286,9 @@ class Update extends Base
$queueForEvents->setParam('functionId', $function->getId());
// Purge function events cache when function is updated
$this->purgeFunctionEventsCache($project, $dbForProject);
$response->dynamic($function, Response::MODEL_FUNCTION);
}
}