chore: refactor certificate realtime, fix database

This commit is contained in:
Chirag Aggarwal 2025-02-11 04:21:36 +00:00
parent 47fbb777ed
commit bcdae7f4e3
2 changed files with 46 additions and 53 deletions

View file

@ -6,7 +6,7 @@ use Appwrite\Certificates\Adapter as CertificatesAdapter;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Realtime;
use Appwrite\Network\Validator\CNAME;
use Appwrite\Template\Template;
use Appwrite\Utopia\Response\Model\Rule;
@ -47,11 +47,12 @@ class Certificates extends Action
->inject('queueForMails')
->inject('queueForEvents')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('log')
->inject('certificates')
->callback(
fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates) =>
$this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates)
fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) =>
$this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates)
);
}
@ -61,13 +62,14 @@ class Certificates extends Action
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Log $log
* @param CertificatesAdapter $certificates
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates): void
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void
{
$payload = $message->getPayload() ?? [];
@ -81,7 +83,7 @@ class Certificates extends Action
$log->addTag('domain', $domain->get());
$this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates, $skipRenewCheck);
$this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck);
}
/**
@ -90,13 +92,14 @@ class Certificates extends Action
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param CertificatesAdapter $certificates
* @param bool $skipRenewCheck
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void
private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void
{
/**
* 1. Read arguments and validate domain
@ -186,7 +189,7 @@ class Certificates extends Action
$certificate->setAttribute('updated', DateTime::now());
// Save all changes we made to certificate document into database
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions);
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime);
}
}
@ -199,13 +202,14 @@ class Certificates extends Action
* @param Database $dbForPlatform Database connection for console
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @return void
* @throws \Utopia\Database\Exception
* @throws Authorization
* @throws Conflict
* @throws Structure
*/
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void
{
// Check if update or insert required
$certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]);
@ -219,7 +223,7 @@ class Certificates extends Action
}
$certificateId = $certificate->getId();
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions);
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime);
}
/**
@ -338,7 +342,7 @@ class Certificates extends Action
*
* @return void
*/
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void
{
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (System::getEnv('_APP_RULES_FORMAT') === 'md5') {
@ -367,50 +371,34 @@ class Certificates extends Action
return;
}
/** Trigger Webhook */
$ruleModel = new Rule();
$queueForEvents
->setProject($project)
->setEvent('rules.[ruleId].update')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())));
/** Trigger Webhook */
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project)
->setEvent('rules.[ruleId].update')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
->trigger();
/** Trigger Functions */
$queueForFunctions
->setProject($project)
->setEvent('rules.[ruleId].update')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
->from($queueForEvents)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('rules.[ruleId].update', [
'ruleId' => $rule->getId(),
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $rule,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Trigger Realtime Events */
$queueForRealtime
->from($queueForEvents)
->setProjectId('console')
->trigger();
$queueForRealtime
->from($queueForEvents)
->setProjectId($project->getId())
->trigger();
}
}
}

View file

@ -36,9 +36,9 @@ class Databases extends Action
->inject('project')
->inject('dbForPlatform')
->inject('dbForProject')
->inject('log')
->inject('queueForRealtime')
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log, $queueForRealtime));
->inject('log')
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $queueForRealtime, $log));
}
/**
@ -46,12 +46,12 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Log $log
* @param Realtime $queueForRealtime
* @param Log $log
* @return void
* @throws \Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log, Realtime $queueForRealtime): void
public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -108,6 +108,7 @@ class Databases extends Action
}
$projectId = $project->getId();
$event = "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update";
/**
* TODO @christyjacob4 verify if this is still the case
* Fetch attribute from the database, since with Resque float values are loosing informations.
@ -196,7 +197,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $attribute, $project, $queueForRealtime);
$this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime);
if (! $relatedCollection->isEmpty()) {
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId());
@ -230,6 +231,7 @@ class Databases extends Action
}
$projectId = $project->getId();
$event = 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete';
$collectionId = $collection->getId();
$key = $attribute->getAttribute('key', '');
$type = $attribute->getAttribute('type', '');
@ -302,7 +304,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $attribute, $project, $queueForRealtime);
$this->trigger($database, $collection, $attribute, $project, $event, $queueForRealtime);
}
// The underlying database removes/rebuilds indexes when attribute is removed
@ -389,6 +391,7 @@ class Databases extends Action
}
$projectId = $project->getId();
$event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update';
$collectionId = $collection->getId();
$key = $index->getAttribute('key', '');
$type = $index->getAttribute('type', '');
@ -415,7 +418,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $index, $project, $queueForRealtime);
$this->trigger($database, $collection, $index, $project, $event, $queueForRealtime);
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId);
}
}
@ -445,6 +448,7 @@ class Databases extends Action
}
$projectId = $project->getId();
$event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete';
$key = $index->getAttribute('key');
$status = $index->getAttribute('status', '');
$project = $dbForPlatform->getDocument('projects', $projectId);
@ -470,7 +474,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $index, $project, $queueForRealtime);
$this->trigger($database, $collection, $index, $project, $event, $queueForRealtime);
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId());
}
}
@ -605,12 +609,13 @@ class Databases extends Action
Document $collection,
Document $attribute,
Document $project,
string $event,
Realtime $queueForRealtime
): void {
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update')
->setEvent($event)
->setParam('databaseId', $database->getId())
->setParam('collectionId', $collection->getId())
->setParam('attributeId', $attribute->getId())