chore: added targets to realtime, refactor webhook queues

This commit is contained in:
Chirag Aggarwal 2025-02-18 07:58:56 +00:00
parent 6ef4e76aa6
commit a74c6e0b10
8 changed files with 103 additions and 99 deletions

View file

@ -9,6 +9,7 @@ use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Validator\FunctionEvent;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Functions\Validator\Headers;
@ -194,10 +195,12 @@ App::post('/v1/functions')
->inject('user')
->inject('queueForEvents')
->inject('queueForBuilds')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('dbForPlatform')
->inject('gitHub')
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
$functionId = ($functionId == 'unique()') ? ID::unique() : $functionId;
// Temporary abuse check
@ -396,26 +399,19 @@ App::post('/v1/functions')
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())));
/** Trigger Webhook */
$ruleCreate
->setClass(Event::WEBHOOK_CLASS_NAME)
->setQueue(Event::WEBHOOK_QUEUE_NAME)
$queueForWebhooks
->from($ruleCreate)
->trigger();
/** Trigger Functions */
$ruleCreate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$queueForFunctions
->from($ruleCreate)
->trigger();
/** Trigger Realtime Events */
$queueForRealtime
->from($ruleCreate)
->setProjectId('console')
->trigger();
$queueForRealtime
->from($ruleCreate)
->setProjectId($project->getId())
->setTargets(['console', $project->getId()])
->trigger();
}

View file

@ -62,11 +62,11 @@ class Event
protected array $params = [];
protected array $sensitive = [];
protected array $payload = [];
protected array $targets = [];
protected array $context = [];
protected ?Document $project = null;
protected ?Document $user = null;
protected ?string $userId = null;
protected ?string $projectId = null;
protected bool $paused = false;
/**
@ -152,18 +152,6 @@ class Event
return $this;
}
/**
* Set projectId for this event.
*
* @param string $projectId
* @return self
*/
public function setProjectId(string $projectId): self
{
$this->projectId = $projectId;
return $this;
}
/**
* Get project for this event.
*
@ -174,16 +162,6 @@ class Event
return $this->project;
}
/**
* Get projectId for this event.
*
* @return ?string
*/
public function getProjectId(): ?string
{
return $this->projectId;
}
/**
* Set user for this event.
*
@ -255,6 +233,27 @@ class Event
return $this->payload;
}
/**
* Get targets for this event.
*
* @return array
*/
public function setTargets(array $targets): self
{
$this->targets = $targets;
return $this;
}
/**
* Get targets for this event.
*
* @return array
*/
public function getTargets(): array
{
return $this->targets;
}
/**
* Set context for this event.
*

View file

@ -53,17 +53,33 @@ class Realtime extends Event
bucket: $bucket,
);
RealtimeAdapter::send(
projectId: $this->getProjectId() ?? $target['projectId'] ?? $this->getProject()->getId(),
payload: $this->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $this->getParam('userId')
]
);
if (!empty($this->getTargets())) {
foreach ($this->getTargets() as $targetProjectId) {
RealtimeAdapter::send(
projectId: $targetProjectId,
payload: $this->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $this->getParam('userId')
]
);
}
} else {
RealtimeAdapter::send(
projectId: $target['projectId'] ?? $this->getProject()->getId(),
payload: $this->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $this->getParam('userId')
]
);
}
return true;
}

View file

@ -7,6 +7,7 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Utopia\Response\Model\Deployment;
use Appwrite\Vcs\Comment;
use Exception;
@ -49,6 +50,7 @@ class Builds extends Action
->inject('project')
->inject('dbForPlatform')
->inject('queueForEvents')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForStatsUsage')
@ -57,8 +59,8 @@ class Builds extends Action
->inject('deviceForFunctions')
->inject('isResourceBlocked')
->inject('log')
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) =>
$this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log));
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) =>
$this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log));
}
/**
@ -66,6 +68,7 @@ class Builds extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Event $queueForEvents
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param StatsUsage $queueForStatsUsage
@ -76,7 +79,7 @@ class Builds extends Action
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -97,7 +100,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log);
$this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log);
break;
default:
@ -107,6 +110,7 @@ class Builds extends Action
/**
* @param Device $deviceForFunctions
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Event $queueForEvents
@ -123,7 +127,7 @@ class Builds extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void
protected function buildDeployment(Device $deviceForFunctions, Webhooks $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void
{
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
@ -379,7 +383,7 @@ class Builds extends Action
*/
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setTargets(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
@ -431,19 +435,19 @@ class Builds extends Action
$this->runGitAction('building', $github, $providerCommitHash, $owner, $repositoryName, $project, $function, $deployment->getId(), $dbForProject, $dbForPlatform);
}
/** Trigger Webhook */
$deploymentModel = new Deployment();
$deploymentUpdate =
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project)
->setEvent('functions.[functionId].deployments.[deploymentId].update')
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules())));
$deploymentUpdate->trigger();
/** Trigger Webhook */
$queueForWebhooks
->from($deploymentUpdate)
->trigger();
/** Trigger Functions */
$queueForFunctions
@ -453,7 +457,7 @@ class Builds extends Action
/** Trigger Realtime Event */
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setTargets(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
@ -585,7 +589,7 @@ class Builds extends Action
*/
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setTargets(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
@ -682,7 +686,7 @@ class Builds extends Action
*/
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setTargets(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())

View file

@ -7,6 +7,7 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Network\Validator\CNAME;
use Appwrite\Template\Template;
use Appwrite\Utopia\Response\Model\Rule;
@ -46,13 +47,14 @@ class Certificates extends Action
->inject('dbForPlatform')
->inject('queueForMails')
->inject('queueForEvents')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('log')
->inject('certificates')
->callback(
fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) =>
$this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates)
fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) =>
$this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates)
);
}
@ -61,6 +63,7 @@ class Certificates extends Action
* @param Database $dbForPlatform
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Log $log
@ -69,7 +72,7 @@ class Certificates extends Action
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void
{
$payload = $message->getPayload() ?? [];
@ -83,7 +86,7 @@ class Certificates extends Action
$log->addTag('domain', $domain->get());
$this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck);
$this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck);
}
/**
@ -99,7 +102,7 @@ class Certificates extends Action
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void
private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void
{
/**
* 1. Read arguments and validate domain
@ -189,7 +192,7 @@ class Certificates extends Action
$certificate->setAttribute('updated', DateTime::now());
// Save all changes we made to certificate document into database
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime);
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime);
}
}
@ -209,7 +212,7 @@ class Certificates extends Action
* @throws Conflict
* @throws Structure
*/
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void
{
// Check if update or insert required
$certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]);
@ -223,7 +226,7 @@ class Certificates extends Action
}
$certificateId = $certificate->getId();
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime);
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime);
}
/**
@ -342,7 +345,7 @@ class Certificates extends Action
*
* @return void
*/
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime): void
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void
{
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (System::getEnv('_APP_RULES_FORMAT') === 'md5') {
@ -379,9 +382,8 @@ class Certificates extends Action
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())));
/** Trigger Webhook */
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
$queueForWebhooks
->from($queueForEvents)
->trigger();
/** Trigger Functions */
@ -392,12 +394,7 @@ class Certificates extends Action
/** Trigger Realtime Events */
$queueForRealtime
->from($queueForEvents)
->setProjectId('console')
->trigger();
$queueForRealtime
->from($queueForEvents)
->setProjectId($project->getId())
->setTargets(['console', $projectId])
->trigger();
}
}

View file

@ -604,7 +604,7 @@ class Databases extends Action
): void {
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setTargets(['console'])
->setEvent($event)
->setParam('databaseId', $database->getId())
->setParam('collectionId', $collection->getId());

View file

@ -7,6 +7,7 @@ use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Utopia\Response\Model\Execution;
use Exception;
use Executor\Executor;
@ -44,16 +45,17 @@ class Functions extends Action
->inject('project')
->inject('message')
->inject('dbForProject')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('log')
->inject('isResourceBlocked')
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked));
->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked));
}
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void
public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void
{
$payload = $message->getPayload() ?? [];
@ -137,6 +139,7 @@ class Functions extends Action
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
@ -178,6 +181,7 @@ class Functions extends Action
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
@ -201,6 +205,7 @@ class Functions extends Action
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
@ -312,6 +317,7 @@ class Functions extends Action
private function execute(
Log $log,
Database $dbForProject,
Webhook $queueForWebhooks,
Func $queueForFunctions,
Realtime $queueForRealtime,
StatsUsage $queueForStatsUsage,
@ -582,9 +588,8 @@ class Functions extends Action
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())));
/** Trigger Webhook */
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
$queueForWebhooks
->from($queueForEvents)
->trigger();
/** Trigger Functions */
@ -595,12 +600,7 @@ class Functions extends Action
/** Trigger Realtime Events */
$queueForRealtime
->from($queueForEvents)
->setProjectId('console')
->trigger();
$queueForRealtime
->from($queueForEvents)
->setProjectId($project->getId())
->setTargets(['console', $project->getId()])
->trigger();
if (!empty($error)) {

View file

@ -160,15 +160,7 @@ class Migrations extends Action
/** Trigger Realtime Events */
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent('migrations.[migrationId].update')
->setParam('migrationId', $migration->getId())
->setPayload($migration->getArrayCopy())
->trigger();
$queueForRealtime
->setProject($project)
->setProjectId($project->getId())
->setTargets(['console', $project->getId()])
->setEvent('migrations.[migrationId].update')
->setParam('migrationId', $migration->getId())
->setPayload($migration->getArrayCopy())