From a74c6e0b10368c16977e576e90fef76be0a9e2cc Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 18 Feb 2025 07:58:56 +0000 Subject: [PATCH] chore: added targets to realtime, refactor webhook queues --- app/controllers/api/functions.php | 22 ++++----- src/Appwrite/Event/Event.php | 45 +++++++++---------- src/Appwrite/Event/Realtime.php | 38 +++++++++++----- src/Appwrite/Platform/Workers/Builds.php | 30 +++++++------ .../Platform/Workers/Certificates.php | 33 +++++++------- src/Appwrite/Platform/Workers/Databases.php | 2 +- src/Appwrite/Platform/Workers/Functions.php | 22 ++++----- src/Appwrite/Platform/Workers/Migrations.php | 10 +---- 8 files changed, 103 insertions(+), 99 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 0f9f5211f4..644aee4336 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -9,6 +9,7 @@ use Appwrite\Event\Func; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Validator\FunctionEvent; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Functions\Validator\Headers; @@ -194,10 +195,12 @@ App::post('/v1/functions') ->inject('user') ->inject('queueForEvents') ->inject('queueForBuilds') + ->inject('queueForWebhooks') + ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('dbForPlatform') ->inject('gitHub') - ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { + ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; // Temporary abuse check @@ -396,26 +399,19 @@ App::post('/v1/functions') ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); /** Trigger Webhook */ - $ruleCreate - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME) + $queueForWebhooks + ->from($ruleCreate) ->trigger(); /** Trigger Functions */ - $ruleCreate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $queueForFunctions + ->from($ruleCreate) ->trigger(); /** Trigger Realtime Events */ $queueForRealtime ->from($ruleCreate) - ->setProjectId('console') - ->trigger(); - - $queueForRealtime - ->from($ruleCreate) - ->setProjectId($project->getId()) + ->setTargets(['console', $project->getId()]) ->trigger(); } diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 8085a836a8..2187210e34 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -62,11 +62,11 @@ class Event protected array $params = []; protected array $sensitive = []; protected array $payload = []; + protected array $targets = []; protected array $context = []; protected ?Document $project = null; protected ?Document $user = null; protected ?string $userId = null; - protected ?string $projectId = null; protected bool $paused = false; /** @@ -152,18 +152,6 @@ class Event return $this; } - /** - * Set projectId for this event. - * - * @param string $projectId - * @return self - */ - public function setProjectId(string $projectId): self - { - $this->projectId = $projectId; - return $this; - } - /** * Get project for this event. * @@ -174,16 +162,6 @@ class Event return $this->project; } - /** - * Get projectId for this event. - * - * @return ?string - */ - public function getProjectId(): ?string - { - return $this->projectId; - } - /** * Set user for this event. * @@ -255,6 +233,27 @@ class Event return $this->payload; } + /** + * Get targets for this event. + * + * @return array + */ + public function setTargets(array $targets): self + { + $this->targets = $targets; + return $this; + } + + /** + * Get targets for this event. + * + * @return array + */ + public function getTargets(): array + { + return $this->targets; + } + /** * Set context for this event. * diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index 8c302bbabf..d16057ad2c 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -53,17 +53,33 @@ class Realtime extends Event bucket: $bucket, ); - RealtimeAdapter::send( - projectId: $this->getProjectId() ?? $target['projectId'] ?? $this->getProject()->getId(), - payload: $this->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $this->getParam('userId') - ] - ); + if (!empty($this->getTargets())) { + foreach ($this->getTargets() as $targetProjectId) { + RealtimeAdapter::send( + projectId: $targetProjectId, + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + } + } else { + RealtimeAdapter::send( + projectId: $target['projectId'] ?? $this->getProject()->getId(), + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + } return true; } diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index 0bc0937da0..8120adccb6 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -7,6 +7,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; +use Appwrite\Event\Webhook; use Appwrite\Utopia\Response\Model\Deployment; use Appwrite\Vcs\Comment; use Exception; @@ -49,6 +50,7 @@ class Builds extends Action ->inject('project') ->inject('dbForPlatform') ->inject('queueForEvents') + ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('queueForStatsUsage') @@ -57,8 +59,8 @@ class Builds extends Action ->inject('deviceForFunctions') ->inject('isResourceBlocked') ->inject('log') - ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) => - $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log)); + ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) => + $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log)); } /** @@ -66,6 +68,7 @@ class Builds extends Action * @param Document $project * @param Database $dbForPlatform * @param Event $queueForEvents + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage @@ -76,7 +79,7 @@ class Builds extends Action * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void { $payload = $message->getPayload() ?? []; @@ -97,7 +100,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log); + $this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log); break; default: @@ -107,6 +110,7 @@ class Builds extends Action /** * @param Device $deviceForFunctions + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime * @param Event $queueForEvents @@ -123,7 +127,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Webhooks $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); @@ -379,7 +383,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -431,19 +435,19 @@ class Builds extends Action $this->runGitAction('building', $github, $providerCommitHash, $owner, $repositoryName, $project, $function, $deployment->getId(), $dbForProject, $dbForPlatform); } - /** Trigger Webhook */ $deploymentModel = new Deployment(); $deploymentUpdate = $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) ->setProject($project) ->setEvent('functions.[functionId].deployments.[deploymentId].update') ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) ->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules()))); - $deploymentUpdate->trigger(); + /** Trigger Webhook */ + $queueForWebhooks + ->from($deploymentUpdate) + ->trigger(); /** Trigger Functions */ $queueForFunctions @@ -453,7 +457,7 @@ class Builds extends Action /** Trigger Realtime Event */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -585,7 +589,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) @@ -682,7 +686,7 @@ class Builds extends Action */ $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index c7f345f77a..009ac24021 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -7,6 +7,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Realtime; +use Appwrite\Event\Webhook; use Appwrite\Network\Validator\CNAME; use Appwrite\Template\Template; use Appwrite\Utopia\Response\Model\Rule; @@ -46,13 +47,14 @@ class Certificates extends Action ->inject('dbForPlatform') ->inject('queueForMails') ->inject('queueForEvents') + ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('log') ->inject('certificates') ->callback( - fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) => - $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates) + fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) => + $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates) ); } @@ -61,6 +63,7 @@ class Certificates extends Action * @param Database $dbForPlatform * @param Mail $queueForMails * @param Event $queueForEvents + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime * @param Log $log @@ -69,7 +72,7 @@ class Certificates extends Action * @throws Throwable * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void + public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void { $payload = $message->getPayload() ?? []; @@ -83,7 +86,7 @@ class Certificates extends Action $log->addTag('domain', $domain->get()); - $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck); + $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck); } /** @@ -99,7 +102,7 @@ class Certificates extends Action * @throws Throwable * @throws \Utopia\Database\Exception */ - private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void + private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void { /** * 1. Read arguments and validate domain @@ -189,7 +192,7 @@ class Certificates extends Action $certificate->setAttribute('updated', DateTime::now()); // Save all changes we made to certificate document into database - $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); + $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); } } @@ -209,7 +212,7 @@ class Certificates extends Action * @throws Conflict * @throws Structure */ - private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void + private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void { // Check if update or insert required $certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]); @@ -223,7 +226,7 @@ class Certificates extends Action } $certificateId = $certificate->getId(); - $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); + $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); } /** @@ -342,7 +345,7 @@ class Certificates extends Action * * @return void */ - private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void + private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void { // TODO: @christyjacob remove once we migrate the rules in 1.7.x if (System::getEnv('_APP_RULES_FORMAT') === 'md5') { @@ -379,9 +382,8 @@ class Certificates extends Action ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); /** Trigger Webhook */ - $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) + $queueForWebhooks + ->from($queueForEvents) ->trigger(); /** Trigger Functions */ @@ -392,12 +394,7 @@ class Certificates extends Action /** Trigger Realtime Events */ $queueForRealtime ->from($queueForEvents) - ->setProjectId('console') - ->trigger(); - - $queueForRealtime - ->from($queueForEvents) - ->setProjectId($project->getId()) + ->setTargets(['console', $projectId]) ->trigger(); } } diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 7d58521b82..ae75a74deb 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -604,7 +604,7 @@ class Databases extends Action ): void { $queueForRealtime ->setProject($project) - ->setProjectId('console') + ->setTargets(['console']) ->setEvent($event) ->setParam('databaseId', $database->getId()) ->setParam('collectionId', $collection->getId()); diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index f485db8648..73b02be85e 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -7,6 +7,7 @@ use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; +use Appwrite\Event\Webhook; use Appwrite\Utopia\Response\Model\Execution; use Exception; use Executor\Executor; @@ -44,16 +45,17 @@ class Functions extends Action ->inject('project') ->inject('message') ->inject('dbForProject') + ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('log') ->inject('isResourceBlocked') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); + ->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void + public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -137,6 +139,7 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, @@ -178,6 +181,7 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, @@ -201,6 +205,7 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, @@ -312,6 +317,7 @@ class Functions extends Action private function execute( Log $log, Database $dbForProject, + Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, @@ -582,9 +588,8 @@ class Functions extends Action ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))); /** Trigger Webhook */ - $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) + $queueForWebhooks + ->from($queueForEvents) ->trigger(); /** Trigger Functions */ @@ -595,12 +600,7 @@ class Functions extends Action /** Trigger Realtime Events */ $queueForRealtime ->from($queueForEvents) - ->setProjectId('console') - ->trigger(); - - $queueForRealtime - ->from($queueForEvents) - ->setProjectId($project->getId()) + ->setTargets(['console', $project->getId()]) ->trigger(); if (!empty($error)) { diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 50d6002d28..541c171a22 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -160,15 +160,7 @@ class Migrations extends Action /** Trigger Realtime Events */ $queueForRealtime ->setProject($project) - ->setProjectId('console') - ->setEvent('migrations.[migrationId].update') - ->setParam('migrationId', $migration->getId()) - ->setPayload($migration->getArrayCopy()) - ->trigger(); - - $queueForRealtime - ->setProject($project) - ->setProjectId($project->getId()) + ->setTargets(['console', $project->getId()]) ->setEvent('migrations.[migrationId].update') ->setParam('migrationId', $migration->getId()) ->setPayload($migration->getArrayCopy())