mirror of
https://github.com/appwrite/appwrite
synced 2026-05-23 00:49:02 +00:00
fix: realtime events.
This commit is contained in:
parent
a8dc5c3797
commit
a807dd97d9
12 changed files with 95 additions and 123 deletions
|
|
@ -73,21 +73,46 @@ class Realtime extends Event
|
|||
}
|
||||
|
||||
$allEvents = Event::generateEvents($this->getEvent(), $this->getParams());
|
||||
$firstEvent = $allEvents[0]; // most verbose event pattern
|
||||
|
||||
// generate and merge all collection and tables api events.
|
||||
if (str_contains($this->getEvent(), 'databases.') && str_contains($firstEvent, 'collections')) {
|
||||
$tableEventMap = [
|
||||
'collections' => 'tables', 'attributes' => 'columns',
|
||||
'attributeId' => 'columnId', 'documents' => 'rows', 'documentId' => 'rowId',
|
||||
];
|
||||
|
||||
// replace params!
|
||||
$tableEvent = str_replace(
|
||||
array_keys($tableEventMap),
|
||||
array_values($tableEventMap),
|
||||
$this->getEvent()
|
||||
);
|
||||
|
||||
// generate new events
|
||||
$tableEvents = Event::generateEvents($tableEvent, $this->getParams());
|
||||
|
||||
// merge all of the api events
|
||||
$allEvents = array_merge($allEvents, $tableEvents);
|
||||
|
||||
// remove duplicates
|
||||
$allEvents = array_values(array_unique($allEvents));
|
||||
}
|
||||
|
||||
$payload = new Document($this->getPayload());
|
||||
|
||||
$db = $this->getContext('database');
|
||||
$bucket = $this->getContext('bucket');
|
||||
|
||||
// can be Tables API or Collections API, generated channels include both!
|
||||
// Can be Tables API or Collections API; generated channels include both!
|
||||
$tableOrCollection = $this->getContext('table') ?? $this->getContext('collection');
|
||||
|
||||
$target = RealtimeAdapter::fromPayload(
|
||||
// Pass first, most verbose event pattern
|
||||
event: $allEvents[0],
|
||||
event: $firstEvent,
|
||||
payload: $payload,
|
||||
project: $this->getProject(),
|
||||
database: $db,
|
||||
table: $tableOrCollection,
|
||||
collection: $tableOrCollection,
|
||||
bucket: $bucket,
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -252,12 +252,12 @@ class Realtime extends Adapter
|
|||
* @param Document $payload
|
||||
* @param Document|null $project
|
||||
* @param Document|null $database
|
||||
* @param Document|null $table
|
||||
* @param Document|null $collection
|
||||
* @param Document|null $bucket
|
||||
* @return array
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function fromPayload(string $event, Document $payload, Document $project = null, Document $database = null, Document $table = null, Document $bucket = null): array
|
||||
public static function fromPayload(string $event, Document $payload, Document $project = null, Document $database = null, Document $collection = null, Document $bucket = null): array
|
||||
{
|
||||
$channels = [];
|
||||
$roles = [];
|
||||
|
|
@ -308,7 +308,7 @@ class Realtime extends Adapter
|
|||
if ($database->isEmpty()) {
|
||||
throw new \Exception('Database needs to be passed to Realtime for Document/Row events in the Database.');
|
||||
}
|
||||
if ($table->isEmpty()) {
|
||||
if ($collection->isEmpty()) {
|
||||
throw new \Exception('Collection or the Table needs to be passed to Realtime for Document/Row events in the Database.');
|
||||
}
|
||||
|
||||
|
|
@ -322,9 +322,9 @@ class Realtime extends Adapter
|
|||
$channels[] = 'databases.' . $database->getId() . '.collections.' . $payload->getAttribute('$collectionId') . '.documents';
|
||||
$channels[] = 'databases.' . $database->getId() . '.collections.' . $payload->getAttribute('$collectionId') . '.documents.' . $payload->getId();
|
||||
|
||||
$roles = $table->getAttribute('documentSecurity', false)
|
||||
? \array_merge($table->getRead(), $payload->getRead())
|
||||
: $table->getRead();
|
||||
$roles = $collection->getAttribute('documentSecurity', false)
|
||||
? \array_merge($collection->getRead(), $payload->getRead())
|
||||
: $collection->getRead();
|
||||
}
|
||||
break;
|
||||
case 'buckets':
|
||||
|
|
|
|||
|
|
@ -84,22 +84,6 @@ abstract class Action extends UtopiaAction
|
|||
return $this->isCollectionsAPI() ? 'collections' : 'tables';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the correct parent param key (e.g. `tableId` or `collectionId`)
|
||||
*/
|
||||
final protected function getParentEventsParamKey(): string
|
||||
{
|
||||
return $this->isCollectionsAPI() ? 'collectionId' : 'tableId';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the correct param key (e.g. `attributeId` or `columnId`)
|
||||
*/
|
||||
final protected function getEventsParamKey(): string
|
||||
{
|
||||
return $this->getContext() . 'Id';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the appropriate parent level not found exception.
|
||||
*/
|
||||
|
|
@ -423,9 +407,10 @@ abstract class Action extends UtopiaAction
|
|||
$queueForEvents
|
||||
->setContext('database', $db)
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setParam($this->getEventsParamKey(), $attribute->getId())
|
||||
// tableId or columnId
|
||||
->setParam($this->getParentEventsParamKey(), $collection->getId())
|
||||
->setParam('attributeId', $attribute->getId())
|
||||
->setParam('columnId', $attribute->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection);
|
||||
|
||||
$response->setStatusCode(SwooleResponse::STATUS_CODE_CREATED);
|
||||
|
|
@ -623,9 +608,10 @@ abstract class Action extends UtopiaAction
|
|||
$queueForEvents
|
||||
->setContext('database', $db)
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setParam($this->getEventsParamKey(), $attribute->getId())
|
||||
// tableId or columnId
|
||||
->setParam($this->getParentEventsParamKey(), $collection->getId())
|
||||
->setParam('attributeId', $attribute->getId())
|
||||
->setParam('columnId', $attribute->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection);
|
||||
|
||||
return $attribute;
|
||||
|
|
|
|||
|
|
@ -146,9 +146,10 @@ class Delete extends Action
|
|||
->setContext('database', $db)
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setPayload($response->output($attribute, $model))
|
||||
->setParam($this->getEventsParamKey(), $attribute->getId())
|
||||
// tableId or columnId
|
||||
->setParam($this->getParentEventsParamKey(), $collection->getId())
|
||||
->setParam('attributeId', $attribute->getId())
|
||||
->setParam('columnId', $attribute->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
// set proper context
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection);
|
||||
|
||||
|
|
|
|||
|
|
@ -67,22 +67,6 @@ abstract class Action extends UtopiaAction
|
|||
return $this->isCollectionsAPI() ? 'collections' : 'tables';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the correct parent param key (e.g. `tableId` or `collectionId`)
|
||||
*/
|
||||
final protected function getParentEventsParamKey(): string
|
||||
{
|
||||
return $this->isCollectionsAPI() ? 'collectionId' : 'tableId';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the correct param key (e.g. `documentId` or `rowId`)
|
||||
*/
|
||||
final protected function getEventsParamKey(): string
|
||||
{
|
||||
return $this->getContext() . 'Id';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the appropriate parent level not found exception.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -310,9 +310,11 @@ class Create extends Action
|
|||
$queueForEvents
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setContext('database', $database)
|
||||
->setParam($this->getEventsParamKey(), $document->getId())
|
||||
->setParam('rowId', $document->getId())
|
||||
->setParam('documentId', $document->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setPayload($response->getPayload(), sensitive: $relationships)
|
||||
->setParam($this->getParentEventsParamKey(), $collection->getId())
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -164,8 +164,10 @@ class Delete extends Action
|
|||
$queueForEvents
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setContext('database', $database)
|
||||
->setParam($this->getParentEventsParamKey(), $collection->getId())
|
||||
->setParam($this->getEventsParamKey(), $document->getId())
|
||||
->setParam('rowId', $document->getId())
|
||||
->setParam('documentId', $document->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection)
|
||||
->setPayload($response->output($document, $this->getResponseModel()), sensitive: $relationships);
|
||||
|
||||
|
|
|
|||
|
|
@ -297,8 +297,10 @@ class Update extends Action
|
|||
$queueForEvents
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setContext('database', $database)
|
||||
->setParam($this->getEventsParamKey(), $document->getId())
|
||||
->setParam($this->getParentEventsParamKey(), $collection->getId())
|
||||
->setParam('rowId', $document->getId())
|
||||
->setParam('documentId', $document->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection)
|
||||
->setPayload($response->getPayload(), sensitive: $relationships);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,22 +49,6 @@ abstract class Action extends UtopiaAction
|
|||
return $this->context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the correct grand parent param key (e.g. `tableId` or `collectionId`)
|
||||
*/
|
||||
final protected function getGrandParentEventsParamKey(): string
|
||||
{
|
||||
return $this->isCollectionsAPI() ? 'collectionId' : 'tableId';
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the key used in event parameters.
|
||||
*/
|
||||
final protected function getEventsParamKey(): string
|
||||
{
|
||||
return 'indexId';
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine if the current action is for the Collections API.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -220,8 +220,9 @@ class Create extends Action
|
|||
$queueForEvents
|
||||
->setContext('database', $db)
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setParam($this->getEventsParamKey(), $index->getId())
|
||||
->setParam($this->getGrandParentEventsParamKey(), $collection->getId())
|
||||
->setParam('indexId', $index->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection);
|
||||
|
||||
$response
|
||||
|
|
|
|||
|
|
@ -115,9 +115,10 @@ class Delete extends Action
|
|||
$queueForEvents
|
||||
->setContext('database', $db)
|
||||
->setParam('databaseId', $databaseId)
|
||||
->setParam($this->getEventsParamKey(), $index->getId())
|
||||
->setParam('indexId', $index->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setPayload($response->output($index, $this->getResponseModel()))
|
||||
->setParam($this->getGrandParentEventsParamKey(), $collection->getId())
|
||||
->setContext($this->isCollectionsAPI() ? 'collection' : 'table', $collection);
|
||||
|
||||
$response->noContent();
|
||||
|
|
|
|||
|
|
@ -115,10 +115,8 @@ class Databases extends Action
|
|||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
$events = [
|
||||
"databases.[databaseId].tables.[tableId].columns.[columnId].update",
|
||||
"databases.[databaseId].collections.[collectionId].attributes.[attributeId].update",
|
||||
];
|
||||
$event = "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update";
|
||||
|
||||
/**
|
||||
* TODO @christyjacob4 verify if this is still the case
|
||||
* Fetch attribute from the database, since with Resque float values are loosing information.
|
||||
|
|
@ -207,7 +205,7 @@ class Databases extends Action
|
|||
|
||||
throw $e;
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $project, $events, $queueForRealtime, $attribute);
|
||||
$this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute);
|
||||
|
||||
if (! $relatedCollection->isEmpty()) {
|
||||
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId());
|
||||
|
|
@ -241,10 +239,7 @@ class Databases extends Action
|
|||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
$events = [
|
||||
'databases.[databaseId].tables.[tableId].columns.[columnId].delete',
|
||||
'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete',
|
||||
];
|
||||
$event = 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete';
|
||||
$collectionId = $collection->getId();
|
||||
$key = $attribute->getAttribute('key', '');
|
||||
$type = $attribute->getAttribute('type', '');
|
||||
|
|
@ -317,7 +312,7 @@ class Databases extends Action
|
|||
|
||||
throw $e;
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $project, $events, $queueForRealtime, $attribute);
|
||||
$this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute);
|
||||
}
|
||||
|
||||
// The underlying database removes/rebuilds indexes when attribute is removed
|
||||
|
|
@ -404,10 +399,7 @@ class Databases extends Action
|
|||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
$events = [
|
||||
'databases.[databaseId].tables.[tableId].indexes.[indexId].update',
|
||||
'databases.[databaseId].collections.[collectionId].indexes.[indexId].update',
|
||||
];
|
||||
$event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update';
|
||||
$collectionId = $collection->getId();
|
||||
$key = $index->getAttribute('key', '');
|
||||
$type = $index->getAttribute('type', '');
|
||||
|
|
@ -434,7 +426,7 @@ class Databases extends Action
|
|||
|
||||
throw $e;
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $project, $events, $queueForRealtime, null, $index);
|
||||
$this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index);
|
||||
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId);
|
||||
}
|
||||
}
|
||||
|
|
@ -464,10 +456,7 @@ class Databases extends Action
|
|||
}
|
||||
|
||||
$projectId = $project->getId();
|
||||
$events = [
|
||||
'databases.[databaseId].tables.[tableId].indexes.[indexId].delete',
|
||||
'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete',
|
||||
];
|
||||
$event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete';
|
||||
$key = $index->getAttribute('key');
|
||||
$status = $index->getAttribute('status', '');
|
||||
$project = $dbForPlatform->getDocument('projects', $projectId);
|
||||
|
|
@ -493,7 +482,7 @@ class Databases extends Action
|
|||
throw $e;
|
||||
|
||||
} finally {
|
||||
$this->trigger($database, $collection, $project, $events, $queueForRealtime, null, $index);
|
||||
$this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index);
|
||||
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId());
|
||||
}
|
||||
}
|
||||
|
|
@ -516,7 +505,6 @@ class Databases extends Action
|
|||
/**
|
||||
* @param Document $database
|
||||
* @param Document $collection
|
||||
* @param Document $project
|
||||
* @param Database $dbForProject
|
||||
* @return void
|
||||
* @throws Authorization
|
||||
|
|
@ -601,7 +589,7 @@ class Databases extends Action
|
|||
* @param Document $database
|
||||
* @param Document $collection
|
||||
* @param Document $project
|
||||
* @param string[] $events
|
||||
* @param string $event
|
||||
* @param Realtime $queueForRealtime
|
||||
* @param Document|null $attribute
|
||||
* @param Document|null $index
|
||||
|
|
@ -612,35 +600,31 @@ class Databases extends Action
|
|||
Document $database,
|
||||
Document $collection,
|
||||
Document $project,
|
||||
array $events,
|
||||
string $event,
|
||||
Realtime $queueForRealtime,
|
||||
Document|null $attribute = null,
|
||||
Document|null $index = null,
|
||||
): void {
|
||||
// table and collection
|
||||
foreach ($events as $event) {
|
||||
$queueForRealtime
|
||||
->setProject($project)
|
||||
->setSubscribers(['console'])
|
||||
->setEvent($event)
|
||||
->setParam('databaseId', $database->getId())
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId());
|
||||
|
||||
if (! empty($attribute)) {
|
||||
$queueForRealtime
|
||||
->setProject($project)
|
||||
->setSubscribers(['console'])
|
||||
->setEvent($event)
|
||||
->setParam('tableId', $collection->getId())
|
||||
->setParam('collectionId', $collection->getId())
|
||||
->setParam('databaseId', $database->getId());
|
||||
|
||||
if (! empty($attribute)) {
|
||||
$queueForRealtime
|
||||
->setParam('columnId', $attribute->getId())
|
||||
->setParam('attributeId', $attribute->getId())
|
||||
->setPayload($attribute->getArrayCopy());
|
||||
}
|
||||
|
||||
if (! empty($index)) {
|
||||
$queueForRealtime
|
||||
->setParam('indexId', $index->getId())
|
||||
->setPayload($index->getArrayCopy());
|
||||
}
|
||||
|
||||
$queueForRealtime->trigger();
|
||||
->setParam('columnId', $attribute->getId())
|
||||
->setParam('attributeId', $attribute->getId())
|
||||
->setPayload($attribute->getArrayCopy());
|
||||
}
|
||||
if (! empty($index)) {
|
||||
$queueForRealtime
|
||||
->setParam('indexId', $index->getId())
|
||||
->setPayload($index->getArrayCopy());
|
||||
}
|
||||
|
||||
$queueForRealtime->trigger();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue