From a807dd97d9e5e7c47618dfb70a89b08911a25b0d Mon Sep 17 00:00:00 2001 From: Darshan Date: Fri, 9 May 2025 14:47:01 +0530 Subject: [PATCH] fix: realtime events. --- src/Appwrite/Event/Realtime.php | 33 +++++++- src/Appwrite/Messaging/Adapter/Realtime.php | 12 +-- .../Collections/Attributes/Action.php | 30 ++------ .../Collections/Attributes/Delete.php | 7 +- .../Collections/Documents/Action.php | 16 ---- .../Collections/Documents/Create.php | 6 +- .../Collections/Documents/Delete.php | 6 +- .../Collections/Documents/Update.php | 6 +- .../Databases/Collections/Indexes/Action.php | 16 ---- .../Databases/Collections/Indexes/Create.php | 5 +- .../Databases/Collections/Indexes/Delete.php | 5 +- .../Modules/Databases/Workers/Databases.php | 76 ++++++++----------- 12 files changed, 95 insertions(+), 123 deletions(-) diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index 67d51e5c52..f45b6548d8 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -73,21 +73,46 @@ class Realtime extends Event } $allEvents = Event::generateEvents($this->getEvent(), $this->getParams()); + $firstEvent = $allEvents[0]; // most verbose event pattern + + // generate and merge all collection and tables api events. + if (str_contains($this->getEvent(), 'databases.') && str_contains($firstEvent, 'collections')) { + $tableEventMap = [ + 'collections' => 'tables', 'attributes' => 'columns', + 'attributeId' => 'columnId', 'documents' => 'rows', 'documentId' => 'rowId', + ]; + + // replace params! + $tableEvent = str_replace( + array_keys($tableEventMap), + array_values($tableEventMap), + $this->getEvent() + ); + + // generate new events + $tableEvents = Event::generateEvents($tableEvent, $this->getParams()); + + // merge all of the api events + $allEvents = array_merge($allEvents, $tableEvents); + + // remove duplicates + $allEvents = array_values(array_unique($allEvents)); + } + $payload = new Document($this->getPayload()); $db = $this->getContext('database'); $bucket = $this->getContext('bucket'); - // can be Tables API or Collections API, generated channels include both! + // Can be Tables API or Collections API; generated channels include both! $tableOrCollection = $this->getContext('table') ?? $this->getContext('collection'); $target = RealtimeAdapter::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], + event: $firstEvent, payload: $payload, project: $this->getProject(), database: $db, - table: $tableOrCollection, + collection: $tableOrCollection, bucket: $bucket, ); diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 4980618ab1..bfe87c49db 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -252,12 +252,12 @@ class Realtime extends Adapter * @param Document $payload * @param Document|null $project * @param Document|null $database - * @param Document|null $table + * @param Document|null $collection * @param Document|null $bucket * @return array * @throws \Exception */ - public static function fromPayload(string $event, Document $payload, Document $project = null, Document $database = null, Document $table = null, Document $bucket = null): array + public static function fromPayload(string $event, Document $payload, Document $project = null, Document $database = null, Document $collection = null, Document $bucket = null): array { $channels = []; $roles = []; @@ -308,7 +308,7 @@ class Realtime extends Adapter if ($database->isEmpty()) { throw new \Exception('Database needs to be passed to Realtime for Document/Row events in the Database.'); } - if ($table->isEmpty()) { + if ($collection->isEmpty()) { throw new \Exception('Collection or the Table needs to be passed to Realtime for Document/Row events in the Database.'); } @@ -322,9 +322,9 @@ class Realtime extends Adapter $channels[] = 'databases.' . $database->getId() . '.collections.' . $payload->getAttribute('$collectionId') . '.documents'; $channels[] = 'databases.' . $database->getId() . '.collections.' . $payload->getAttribute('$collectionId') . '.documents.' . $payload->getId(); - $roles = $table->getAttribute('documentSecurity', false) - ? \array_merge($table->getRead(), $payload->getRead()) - : $table->getRead(); + $roles = $collection->getAttribute('documentSecurity', false) + ? \array_merge($collection->getRead(), $payload->getRead()) + : $collection->getRead(); } break; case 'buckets': diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Action.php index d1047e671d..954e997872 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Action.php @@ -84,22 +84,6 @@ abstract class Action extends UtopiaAction return $this->isCollectionsAPI() ? 'collections' : 'tables'; } - /** - * Get the correct parent param key (e.g. `tableId` or `collectionId`) - */ - final protected function getParentEventsParamKey(): string - { - return $this->isCollectionsAPI() ? 'collectionId' : 'tableId'; - } - - /** - * Get the correct param key (e.g. `attributeId` or `columnId`) - */ - final protected function getEventsParamKey(): string - { - return $this->getContext() . 'Id'; - } - /** * Get the appropriate parent level not found exception. */ @@ -423,9 +407,10 @@ abstract class Action extends UtopiaAction $queueForEvents ->setContext('database', $db) ->setParam('databaseId', $databaseId) - ->setParam($this->getEventsParamKey(), $attribute->getId()) - // tableId or columnId - ->setParam($this->getParentEventsParamKey(), $collection->getId()) + ->setParam('attributeId', $attribute->getId()) + ->setParam('columnId', $attribute->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()) ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection); $response->setStatusCode(SwooleResponse::STATUS_CODE_CREATED); @@ -623,9 +608,10 @@ abstract class Action extends UtopiaAction $queueForEvents ->setContext('database', $db) ->setParam('databaseId', $databaseId) - ->setParam($this->getEventsParamKey(), $attribute->getId()) - // tableId or columnId - ->setParam($this->getParentEventsParamKey(), $collection->getId()) + ->setParam('attributeId', $attribute->getId()) + ->setParam('columnId', $attribute->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()) ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection); return $attribute; diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Delete.php index 2432ac5fd0..81bc77a9ba 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Attributes/Delete.php @@ -146,9 +146,10 @@ class Delete extends Action ->setContext('database', $db) ->setParam('databaseId', $databaseId) ->setPayload($response->output($attribute, $model)) - ->setParam($this->getEventsParamKey(), $attribute->getId()) - // tableId or columnId - ->setParam($this->getParentEventsParamKey(), $collection->getId()) + ->setParam('attributeId', $attribute->getId()) + ->setParam('columnId', $attribute->getId()) + ->setParam('collectionId', $collection->getId()) + ->setParam('tableId', $collection->getId()) // set proper context ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection); diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php index b9a3247f96..a5b8fc5805 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Action.php @@ -67,22 +67,6 @@ abstract class Action extends UtopiaAction return $this->isCollectionsAPI() ? 'collections' : 'tables'; } - /** - * Get the correct parent param key (e.g. `tableId` or `collectionId`) - */ - final protected function getParentEventsParamKey(): string - { - return $this->isCollectionsAPI() ? 'collectionId' : 'tableId'; - } - - /** - * Get the correct param key (e.g. `documentId` or `rowId`) - */ - final protected function getEventsParamKey(): string - { - return $this->getContext() . 'Id'; - } - /** * Get the appropriate parent level not found exception. */ diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php index 297d0f3543..4c63837d57 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Create.php @@ -310,9 +310,11 @@ class Create extends Action $queueForEvents ->setParam('databaseId', $databaseId) ->setContext('database', $database) - ->setParam($this->getEventsParamKey(), $document->getId()) + ->setParam('rowId', $document->getId()) + ->setParam('documentId', $document->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()) ->setPayload($response->getPayload(), sensitive: $relationships) - ->setParam($this->getParentEventsParamKey(), $collection->getId()) ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection); } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Delete.php index 2f0c4e76db..66e1b37320 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Delete.php @@ -164,8 +164,10 @@ class Delete extends Action $queueForEvents ->setParam('databaseId', $databaseId) ->setContext('database', $database) - ->setParam($this->getParentEventsParamKey(), $collection->getId()) - ->setParam($this->getEventsParamKey(), $document->getId()) + ->setParam('rowId', $document->getId()) + ->setParam('documentId', $document->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()) ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection) ->setPayload($response->output($document, $this->getResponseModel()), sensitive: $relationships); diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Update.php index eaaa2bb63f..5bee2ee761 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Documents/Update.php @@ -297,8 +297,10 @@ class Update extends Action $queueForEvents ->setParam('databaseId', $databaseId) ->setContext('database', $database) - ->setParam($this->getEventsParamKey(), $document->getId()) - ->setParam($this->getParentEventsParamKey(), $collection->getId()) + ->setParam('rowId', $document->getId()) + ->setParam('documentId', $document->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()) ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection) ->setPayload($response->getPayload(), sensitive: $relationships); } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Action.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Action.php index 6186139bfb..93459440f0 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Action.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Action.php @@ -49,22 +49,6 @@ abstract class Action extends UtopiaAction return $this->context; } - /** - * Get the correct grand parent param key (e.g. `tableId` or `collectionId`) - */ - final protected function getGrandParentEventsParamKey(): string - { - return $this->isCollectionsAPI() ? 'collectionId' : 'tableId'; - } - - /** - * Get the key used in event parameters. - */ - final protected function getEventsParamKey(): string - { - return 'indexId'; - } - /** * Determine if the current action is for the Collections API. */ diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Create.php index 6370a32a4c..014627e0a8 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Create.php @@ -220,8 +220,9 @@ class Create extends Action $queueForEvents ->setContext('database', $db) ->setParam('databaseId', $databaseId) - ->setParam($this->getEventsParamKey(), $index->getId()) - ->setParam($this->getGrandParentEventsParamKey(), $collection->getId()) + ->setParam('indexId', $index->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()) ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection); $response diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Delete.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Delete.php index 2d57a8a5f0..f86cf7b11f 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Delete.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Collections/Indexes/Delete.php @@ -115,9 +115,10 @@ class Delete extends Action $queueForEvents ->setContext('database', $db) ->setParam('databaseId', $databaseId) - ->setParam($this->getEventsParamKey(), $index->getId()) + ->setParam('indexId', $index->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()) ->setPayload($response->output($index, $this->getResponseModel())) - ->setParam($this->getGrandParentEventsParamKey(), $collection->getId()) ->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection); $response->noContent(); diff --git a/src/Appwrite/Platform/Modules/Databases/Workers/Databases.php b/src/Appwrite/Platform/Modules/Databases/Workers/Databases.php index 8711d28d2c..35334de266 100644 --- a/src/Appwrite/Platform/Modules/Databases/Workers/Databases.php +++ b/src/Appwrite/Platform/Modules/Databases/Workers/Databases.php @@ -115,10 +115,8 @@ class Databases extends Action } $projectId = $project->getId(); - $events = [ - "databases.[databaseId].tables.[tableId].columns.[columnId].update", - "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update", - ]; + $event = "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update"; + /** * TODO @christyjacob4 verify if this is still the case * Fetch attribute from the database, since with Resque float values are loosing information. @@ -207,7 +205,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, $events, $queueForRealtime, $attribute); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -241,10 +239,7 @@ class Databases extends Action } $projectId = $project->getId(); - $events = [ - 'databases.[databaseId].tables.[tableId].columns.[columnId].delete', - 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', - ]; + $event = 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete'; $collectionId = $collection->getId(); $key = $attribute->getAttribute('key', ''); $type = $attribute->getAttribute('type', ''); @@ -317,7 +312,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, $events, $queueForRealtime, $attribute); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -404,10 +399,7 @@ class Databases extends Action } $projectId = $project->getId(); - $events = [ - 'databases.[databaseId].tables.[tableId].indexes.[indexId].update', - 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update', - ]; + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update'; $collectionId = $collection->getId(); $key = $index->getAttribute('key', ''); $type = $index->getAttribute('type', ''); @@ -434,7 +426,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, $events, $queueForRealtime, null, $index); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -464,10 +456,7 @@ class Databases extends Action } $projectId = $project->getId(); - $events = [ - 'databases.[databaseId].tables.[tableId].indexes.[indexId].delete', - 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', - ]; + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete'; $key = $index->getAttribute('key'); $status = $index->getAttribute('status', ''); $project = $dbForPlatform->getDocument('projects', $projectId); @@ -493,7 +482,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, $events, $queueForRealtime, null, $index); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -516,7 +505,6 @@ class Databases extends Action /** * @param Document $database * @param Document $collection - * @param Document $project * @param Database $dbForProject * @return void * @throws Authorization @@ -601,7 +589,7 @@ class Databases extends Action * @param Document $database * @param Document $collection * @param Document $project - * @param string[] $events + * @param string $event * @param Realtime $queueForRealtime * @param Document|null $attribute * @param Document|null $index @@ -612,35 +600,31 @@ class Databases extends Action Document $database, Document $collection, Document $project, - array $events, + string $event, Realtime $queueForRealtime, Document|null $attribute = null, Document|null $index = null, ): void { - // table and collection - foreach ($events as $event) { + $queueForRealtime + ->setProject($project) + ->setSubscribers(['console']) + ->setEvent($event) + ->setParam('databaseId', $database->getId()) + ->setParam('tableId', $collection->getId()) + ->setParam('collectionId', $collection->getId()); + + if (! empty($attribute)) { $queueForRealtime - ->setProject($project) - ->setSubscribers(['console']) - ->setEvent($event) - ->setParam('tableId', $collection->getId()) - ->setParam('collectionId', $collection->getId()) - ->setParam('databaseId', $database->getId()); - - if (! empty($attribute)) { - $queueForRealtime - ->setParam('columnId', $attribute->getId()) - ->setParam('attributeId', $attribute->getId()) - ->setPayload($attribute->getArrayCopy()); - } - - if (! empty($index)) { - $queueForRealtime - ->setParam('indexId', $index->getId()) - ->setPayload($index->getArrayCopy()); - } - - $queueForRealtime->trigger(); + ->setParam('columnId', $attribute->getId()) + ->setParam('attributeId', $attribute->getId()) + ->setPayload($attribute->getArrayCopy()); } + if (! empty($index)) { + $queueForRealtime + ->setParam('indexId', $index->getId()) + ->setPayload($index->getArrayCopy()); + } + + $queueForRealtime->trigger(); } }