From 375f6432142f462fe8a052c1ea7138dea4755fed Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 03:29:32 +0000 Subject: [PATCH 01/15] refactor: triggering realtime events with queueForRealtime --- app/controllers/api/functions.php | 52 ++++++--------- app/worker.php | 5 ++ src/Appwrite/Event/Event.php | 23 +++++++ src/Appwrite/Event/Realtime.php | 2 +- src/Appwrite/Platform/Workers/Databases.php | 74 +++++++++------------ 5 files changed, 80 insertions(+), 76 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 14255ef7a4..0f9f5211f4 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -6,13 +6,13 @@ use Appwrite\Event\Build; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Validator\FunctionEvent; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Functions\Validator\Headers; use Appwrite\Functions\Validator\RuntimeSpecification; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Platform\Tasks\ScheduleExecutions; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -194,9 +194,10 @@ App::post('/v1/functions') ->inject('user') ->inject('queueForEvents') ->inject('queueForBuilds') + ->inject('queueForRealtime') ->inject('dbForPlatform') ->inject('gitHub') - ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { + ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; // Temporary abuse check @@ -386,18 +387,18 @@ App::post('/v1/functions') ])) ); - /** Trigger Webhook */ $ruleModel = new Rule(); $ruleCreate = $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME); + ->setProject($project) + ->setEvent('rules.[ruleId].create') + ->setParam('ruleId', $rule->getId()) + ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); + /** Trigger Webhook */ $ruleCreate - ->setProject($project) - ->setEvent('rules.[ruleId].create') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) + ->setClass(Event::WEBHOOK_CLASS_NAME) + ->setQueue(Event::WEBHOOK_QUEUE_NAME) ->trigger(); /** Trigger Functions */ @@ -406,31 +407,16 @@ App::post('/v1/functions') ->setQueue(Event::FUNCTIONS_QUEUE_NAME) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('rules.[ruleId].create', [ - 'ruleId' => $rule->getId(), - ]); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($ruleCreate) + ->setProjectId('console') + ->trigger(); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $rule, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->from($ruleCreate) + ->setProjectId($project->getId()) + ->trigger(); } $queueForEvents->setParam('functionId', $function->getId()); diff --git a/app/worker.php b/app/worker.php index 605474e9f1..ad6bf475f9 100644 --- a/app/worker.php +++ b/app/worker.php @@ -13,6 +13,7 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\StatsUsageDump; /** remove */ @@ -317,6 +318,10 @@ Server::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); +Server::setResource('queueForRealtime', function () { + return new Realtime(); +}, []); + Server::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 0edffdf4dc..8085a836a8 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -66,6 +66,7 @@ class Event protected ?Document $project = null; protected ?Document $user = null; protected ?string $userId = null; + protected ?string $projectId = null; protected bool $paused = false; /** @@ -151,6 +152,18 @@ class Event return $this; } + /** + * Set projectId for this event. + * + * @param string $projectId + * @return self + */ + public function setProjectId(string $projectId): self + { + $this->projectId = $projectId; + return $this; + } + /** * Get project for this event. * @@ -161,6 +174,16 @@ class Event return $this->project; } + /** + * Get projectId for this event. + * + * @return ?string + */ + public function getProjectId(): ?string + { + return $this->projectId; + } + /** * Set user for this event. * diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index f4f00b59d4..8c302bbabf 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -54,7 +54,7 @@ class Realtime extends Event ); RealtimeAdapter::send( - projectId: $target['projectId'] ?? $this->getProject()->getId(), + projectId: $this->getProjectId() ?? $target['projectId'] ?? $this->getProject()->getId(), payload: $this->getRealtimePayload(), events: $allEvents, channels: $target['channels'], diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 441b09b4cc..50a3fa52f3 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -3,7 +3,7 @@ namespace Appwrite\Platform\Workers; use Appwrite\Event\Event; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Exception; use Utopia\CLI\Console; use Utopia\Database\Database; @@ -38,7 +38,8 @@ class Databases extends Action ->inject('dbForPlatform') ->inject('dbForProject') ->inject('log') - ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log)); + ->inject('queueForRealtime') + ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log, $queueForRealtime)); } /** @@ -47,10 +48,11 @@ class Databases extends Action * @param Database $dbForPlatform * @param Database $dbForProject * @param Log $log + * @param Realtime $queueForRealtime * @return void * @throws \Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime): void { $payload = $message->getPayload() ?? []; @@ -75,10 +77,10 @@ class Databases extends Action match (\strval($type)) { DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject), DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject), - DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject), + DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), default => throw new \Exception('No database operation for type: ' . \strval($type)), }; } @@ -90,13 +92,14 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict * @throws \Exception * @throws \Throwable */ - private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -106,12 +109,6 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'attributeId' => $attribute->getId() - ]); /** * TODO @christyjacob4 verify if this is still the case * Fetch attribute from the database, since with Resque float values are loosing informations. @@ -200,7 +197,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events); + $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -217,13 +214,14 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict * @throws \Exception * @throws \Throwable **/ - private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -312,7 +310,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events); + $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -358,7 +356,7 @@ class Databases extends Action } if ($exists) { // Delete the duplicate if created, else update in db - $this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject); + $this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject, $queueForRealtime); } else { $dbForProject->updateDocument('indexes', $index->getId(), $index); } @@ -381,6 +379,7 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict @@ -388,7 +387,7 @@ class Databases extends Action * @throws DatabaseException * @throws \Throwable */ - private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -430,7 +429,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events); + $this->trigger($database, $collection, $index, $project, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -442,6 +441,7 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict @@ -449,7 +449,7 @@ class Databases extends Action * @throws DatabaseException * @throws \Throwable */ - private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -490,7 +490,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events); + $this->trigger($database, $collection, $index, $project, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -617,26 +617,16 @@ class Databases extends Action Document $collection, Document $attribute, Document $project, - string $projectId, - array $events + Realtime $queueForRealtime ): void { - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $events[0], - payload: $attribute, - project: $project, - ); - Realtime::send( - projectId: 'console', - payload: $attribute->getArrayCopy(), - events: $events, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'projectId' => $projectId, - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId() - ] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update') + ->setParam('databaseId', $database->getId()) + ->setParam('collectionId', $collection->getId()) + ->setParam('attributeId', $attribute->getId()) + ->setPayload($attribute->getArrayCopy()) + ->trigger(); } } From 2c4c42de054ad6388f53562c29621b913a58864d Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 03:53:18 +0000 Subject: [PATCH 02/15] docs: fix database worker --- src/Appwrite/Platform/Workers/Databases.php | 40 ++++++++------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 50a3fa52f3..6db88a618f 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -2,7 +2,6 @@ namespace Appwrite\Platform\Workers; -use Appwrite\Event\Event; use Appwrite\Event\Realtime; use Exception; use Utopia\CLI\Console; @@ -57,7 +56,7 @@ class Databases extends Action $payload = $message->getPayload() ?? []; if (empty($payload)) { - throw new \Exception('Missing payload'); + throw new Exception('Missing payload'); } $type = $payload['type']; @@ -81,7 +80,7 @@ class Databases extends Action DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), - default => throw new \Exception('No database operation for type: ' . \strval($type)), + default => throw new Exception('No database operation for type: ' . \strval($type)), }; } @@ -166,7 +165,7 @@ class Databases extends Action break; default: if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) { - throw new \Exception('Failed to create Attribute'); + throw new Exception('Failed to create Attribute'); } } @@ -231,15 +230,8 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'attributeId' => $attribute->getId() - ]); $collectionId = $collection->getId(); $key = $attribute->getAttribute('key', ''); - $status = $attribute->getAttribute('status', ''); $type = $attribute->getAttribute('type', ''); $project = $dbForPlatform->getDocument('projects', $projectId); $options = $attribute->getAttribute('options', []); @@ -397,12 +389,6 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'indexId' => $index->getId() - ]); $collectionId = $collection->getId(); $key = $index->getAttribute('key', ''); $type = $index->getAttribute('type', ''); @@ -459,12 +445,6 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'indexId' => $index->getId() - ]); $key = $index->getAttribute('key'); $status = $index->getAttribute('status', ''); $project = $dbForPlatform->getDocument('projects', $projectId); @@ -568,14 +548,14 @@ class Databases extends Action /** - * @param string $collection collectionID + * @param string $collectionId * @param array $queries * @param Database $database * @param callable|null $callback * @return void * @throws Exception */ - protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void + protected function deleteByGroup(string $collectionId, array $queries, Database $database, callable $callback = null): void { $count = 0; $chunk = 0; @@ -587,7 +567,7 @@ class Databases extends Action while ($sum === $limit) { $chunk++; - $results = $database->find($collection, \array_merge([Query::limit($limit)], $queries)); + $results = $database->find($collectionId, \array_merge([Query::limit($limit)], $queries)); $sum = count($results); @@ -612,6 +592,14 @@ class Databases extends Action Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds"); } + /** + * @param Document $database + * @param Document $collection + * @param Document $attribute + * @param Document $project + * @param Realtime $queueForRealtime + * @return void + */ protected function trigger( Document $database, Document $collection, From 47fbb777ed2893e8230316d0386febed76e407a4 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 04:03:02 +0000 Subject: [PATCH 03/15] chore: refactor migrations realtime queue --- src/Appwrite/Platform/Workers/Migrations.php | 66 +++++++++----------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index cd567f6fa3..50d6002d28 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -3,8 +3,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; -use Appwrite\Event\Event; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Exception; use Utopia\CLI\Console; use Utopia\Config\Config; @@ -54,13 +53,14 @@ class Migrations extends Action ->inject('dbForProject') ->inject('dbForPlatform') ->inject('logError') - ->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError)); + ->inject('queueForRealtime') + ->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError, $queueForRealtime)); } /** * @throws Exception */ - public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError): void + public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime): void { $payload = $message->getPayload() ?? []; @@ -87,7 +87,7 @@ class Migrations extends Action return; } - $this->processMigration($migration); + $this->processMigration($migration, $queueForRealtime); } /** @@ -155,34 +155,24 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function updateMigrationDocument(Document $migration, Document $project): Document + protected function updateMigrationDocument(Document $migration, Document $project, Realtime $queueForRealtime): Document { - /** Trigger Realtime */ - $allEvents = Event::generateEvents('migrations.[migrationId].update', [ - 'migrationId' => $migration->getId(), - ]); + /** Trigger Realtime Events */ + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent('migrations.[migrationId].update') + ->setParam('migrationId', $migration->getId()) + ->setPayload($migration->getArrayCopy()) + ->trigger(); - $target = Realtime::fromPayload( - event: $allEvents[0], - payload: $migration, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $migration->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - ); - - Realtime::send( - projectId: $project->getId(), - payload: $migration->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - ); + $queueForRealtime + ->setProject($project) + ->setProjectId($project->getId()) + ->setEvent('migrations.[migrationId].update') + ->setParam('migrationId', $migration->getId()) + ->setPayload($migration->getArrayCopy()) + ->trigger(); return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration); } @@ -241,7 +231,7 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function processMigration(Document $migration): void + protected function processMigration(Document $migration, Realtime $queueForRealtime): void { $project = $this->project; $projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId()); @@ -265,7 +255,7 @@ class Migrations extends Action $migration->setAttribute('stage', 'processing'); $migration->setAttribute('status', 'processing'); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); $source = $this->processSource($migration); $destination = $this->processDestination($migration, $tempAPIKey); @@ -279,14 +269,14 @@ class Migrations extends Action /** Start Transfer */ $migration->setAttribute('stage', 'migrating'); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); $transfer->run( $migration->getAttribute('resources'), - function () use ($migration, $transfer, $projectDocument) { + function () use ($migration, $transfer, $projectDocument, $queueForRealtime) { $migration->setAttribute('resourceData', json_encode($transfer->getCache())); $migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters())); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); }, $migration->getAttribute('resourceId'), $migration->getAttribute('resourceType') @@ -323,7 +313,7 @@ class Migrations extends Action } $migration->setAttribute('errors', $errorMessages); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); return; } @@ -364,7 +354,7 @@ class Migrations extends Action $migration->setAttribute('errors', $errorMessages); } } finally { - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); if ($migration->getAttribute('status', '') === 'failed') { Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')'); From bcdae7f4e37ff7d348d54f39f2991676a89e77c4 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 04:21:36 +0000 Subject: [PATCH 04/15] chore: refactor certificate realtime, fix database --- .../Platform/Workers/Certificates.php | 76 ++++++++----------- src/Appwrite/Platform/Workers/Databases.php | 23 +++--- 2 files changed, 46 insertions(+), 53 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index 7e220b2734..c7f345f77a 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -6,7 +6,7 @@ use Appwrite\Certificates\Adapter as CertificatesAdapter; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Appwrite\Network\Validator\CNAME; use Appwrite\Template\Template; use Appwrite\Utopia\Response\Model\Rule; @@ -47,11 +47,12 @@ class Certificates extends Action ->inject('queueForMails') ->inject('queueForEvents') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('log') ->inject('certificates') ->callback( - fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates) => - $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates) + fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) => + $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates) ); } @@ -61,13 +62,14 @@ class Certificates extends Action * @param Mail $queueForMails * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param Log $log * @param CertificatesAdapter $certificates * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates): void + public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void { $payload = $message->getPayload() ?? []; @@ -81,7 +83,7 @@ class Certificates extends Action $log->addTag('domain', $domain->get()); - $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates, $skipRenewCheck); + $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck); } /** @@ -90,13 +92,14 @@ class Certificates extends Action * @param Mail $queueForMails * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param CertificatesAdapter $certificates * @param bool $skipRenewCheck * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void + private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void { /** * 1. Read arguments and validate domain @@ -186,7 +189,7 @@ class Certificates extends Action $certificate->setAttribute('updated', DateTime::now()); // Save all changes we made to certificate document into database - $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions); + $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); } } @@ -199,13 +202,14 @@ class Certificates extends Action * @param Database $dbForPlatform Database connection for console * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @return void * @throws \Utopia\Database\Exception * @throws Authorization * @throws Conflict * @throws Structure */ - private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void + private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void { // Check if update or insert required $certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]); @@ -219,7 +223,7 @@ class Certificates extends Action } $certificateId = $certificate->getId(); - $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions); + $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); } /** @@ -338,7 +342,7 @@ class Certificates extends Action * * @return void */ - private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void + private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void { // TODO: @christyjacob remove once we migrate the rules in 1.7.x if (System::getEnv('_APP_RULES_FORMAT') === 'md5') { @@ -367,50 +371,34 @@ class Certificates extends Action return; } - /** Trigger Webhook */ $ruleModel = new Rule(); + $queueForEvents + ->setProject($project) + ->setEvent('rules.[ruleId].update') + ->setParam('ruleId', $rule->getId()) + ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); + + /** Trigger Webhook */ $queueForEvents ->setQueue(Event::WEBHOOK_QUEUE_NAME) ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setProject($project) - ->setEvent('rules.[ruleId].update') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) ->trigger(); - /** Trigger Functions */ $queueForFunctions - ->setProject($project) - ->setEvent('rules.[ruleId].update') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) + ->from($queueForEvents) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('rules.[ruleId].update', [ - 'ruleId' => $rule->getId(), - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $rule, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($queueForEvents) + ->setProjectId('console') + ->trigger(); + + $queueForRealtime + ->from($queueForEvents) + ->setProjectId($project->getId()) + ->trigger(); } } } diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 6db88a618f..b617da3e33 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -36,9 +36,9 @@ class Databases extends Action ->inject('project') ->inject('dbForPlatform') ->inject('dbForProject') - ->inject('log') ->inject('queueForRealtime') - ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log, $queueForRealtime)); + ->inject('log') + ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $queueForRealtime, $log)); } /** @@ -46,12 +46,12 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject - * @param Log $log * @param Realtime $queueForRealtime + * @param Log $log * @return void * @throws \Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime): void + public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log): void { $payload = $message->getPayload() ?? []; @@ -108,6 +108,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update"; /** * TODO @christyjacob4 verify if this is still the case * Fetch attribute from the database, since with Resque float values are loosing informations. @@ -196,7 +197,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); + $this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -230,6 +231,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete'; $collectionId = $collection->getId(); $key = $attribute->getAttribute('key', ''); $type = $attribute->getAttribute('type', ''); @@ -302,7 +304,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); + $this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -389,6 +391,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update'; $collectionId = $collection->getId(); $key = $index->getAttribute('key', ''); $type = $index->getAttribute('type', ''); @@ -415,7 +418,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $queueForRealtime); + $this->trigger($database, $collection, $index, $project, $event, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -445,6 +448,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete'; $key = $index->getAttribute('key'); $status = $index->getAttribute('status', ''); $project = $dbForPlatform->getDocument('projects', $projectId); @@ -470,7 +474,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $queueForRealtime); + $this->trigger($database, $collection, $index, $project, $event, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -605,12 +609,13 @@ class Databases extends Action Document $collection, Document $attribute, Document $project, + string $event, Realtime $queueForRealtime ): void { $queueForRealtime ->setProject($project) ->setProjectId('console') - ->setEvent('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update') + ->setEvent($event) ->setParam('databaseId', $database->getId()) ->setParam('collectionId', $collection->getId()) ->setParam('attributeId', $attribute->getId()) From f666e18154ecf2b9b0e51dd3c64b2fe400cd0c80 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 05:00:45 +0000 Subject: [PATCH 05/15] chore: fix databases worker realtime queueing --- src/Appwrite/Platform/Workers/Databases.php | 30 ++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index b617da3e33..b0c6a9df73 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -197,7 +197,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, $attribute, null, $attribute, $event, $queueForRealtime); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -304,7 +304,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, $attribute, null, $attribute, $event, $queueForRealtime); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -418,7 +418,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, null, $index, $index, $event, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -474,7 +474,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, null, $index, $index, $event, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -599,16 +599,20 @@ class Databases extends Action /** * @param Document $database * @param Document $collection - * @param Document $attribute * @param Document $project + * @param Document|null $attribute + * @param Document|null $index + * @param Document $payload * @param Realtime $queueForRealtime * @return void */ protected function trigger( Document $database, Document $collection, - Document $attribute, Document $project, + Document|null $attribute = null, + Document|null $index = null, + Document $payload, string $event, Realtime $queueForRealtime ): void { @@ -617,9 +621,17 @@ class Databases extends Action ->setProjectId('console') ->setEvent($event) ->setParam('databaseId', $database->getId()) - ->setParam('collectionId', $collection->getId()) - ->setParam('attributeId', $attribute->getId()) - ->setPayload($attribute->getArrayCopy()) + ->setParam('collectionId', $collection->getId()); + + if ($attribute !== null) { + $queueForRealtime->setParam('attributeId', $attribute->getId()); + } + if ($index !== null) { + $queueForRealtime->setParam('indexId', $index->getId()); + } + + $queueForRealtime + ->setPayload($payload->getArrayCopy()) ->trigger(); } } From 532160705ffbde2ec6f008546d66ddcbe5e8048c Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 05:50:30 +0000 Subject: [PATCH 06/15] chore: refactor realtime queueing in functions worker --- src/Appwrite/Platform/Workers/Functions.php | 58 +++++++++------------ 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 0a7c39c02f..f485db8648 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -5,8 +5,8 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Execution; use Exception; use Executor\Executor; @@ -45,14 +45,15 @@ class Functions extends Action ->inject('message') ->inject('dbForProject') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('log') ->inject('isResourceBlocked') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); + ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void + public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -137,6 +138,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -177,6 +179,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -199,6 +202,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -284,6 +288,7 @@ class Functions extends Action * @param Log $log * @param Database $dbForProject * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage * @param Event $queueForEvents * @param Document $project @@ -308,6 +313,7 @@ class Functions extends Action Log $log, Database $dbForProject, Func $queueForFunctions, + Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Event $queueForEvents, Document $project, @@ -564,20 +570,21 @@ class Functions extends Action ; } - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - /** Trigger Webhook */ $executionModel = new Execution(); $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) ->setProject($project) ->setUser($user) ->setEvent('functions.[functionId].executions.[executionId].update') ->setParam('functionId', $function->getId()) ->setParam('executionId', $execution->getId()) - ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) + ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))); + + /** Trigger Webhook */ + $queueForEvents + ->setQueue(Event::WEBHOOK_QUEUE_NAME) + ->setClass(Event::WEBHOOK_CLASS_NAME) ->trigger(); /** Trigger Functions */ @@ -585,31 +592,16 @@ class Functions extends Action ->from($queueForEvents) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ - 'functionId' => $function->getId(), - 'executionId' => $execution->getId() - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $execution, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($queueForEvents) + ->setProjectId('console') + ->trigger(); + + $queueForRealtime + ->from($queueForEvents) + ->setProjectId($project->getId()) + ->trigger(); if (!empty($error)) { throw new Exception($error, $errorCode); From 5076303c33a600da6b91a4a8ebe7cb18d79d325c Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 11 Feb 2025 06:06:43 +0000 Subject: [PATCH 07/15] chore: refactor realtime queueing in builds worker --- src/Appwrite/Platform/Workers/Builds.php | 115 +++++++++-------------- 1 file changed, 47 insertions(+), 68 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index e7cbbd5088..f3f733f5eb 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -5,8 +5,8 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Deployment; use Appwrite\Vcs\Comment; use Exception; @@ -50,12 +50,13 @@ class Builds extends Action ->inject('dbForPlatform') ->inject('queueForEvents') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('queueForStatsUsage') ->inject('cache') ->inject('dbForProject') ->inject('deviceForFunctions') ->inject('log') - ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); + ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); } /** @@ -64,6 +65,7 @@ class Builds extends Action * @param Database $dbForPlatform * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage * @param Cache $cache * @param Database $dbForProject @@ -72,7 +74,7 @@ class Builds extends Action * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void { $payload = $message->getPayload() ?? []; @@ -93,7 +95,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log); + $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log); break; default: @@ -104,6 +106,7 @@ class Builds extends Action /** * @param Device $deviceForFunctions * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param Event $queueForEvents * @param StatsUsage $queueForStatsUsage * @param Database $dbForPlatform @@ -118,7 +121,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); @@ -152,10 +155,7 @@ class Builds extends Action } // Realtime preparation - $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ - 'functionId' => $function->getId(), - 'deploymentId' => $deployment->getId() - ]); + $event = "functions.[functionId].deployments.[deploymentId].update"; $startTime = DateTime::now(); $durationStart = \microtime(true); @@ -369,21 +369,16 @@ class Builds extends Action $deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment); /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); } $tmpPath = '/tmp/builds/' . $buildId; @@ -449,21 +444,15 @@ class Builds extends Action ->from($deploymentUpdate) ->trigger(); - /** Trigger Realtime */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Event */ + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); $vars = []; @@ -556,12 +545,12 @@ class Builds extends Action $err = $error; } }), - Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) { + Co\go(function () use ($executor, $project, $function, $deployment, &$response, &$build, $dbForProject, $event, &$err, $queueForRealtime, &$isCanceled) { try { $executor->getLogs( deploymentId: $deployment->getId(), projectId: $project->getId(), - callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) { + callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $event, $project, $function, $deployment, $queueForRealtime, &$isCanceled) { if ($isCanceled) { return; } @@ -586,21 +575,16 @@ class Builds extends Action $build = $dbForProject->updateDocument('builds', $build->getId(), $build); /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); } } ); @@ -688,21 +672,16 @@ class Builds extends Action } } finally { /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); /** Trigger usage queue */ if ($build->getAttribute('status') === 'ready') { From a74c6e0b10368c16977e576e90fef76be0a9e2cc Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 18 Feb 2025 07:58:56 +0000 Subject: [PATCH 08/15] chore: added targets to realtime, refactor webhook queues --- app/controllers/api/functions.php | 22 ++++----- src/Appwrite/Event/Event.php | 45 +++++++++---------- src/Appwrite/Event/Realtime.php | 38 +++++++++++----- src/Appwrite/Platform/Workers/Builds.php | 30 +++++++------ .../Platform/Workers/Certificates.php | 33 +++++++------- src/Appwrite/Platform/Workers/Databases.php | 2 +- src/Appwrite/Platform/Workers/Functions.php | 22 ++++----- src/Appwrite/Platform/Workers/Migrations.php | 10 +---- 8 files changed, 103 insertions(+), 99 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 0f9f5211f4..644aee4336 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -9,6 +9,7 @@ use Appwrite\Event\Func; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Validator\FunctionEvent; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Functions\Validator\Headers; @@ -194,10 +195,12 @@ App::post('/v1/functions') ->inject('user') ->inject('queueForEvents') ->inject('queueForBuilds') + ->inject('queueForWebhooks') + ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('dbForPlatform') ->inject('gitHub') - ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { + ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; // Temporary abuse check @@ -396,26 +399,19 @@ App::post('/v1/functions') ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); /** Trigger Webhook */ - $ruleCreate - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME) + $queueForWebhooks + ->from($ruleCreate) ->trigger(); /** Trigger Functions */ - $ruleCreate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $queueForFunctions + ->from($ruleCreate) ->trigger(); /** Trigger Realtime Events */ $queueForRealtime ->from($ruleCreate) - ->setProjectId('console') - ->trigger(); - - $queueForRealtime - ->from($ruleCreate) - ->setProjectId($project->getId()) + ->setTargets(['console', $project->getId()]) ->trigger(); } diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 8085a836a8..2187210e34 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -62,11 +62,11 @@ class Event protected array $params = []; protected array $sensitive = []; protected array $payload = []; + protected array $targets = []; protected array $context = []; protected ?Document $project = null; protected ?Document $user = null; protected ?string $userId = null; - protected ?string $projectId = null; protected bool $paused = false; /** @@ -152,18 +152,6 @@ class Event return $this; } - /** - * Set projectId for this event. - * - * @param string $projectId - * @return self - */ - public function setProjectId(string $projectId): self - { - $this->projectId = $projectId; - return $this; - } - /** * Get project for this event. * @@ -174,16 +162,6 @@ class Event return $this->project; } - /** - * Get projectId for this event. - * - * @return ?string - */ - public function getProjectId(): ?string - { - return $this->projectId; - } - /** * Set user for this event. * @@ -255,6 +233,27 @@ class Event return $this->payload; } + /** + * Get targets for this event. + * + * @return array + */ + public function setTargets(array $targets): self + { + $this->targets = $targets; + return $this; + } + + /** + * Get targets for this event. + * + * @return array + */ + public function getTargets(): array + { + return $this->targets; + } + /** * Set context for this event. * diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index 8c302bbabf..d16057ad2c 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -53,17 +53,33 @@ class Realtime extends Event bucket: $bucket, ); - RealtimeAdapter::send( - projectId: $this->getProjectId() ?? $target['projectId'] ?? $this->getProject()->getId(), - payload: $this->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $this->getParam('userId') - ] - ); + if (!empty($this->getTargets())) { + foreach ($this->getTargets() as $targetProjectId) { + RealtimeAdapter::send( + projectId: $targetProjectId, + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + } + } else { + RealtimeAdapter::send( + projectId: $target['projectId'] ?? $this->getProject()->getId(), + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + } return true; } diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index 0bc0937da0..8120adccb6 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -7,6 +7,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; +use Appwrite\Event\Webhook; use Appwrite\Utopia\Response\Model\Deployment; use Appwrite\Vcs\Comment; use Exception; @@ -49,6 +50,7 @@ class Builds extends Action ->inject('project') ->inject('dbForPlatform') ->inject('queueForEvents') + ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('queueForStatsUsage') @@ -57,8 +59,8 @@ class Builds extends Action ->inject('deviceForFunctions') ->inject('isResourceBlocked') ->inject('log') - ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) => - $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log)); + ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) => + $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log)); } /** @@ -66,6 +68,7 @@ class Builds extends Action * @param Document $project * @param Database $dbForPlatform * @param Event $queueForEvents + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage @@ -76,7 +79,7 @@ class Builds extends Action * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void { $payload = $message->getPayload() ?? []; @@ -97,7 +100,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log); + $this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log); break; default: @@ -107,6 +110,7 @@ class Builds extends Action /** * @param Device $deviceForFunctions + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime * @param Event $queueForEvents @@ -123,7 +127,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Webhooks $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); @@ -379,7 +383,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -431,19 +435,19 @@ class Builds extends Action $this->runGitAction('building', $github, $providerCommitHash, $owner, $repositoryName, $project, $function, $deployment->getId(), $dbForProject, $dbForPlatform); } - /** Trigger Webhook */ $deploymentModel = new Deployment(); $deploymentUpdate = $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) ->setProject($project) ->setEvent('functions.[functionId].deployments.[deploymentId].update') ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) ->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules()))); - $deploymentUpdate->trigger(); + /** Trigger Webhook */ + $queueForWebhooks + ->from($deploymentUpdate) + ->trigger(); /** Trigger Functions */ $queueForFunctions @@ -453,7 +457,7 @@ class Builds extends Action /** Trigger Realtime Event */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -585,7 +589,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -682,7 +686,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index c7f345f77a..009ac24021 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -7,6 +7,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Realtime; +use Appwrite\Event\Webhook; use Appwrite\Network\Validator\CNAME; use Appwrite\Template\Template; use Appwrite\Utopia\Response\Model\Rule; @@ -46,13 +47,14 @@ class Certificates extends Action ->inject('dbForPlatform') ->inject('queueForMails') ->inject('queueForEvents') + ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('log') ->inject('certificates') ->callback( - fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) => - $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates) + fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) => + $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates) ); } @@ -61,6 +63,7 @@ class Certificates extends Action * @param Database $dbForPlatform * @param Mail $queueForMails * @param Event $queueForEvents + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime * @param Log $log @@ -69,7 +72,7 @@ class Certificates extends Action * @throws Throwable * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void + public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void { $payload = $message->getPayload() ?? []; @@ -83,7 +86,7 @@ class Certificates extends Action $log->addTag('domain', $domain->get()); - $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck); + $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck); } /** @@ -99,7 +102,7 @@ class Certificates extends Action * @throws Throwable * @throws \Utopia\Database\Exception */ - private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void + private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void { /** * 1. Read arguments and validate domain @@ -189,7 +192,7 @@ class Certificates extends Action $certificate->setAttribute('updated', DateTime::now()); // Save all changes we made to certificate document into database - $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); + $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); } } @@ -209,7 +212,7 @@ class Certificates extends Action * @throws Conflict * @throws Structure */ - private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void + private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void { // Check if update or insert required $certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]); @@ -223,7 +226,7 @@ class Certificates extends Action } $certificateId = $certificate->getId(); - $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); + $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); } /** @@ -342,7 +345,7 @@ class Certificates extends Action * * @return void */ - private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void + private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void { // TODO: @christyjacob remove once we migrate the rules in 1.7.x if (System::getEnv('_APP_RULES_FORMAT') === 'md5') { @@ -379,9 +382,8 @@ class Certificates extends Action ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); /** Trigger Webhook */ - $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) + $queueForWebhooks + ->from($queueForEvents) ->trigger(); /** Trigger Functions */ @@ -392,12 +394,7 @@ class Certificates extends Action /** Trigger Realtime Events */ $queueForRealtime ->from($queueForEvents) - ->setProjectId('console') - ->trigger(); - - $queueForRealtime - ->from($queueForEvents) - ->setProjectId($project->getId()) + ->setTargets(['console', $projectId]) ->trigger(); } } diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 7d58521b82..ae75a74deb 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -604,7 +604,7 @@ class Databases extends Action ): void { $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('databaseId', $database->getId()) ->setParam('collectionId', $collection->getId()); diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index f485db8648..73b02be85e 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -7,6 +7,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; +use Appwrite\Event\Webhook; use Appwrite\Utopia\Response\Model\Execution; use Exception; use Executor\Executor; @@ -44,16 +45,17 @@ class Functions extends Action ->inject('project') ->inject('message') ->inject('dbForProject') + ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('log') ->inject('isResourceBlocked') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); + ->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void + public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -137,6 +139,7 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, @@ -178,6 +181,7 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, @@ -201,6 +205,7 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, @@ -312,6 +317,7 @@ class Functions extends Action private function execute( Log $log, Database $dbForProject, + Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, @@ -582,9 +588,8 @@ class Functions extends Action ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))); /** Trigger Webhook */ - $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) + $queueForWebhooks + ->from($queueForEvents) ->trigger(); /** Trigger Functions */ @@ -595,12 +600,7 @@ class Functions extends Action /** Trigger Realtime Events */ $queueForRealtime ->from($queueForEvents) - ->setProjectId('console') - ->trigger(); - - $queueForRealtime - ->from($queueForEvents) - ->setProjectId($project->getId()) + ->setTargets(['console', $project->getId()]) ->trigger(); if (!empty($error)) { diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 50d6002d28..541c171a22 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -160,15 +160,7 @@ class Migrations extends Action /** Trigger Realtime Events */ $queueForRealtime ->setProject($project) - ->setProjectId('console') - ->setEvent('migrations.[migrationId].update') - ->setParam('migrationId', $migration->getId()) - ->setPayload($migration->getArrayCopy()) - ->trigger(); - - $queueForRealtime - ->setProject($project) - ->setProjectId($project->getId()) + ->setTargets(['console', $project->getId()]) ->setEvent('migrations.[migrationId].update') ->setParam('migrationId', $migration->getId()) ->setPayload($migration->getArrayCopy()) From 235a357ca3aa22876881840d262ca34853473331 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 18 Feb 2025 11:37:07 +0000 Subject: [PATCH 09/15] chore: initialized queueForWebhooks in worker init --- app/worker.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/worker.php b/app/worker.php index ad6bf475f9..1088a9b6db 100644 --- a/app/worker.php +++ b/app/worker.php @@ -20,6 +20,7 @@ use Appwrite\Event\StatsUsageDump; use Appwrite\Event\Usage; use Appwrite\Event\UsageDump; /** /remove */ +use Appwrite\Event\Webhook; use Appwrite\Platform\Appwrite; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; @@ -314,6 +315,10 @@ Server::setResource('queueForAudits', function (Publisher $publisher) { return new Audit($publisher); }, ['publisher']); +Server::setResource('queueForWebhooks', function (Publisher $publisher) { + return new Webhook($publisher); +}, ['publisher']); + Server::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); From 9f7aaf702913548ce52cc7311973f0ba7de97f03 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 18 Feb 2025 12:51:19 +0000 Subject: [PATCH 10/15] chore: fix warning in database worker --- src/Appwrite/Event/Event.php | 3 ++- src/Appwrite/Platform/Workers/Databases.php | 28 ++++++++++----------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 2187210e34..7e15cabca9 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -234,8 +234,9 @@ class Event } /** - * Get targets for this event. + * Set targets for this event. * + * @param array $targets * @return array */ public function setTargets(array $targets): self diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index ae75a74deb..b85285ae72 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -197,7 +197,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, $attribute, null, $attribute, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -304,7 +304,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, $attribute, null, $attribute, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -418,7 +418,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, null, $index, $index, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -474,7 +474,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $project, null, $index, $index, $event, $queueForRealtime); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -586,21 +586,19 @@ class Databases extends Action * @param Document $database * @param Document $collection * @param Document $project + * @param Realtime $queueForRealtime * @param Document|null $attribute * @param Document|null $index - * @param Document $payload - * @param Realtime $queueForRealtime * @return void */ protected function trigger( Document $database, Document $collection, Document $project, + string $event, + Realtime $queueForRealtime, Document|null $attribute = null, Document|null $index = null, - Document $payload, - string $event, - Realtime $queueForRealtime ): void { $queueForRealtime ->setProject($project) @@ -610,14 +608,16 @@ class Databases extends Action ->setParam('collectionId', $collection->getId()); if ($attribute !== null) { - $queueForRealtime->setParam('attributeId', $attribute->getId()); + $queueForRealtime + ->setParam('attributeId', $attribute->getId()) + ->setPayload($attribute->getArrayCopy()); } if ($index !== null) { - $queueForRealtime->setParam('indexId', $index->getId()); + $queueForRealtime + ->setParam('indexId', $index->getId()) + ->setPayload($index->getArrayCopy()); } - $queueForRealtime - ->setPayload($payload->getArrayCopy()) - ->trigger(); + $queueForRealtime->trigger(); } } From 88a6616cf8572a6a1132b1527dfde654cbb8a819 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 18 Feb 2025 13:05:50 +0000 Subject: [PATCH 11/15] chore: fix naming --- src/Appwrite/Platform/Workers/Builds.php | 2 +- src/Appwrite/Platform/Workers/Databases.php | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index 8120adccb6..c3853a1dde 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -127,7 +127,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Webhooks $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index b85285ae72..74101dd1eb 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -516,7 +516,6 @@ class Databases extends Action $collectionId = $collection->getId(); $collectionInternalId = $collection->getInternalId(); - $databaseId = $database->getId(); $databaseInternalId = $database->getInternalId(); $dbForProject->deleteCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId()); From 55e7633bd12c2631f03f9bc75e5967e1b600f0cd Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 18 Feb 2025 14:04:59 +0000 Subject: [PATCH 12/15] chore: shift targets to realtime --- src/Appwrite/Event/Event.php | 23 ----------------------- src/Appwrite/Event/Realtime.php | 29 +++++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 7e15cabca9..0edffdf4dc 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -62,7 +62,6 @@ class Event protected array $params = []; protected array $sensitive = []; protected array $payload = []; - protected array $targets = []; protected array $context = []; protected ?Document $project = null; protected ?Document $user = null; @@ -233,28 +232,6 @@ class Event return $this->payload; } - /** - * Set targets for this event. - * - * @param array $targets - * @return array - */ - public function setTargets(array $targets): self - { - $this->targets = $targets; - return $this; - } - - /** - * Get targets for this event. - * - * @return array - */ - public function getTargets(): array - { - return $this->targets; - } - /** * Set context for this event. * diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index d16057ad2c..e04bbfffb4 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -7,10 +7,17 @@ use Utopia\Database\Document; class Realtime extends Event { + protected array $targets = []; + public function __construct() { } + /** + * Get Realtime payload for this event. + * + * @return array + */ public function getRealtimePayload(): array { $payload = []; @@ -24,6 +31,28 @@ class Realtime extends Event return $payload; } + /** + * Set targets for this realtime event. + * + * @param array $targets + * @return array + */ + public function setTargets(array $targets): self + { + $this->targets = $targets; + return $this; + } + + /** + * Get targets for this realtime event. + * + * @return array + */ + public function getTargets(): array + { + return $this->targets; + } + /** * Execute Event. * From 0ddcb9b14c9a07a647a5eb3e5bf95f41b3e2b891 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Wed, 26 Feb 2025 16:24:05 +0000 Subject: [PATCH 13/15] chore: refactor adapter --- src/Appwrite/Event/Realtime.php | 42 ++++++++++++--------------------- 1 file changed, 15 insertions(+), 27 deletions(-) diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index e04bbfffb4..e7a61ea545 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -82,33 +82,21 @@ class Realtime extends Event bucket: $bucket, ); - if (!empty($this->getTargets())) { - foreach ($this->getTargets() as $targetProjectId) { - RealtimeAdapter::send( - projectId: $targetProjectId, - payload: $this->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $this->getParam('userId') - ] - ); - } - } else { - RealtimeAdapter::send( - projectId: $target['projectId'] ?? $this->getProject()->getId(), - payload: $this->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $this->getParam('userId') - ] - ); - } + $projectIds = !empty($this->getTargets()) + ? $this->getTargets() + : [$target['projectId'] ?? $this->getProject()->getId()]; + + RealtimeAdapter::send( + projectId: $projectIds, + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); return true; } From 28896081d78cf12a0a7c06838f7fb2987f823bb1 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Wed, 26 Feb 2025 16:50:12 +0000 Subject: [PATCH 14/15] chore: fix forloop --- src/Appwrite/Event/Realtime.php | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index e7a61ea545..324f140165 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -86,17 +86,19 @@ class Realtime extends Event ? $this->getTargets() : [$target['projectId'] ?? $this->getProject()->getId()]; - RealtimeAdapter::send( - projectId: $projectIds, - payload: $this->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $this->getParam('userId') - ] - ); + foreach ($projectIds as $projectId) { + RealtimeAdapter::send( + projectId: $projectId, + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + } return true; } From 388affa3d0f9f0c231f731b30b86cc4127d2c129 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Thu, 27 Feb 2025 07:23:13 +0000 Subject: [PATCH 15/15] chore: updated naming for subscribers --- app/controllers/api/functions.php | 2 +- src/Appwrite/Event/Realtime.php | 20 +++++++++---------- src/Appwrite/Platform/Workers/Builds.php | 8 ++++---- .../Platform/Workers/Certificates.php | 2 +- src/Appwrite/Platform/Workers/Databases.php | 6 +++--- src/Appwrite/Platform/Workers/Functions.php | 2 +- src/Appwrite/Platform/Workers/Migrations.php | 2 +- 7 files changed, 21 insertions(+), 21 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index a36313b7ab..583468f6c1 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -411,7 +411,7 @@ App::post('/v1/functions') /** Trigger Realtime Events */ $queueForRealtime ->from($ruleCreate) - ->setTargets(['console', $project->getId()]) + ->setSubscribers(['console', $project->getId()]) ->trigger(); } diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index 324f140165..28a1bb6a6d 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -7,7 +7,7 @@ use Utopia\Database\Document; class Realtime extends Event { - protected array $targets = []; + protected array $subscribers = []; public function __construct() { @@ -32,25 +32,25 @@ class Realtime extends Event } /** - * Set targets for this realtime event. + * Set subscribers for this realtime event. * - * @param array $targets + * @param array $subscribers * @return array */ - public function setTargets(array $targets): self + public function setSubscribers(array $subscribers): self { - $this->targets = $targets; + $this->subscribers = $subscribers; return $this; } /** - * Get targets for this realtime event. + * Get subscribers for this realtime event. * * @return array */ - public function getTargets(): array + public function getSubscribers(): array { - return $this->targets; + return $this->subscribers; } /** @@ -82,8 +82,8 @@ class Realtime extends Event bucket: $bucket, ); - $projectIds = !empty($this->getTargets()) - ? $this->getTargets() + $projectIds = !empty($this->getSubscribers()) + ? $this->getSubscribers() : [$target['projectId'] ?? $this->getProject()->getId()]; foreach ($projectIds as $projectId) { diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index c3853a1dde..6f26e9a80c 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -383,7 +383,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setTargets(['console']) + ->setSubscribers(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -457,7 +457,7 @@ class Builds extends Action /** Trigger Realtime Event */ $queueForRealtime ->setProject($project) - ->setTargets(['console']) + ->setSubscribers(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -589,7 +589,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setTargets(['console']) + ->setSubscribers(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -686,7 +686,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setTargets(['console']) + ->setSubscribers(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index 009ac24021..093e6fda5a 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -394,7 +394,7 @@ class Certificates extends Action /** Trigger Realtime Events */ $queueForRealtime ->from($queueForEvents) - ->setTargets(['console', $projectId]) + ->setSubscribers(['console', $projectId]) ->trigger(); } } diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 74101dd1eb..4abd035599 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -601,17 +601,17 @@ class Databases extends Action ): void { $queueForRealtime ->setProject($project) - ->setTargets(['console']) + ->setSubscribers(['console']) ->setEvent($event) ->setParam('databaseId', $database->getId()) ->setParam('collectionId', $collection->getId()); - if ($attribute !== null) { + if ($attribute !== null && !empty($attribute)) { $queueForRealtime ->setParam('attributeId', $attribute->getId()) ->setPayload($attribute->getArrayCopy()); } - if ($index !== null) { + if ($index !== null && !empty($index)) { $queueForRealtime ->setParam('indexId', $index->getId()) ->setPayload($index->getArrayCopy()); diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 73b02be85e..a7caa3207f 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -600,7 +600,7 @@ class Functions extends Action /** Trigger Realtime Events */ $queueForRealtime ->from($queueForEvents) - ->setTargets(['console', $project->getId()]) + ->setSubscribers(['console', $project->getId()]) ->trigger(); if (!empty($error)) { diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 08c75d934a..f21a846a0d 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -160,7 +160,7 @@ class Migrations extends Action /** Trigger Realtime Events */ $queueForRealtime ->setProject($project) - ->setTargets(['console', $project->getId()]) + ->setSubscribers(['console', $project->getId()]) ->setEvent('migrations.[migrationId].update') ->setParam('migrationId', $migration->getId()) ->setPayload($migration->getArrayCopy())