diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index 7e220b2734..c7f345f77a 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -6,7 +6,7 @@ use Appwrite\Certificates\Adapter as CertificatesAdapter; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Appwrite\Network\Validator\CNAME; use Appwrite\Template\Template; use Appwrite\Utopia\Response\Model\Rule; @@ -47,11 +47,12 @@ class Certificates extends Action ->inject('queueForMails') ->inject('queueForEvents') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('log') ->inject('certificates') ->callback( - fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates) => - $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates) + fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) => + $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates) ); } @@ -61,13 +62,14 @@ class Certificates extends Action * @param Mail $queueForMails * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param Log $log * @param CertificatesAdapter $certificates * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates): void + public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void { $payload = $message->getPayload() ?? []; @@ -81,7 +83,7 @@ class Certificates extends Action $log->addTag('domain', $domain->get()); - $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates, $skipRenewCheck); + $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck); } /** @@ -90,13 +92,14 @@ class Certificates extends Action * @param Mail $queueForMails * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param CertificatesAdapter $certificates * @param bool $skipRenewCheck * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void + private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void { /** * 1. Read arguments and validate domain @@ -186,7 +189,7 @@ class Certificates extends Action $certificate->setAttribute('updated', DateTime::now()); // Save all changes we made to certificate document into database - $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions); + $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); } } @@ -199,13 +202,14 @@ class Certificates extends Action * @param Database $dbForPlatform Database connection for console * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @return void * @throws \Utopia\Database\Exception * @throws Authorization * @throws Conflict * @throws Structure */ - private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void + private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void { // Check if update or insert required $certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]); @@ -219,7 +223,7 @@ class Certificates extends Action } $certificateId = $certificate->getId(); - $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions); + $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime); } /** @@ -338,7 +342,7 @@ class Certificates extends Action * * @return void */ - private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void + private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void { // TODO: @christyjacob remove once we migrate the rules in 1.7.x if (System::getEnv('_APP_RULES_FORMAT') === 'md5') { @@ -367,50 +371,34 @@ class Certificates extends Action return; } - /** Trigger Webhook */ $ruleModel = new Rule(); + $queueForEvents + ->setProject($project) + ->setEvent('rules.[ruleId].update') + ->setParam('ruleId', $rule->getId()) + ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); + + /** Trigger Webhook */ $queueForEvents ->setQueue(Event::WEBHOOK_QUEUE_NAME) ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setProject($project) - ->setEvent('rules.[ruleId].update') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) ->trigger(); - /** Trigger Functions */ $queueForFunctions - ->setProject($project) - ->setEvent('rules.[ruleId].update') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) + ->from($queueForEvents) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('rules.[ruleId].update', [ - 'ruleId' => $rule->getId(), - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $rule, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($queueForEvents) + ->setProjectId('console') + ->trigger(); + + $queueForRealtime + ->from($queueForEvents) + ->setProjectId($project->getId()) + ->trigger(); } } } diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 6db88a618f..b617da3e33 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -36,9 +36,9 @@ class Databases extends Action ->inject('project') ->inject('dbForPlatform') ->inject('dbForProject') - ->inject('log') ->inject('queueForRealtime') - ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log, $queueForRealtime)); + ->inject('log') + ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $queueForRealtime, $log)); } /** @@ -46,12 +46,12 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject - * @param Log $log * @param Realtime $queueForRealtime + * @param Log $log * @return void * @throws \Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime): void + public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log): void { $payload = $message->getPayload() ?? []; @@ -108,6 +108,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update"; /** * TODO @christyjacob4 verify if this is still the case * Fetch attribute from the database, since with Resque float values are loosing informations. @@ -196,7 +197,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); + $this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -230,6 +231,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete'; $collectionId = $collection->getId(); $key = $attribute->getAttribute('key', ''); $type = $attribute->getAttribute('type', ''); @@ -302,7 +304,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $queueForRealtime); + $this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -389,6 +391,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update'; $collectionId = $collection->getId(); $key = $index->getAttribute('key', ''); $type = $index->getAttribute('type', ''); @@ -415,7 +418,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $queueForRealtime); + $this->trigger($database, $collection, $index, $project, $event, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -445,6 +448,7 @@ class Databases extends Action } $projectId = $project->getId(); + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete'; $key = $index->getAttribute('key'); $status = $index->getAttribute('status', ''); $project = $dbForPlatform->getDocument('projects', $projectId); @@ -470,7 +474,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $queueForRealtime); + $this->trigger($database, $collection, $index, $project, $event, $queueForRealtime); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -605,12 +609,13 @@ class Databases extends Action Document $collection, Document $attribute, Document $project, + string $event, Realtime $queueForRealtime ): void { $queueForRealtime ->setProject($project) ->setProjectId('console') - ->setEvent('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update') + ->setEvent($event) ->setParam('databaseId', $database->getId()) ->setParam('collectionId', $collection->getId()) ->setParam('attributeId', $attribute->getId())